第六章:Python 集成¶
安装客户端¶
pip install elasticsearch
连接 Elasticsearch¶
基本连接¶
from elasticsearch import Elasticsearch
# Open a client against the local Elasticsearch node.
es = Elasticsearch("http://localhost:9200")

# Ping the node and report the outcome.
print("连接成功" if es.ping() else "连接失败")
认证连接¶
from elasticsearch import Elasticsearch
# Username/password (basic) authentication.
es = Elasticsearch(
    ["http://localhost:9200"],
    basic_auth=("elastic", "your_password"),
)

# API key authentication.
es = Elasticsearch(
    ["http://localhost:9200"],
    api_key="your_api_key",
)

# HTTPS connection: certificate verification is enabled and a CA file
# is supplied alongside the basic-auth credentials.
es = Elasticsearch(
    ["https://localhost:9200"],
    basic_auth=("elastic", "your_password"),
    ca_certs="/path/to/ca.crt",
    verify_certs=True,
)
连接池配置¶
from elasticsearch import Elasticsearch
es = Elasticsearch(
    ["http://localhost:9200", "http://localhost:9201"],
    # Connection pool size per node. The 8.x client (whose keyword API the
    # rest of this chapter uses) calls this option `connections_per_node`;
    # the older `maxsize` spelling is deprecated there.
    connections_per_node=25,
    # Per-request timeout.
    request_timeout=30,
    # Maximum number of retries for a failed request.
    max_retries=3,
    # HTTP status codes that trigger a retry.
    retry_on_status=(502, 503, 504),
)
索引操作¶
创建索引¶
# Create an index with default settings.
es.indices.create(index="products")

# An index name can only be created once, so drop the empty index made above
# before recreating it with an explicit mapping; otherwise the second
# create call fails because the index already exists.
es.indices.delete(index="products")

# Create the index with field mappings and shard/replica settings.
# NOTE: the `ik_max_word` analyzer comes from the IK analysis plugin, which
# must be installed on the cluster.
mapping = {
    "mappings": {
        "properties": {
            "name": {"type": "text", "analyzer": "ik_max_word"},
            "price": {"type": "float"},
            "category": {"type": "keyword"},
            "brand": {"type": "keyword"},
            "stock": {"type": "integer"},
            "is_available": {"type": "boolean"},
            "created_at": {"type": "date"},
        }
    },
    "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1,
    },
}
# Use the dedicated `mappings`/`settings` keywords, consistent with the
# keyword-style calls (`query=`, `aggs=`, `document=`) used elsewhere,
# instead of a raw `body`.
es.indices.create(
    index="products",
    mappings=mapping["mappings"],
    settings=mapping["settings"],
)
检查索引¶
# Report whether the index exists.
index_present = es.indices.exists(index="products")
if index_present:
    print("索引存在")

# Fetch and print the index settings.
settings = es.indices.get_settings(index="products")
print(settings)

# Fetch and print the field mapping.
mapping = es.indices.get_mapping(index="products")
print(mapping)
删除索引¶
es.indices.delete(index="products")
文档操作¶
创建文档¶
# Index a document under an explicit ID.
doc = dict(
    name="iPhone 15",
    price=7999,
    category="手机",
    brand="Apple",
    stock=100,
    is_available=True,
    created_at="2024-01-15",
)
es.index(index="products", id=1, document=doc)

# Index the same document again without an ID so one is generated.
es.index(index="products", document=doc)
# 批量创建
from elasticsearch.helpers import bulk
# One bulk action per product, numbered 1 through 100.
actions = [
    dict(
        _index="products",
        _id=number,
        _source=dict(
            name=f"Product {number}",
            price=100 * number,
            category="电子产品",
        ),
    )
    for number in range(1, 101)
]
# Send every prepared action to Elasticsearch through the bulk helper.
bulk(es, actions)
获取文档¶
# Fetch a single document by ID.
response = es.get(index="products", id=1)
print(response["_source"])

# Check whether a document exists.
if es.exists(index="products", id=1):
    print("文档存在")

# Fetch several documents in one request.
response = es.mget(index="products", ids=[1, 2, 3])
for item in response["docs"]:
    # Entries for IDs that do not exist carry "found": false and have no
    # "_source" key, so skip them instead of raising KeyError.
    if item.get("found"):
        print(item["_source"])
更新文档¶
# Partial update: send only the fields that change.
es.update(index="products", id=1, doc={"price": 7499, "stock": 80})

# Scripted update: decrement the stock by a parameterised amount.
stock_script = {
    "source": "ctx._source.stock -= params.count",
    "params": {"count": 5},
}
es.update(index="products", id=1, script=stock_script)

# Upsert: update with `doc`, supplying a fallback document via `upsert`.
es.update(
    index="products",
    id=1,
    doc={"price": 7999},
    upsert={"name": "iPhone 15", "price": 7999, "category": "手机"},
)
删除文档¶
# Remove one document by ID.
es.delete(index="products", id=1)

# Remove every document matched by a query.
es.delete_by_query(index="products", query={"match": {"brand": "Apple"}})
查询操作¶
基本查询¶
# Match every document in the index.
response = es.search(
    index="products",
    query={"match_all": {}},
)

# Print the source of each returned hit.
for hit in response["hits"]["hits"]:
    print(hit["_source"])

# Report the total number of matches.
total = response["hits"]["total"]["value"]
print(f"共 {total} 条记录")
全文查询¶
# match query
response = es.search(
    index="products",
    query={"match": {"name": "iPhone"}},
)

# match_phrase query
response = es.search(
    index="products",
    query={"match_phrase": {"name": "iPhone 15"}},
)

# multi_match query: one query string searched across several fields
response = es.search(
    index="products",
    query={
        "multi_match": {
            "query": "iPhone",
            "fields": ["name", "description", "brand"],
        }
    },
)
精确查询¶
# term query: a single exact value
response = es.search(
    index="products",
    query={"term": {"brand": "Apple"}},
)

# terms query: any of several exact values
response = es.search(
    index="products",
    query={"terms": {"brand": ["Apple", "Samsung"]}},
)

# range query: price between 1000 and 5000, both bounds included
response = es.search(
    index="products",
    query={"range": {"price": {"gte": 1000, "lte": 5000}}},
)
复合查询¶
# bool query combining must / filter / should / must_not clauses
response = es.search(
    index="products",
    query={
        "bool": {
            "must": [{"match": {"name": "iPhone"}}],
            "filter": [
                {"range": {"price": {"lte": 10000}}},
                {"term": {"is_available": True}},
            ],
            "should": [{"term": {"brand": "Apple"}}],
            "must_not": [{"term": {"status": "discontinued"}}],
        }
    },
)
分页和排序¶
# Paging: the first 10 results, starting at offset 0.
response = es.search(
    index="products",
    query={"match_all": {}},
    from_=0,
    size=10,
)

# Sorting: by price, then by creation date, both descending.
response = es.search(
    index="products",
    query={"match_all": {}},
    sort=[
        {"price": {"order": "desc"}},
        {"created_at": {"order": "desc"}},
    ],
)

# Return only selected fields.
response = es.search(
    index="products",
    query={"match_all": {}},
    source=["name", "price"],
)
高亮显示¶
# Search on the name field and ask for highlighted fragments wrapped in <em>.
response = es.search(
    index="products",
    query={"match": {"name": "iPhone"}},
    highlight={
        "fields": {"name": {}},
        "pre_tags": ["<em>"],
        "post_tags": ["</em>"],
    },
)

# Show the stored name next to its first highlighted fragment.
for hit in response["hits"]["hits"]:
    print(f"原文: {hit['_source']['name']}")
    print(f"高亮: {hit['highlight']['name'][0]}")
滚动查询¶
# Page through a large result set with the scroll API.
response = es.search(
    index="products",
    query={"match_all": {}},
    size=1000,
    scroll="2m",
)
scroll_id = response["_scroll_id"]
hits = response["hits"]["hits"]

try:
    # An empty page ends the loop.
    while hits:
        for hit in hits:
            print(hit["_source"])

        # Fetch the next batch, always continuing from the latest scroll ID.
        response = es.scroll(scroll_id=scroll_id, scroll="2m")
        scroll_id = response["_scroll_id"]
        hits = response["hits"]["hits"]
finally:
    # Clear the scroll even when processing a page raises, so the
    # server-side scroll context is not left open until it times out.
    es.clear_scroll(scroll_id=scroll_id)
聚合操作¶
基本聚合¶
# Metric aggregations over the price field; size=0 requests no hits.
price_metrics = {
    "avg_price": {"avg": {"field": "price"}},
    "max_price": {"max": {"field": "price"}},
    "min_price": {"min": {"field": "price"}},
    "sum_price": {"sum": {"field": "price"}},
}
response = es.search(index="products", size=0, aggs=price_metrics)

print(f"平均价格: {response['aggregations']['avg_price']['value']}")
print(f"最高价格: {response['aggregations']['max_price']['value']}")
分组聚合¶
# terms aggregation: document count per brand (up to 10 buckets)
response = es.search(
    index="products",
    size=0,
    aggs={"brands": {"terms": {"field": "brand", "size": 10}}},
)
for bucket in response["aggregations"]["brands"]["buckets"]:
    print(f"{bucket['key']}: {bucket['doc_count']}")

# Nested aggregation: average price inside each brand bucket
response = es.search(
    index="products",
    size=0,
    aggs={
        "brands": {
            "terms": {"field": "brand"},
            "aggs": {"avg_price": {"avg": {"field": "price"}}},
        }
    },
)
for bucket in response["aggregations"]["brands"]["buckets"]:
    print(f"{bucket['key']}: 平均价格 {bucket['avg_price']['value']}")
范围聚合¶
# Bucket products into four price bands.
price_bands = [
    {"to": 1000},
    {"from": 1000, "to": 5000},
    {"from": 5000, "to": 10000},
    {"from": 10000},
]
response = es.search(
    index="products",
    size=0,
    aggs={
        "price_ranges": {
            "range": {"field": "price", "ranges": price_bands},
        }
    },
)

# Print each band's key and document count.
for bucket in response["aggregations"]["price_ranges"]["buckets"]:
    print(f"{bucket['key']}: {bucket['doc_count']}")
日期聚合¶
# Count documents per calendar month of their creation date.
monthly_histogram = {
    "date_histogram": {
        "field": "created_at",
        "calendar_interval": "month",
    }
}
response = es.search(
    index="products",
    size=0,
    aggs={"sales_over_time": monthly_histogram},
)

# Print each month's formatted key and document count.
for bucket in response["aggregations"]["sales_over_time"]["buckets"]:
    print(f"{bucket['key_as_string']}: {bucket['doc_count']}")
异步客户端¶
from elasticsearch import AsyncElasticsearch
import asyncio
es = AsyncElasticsearch("http://localhost:9200")


async def search_products():
    """Search the products index for names matching "iPhone".

    Returns:
        The search response from the async client.
    """
    return await es.search(
        index="products",
        query={"match": {"name": "iPhone"}},
    )


async def main():
    """Run the search, print every hit, and always close the client."""
    try:
        response = await search_products()
        for hit in response["hits"]["hits"]:
            print(hit["_source"])
    finally:
        # Close the client even when the search raises; previously a failure
        # skipped close() and left the client open.
        await es.close()


asyncio.run(main())
批量操作助手¶
from elasticsearch.helpers import bulk, streaming_bulk
# Generator-based bulk import source.
def generate_docs(count=10000):
    """Yield bulk actions for ``count`` synthetic products.

    Args:
        count: Number of actions to produce; IDs run from 0 to ``count - 1``.
            Defaults to 10000.

    Yields:
        One action dict per product targeting the "products" index, with
        the price cycling through 0, 100, ..., 9900.
    """
    for i in range(count):
        yield {
            "_index": "products",
            "_id": i,
            "_source": {
                "name": f"Product {i}",
                "price": 100 * (i % 100),
                "category": "电子产品",
            },
        }
# Bulk import. With the default raise_on_error=True the helper raises
# BulkIndexError as soon as a chunk contains a failure, so `failed` could
# never be non-empty; turn it off to collect the per-document errors.
success, failed = bulk(es, generate_docs(), raise_on_error=False)
print(f"成功: {success}, 失败: {len(failed)}")

# Streaming bulk import (memory friendly): results arrive one document at a
# time. raise_on_error=False is needed here as well, otherwise the
# `not ok` branch below is unreachable.
for ok, response in streaming_bulk(
    es,
    generate_docs(),
    chunk_size=500,
    raise_on_error=False,
):
    if not ok:
        print(f"导入失败: {response}")
错误处理¶
from elasticsearch import Elasticsearch, NotFoundError, RequestError
es = Elasticsearch("http://localhost:9200")

# Look up a document and report each failure category separately.
try:
    doc = es.get(index="products", id=999)
except NotFoundError:
    print("文档不存在")
except RequestError as err:
    print(f"请求错误: {err}")
except Exception as err:
    # Catch-all for anything the two handlers above did not match.
    print(f"其他错误: {err}")
完整示例¶
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import json
class ProductSearch:
    """Convenience wrapper around a single Elasticsearch product index.

    Attributes:
        es: The Elasticsearch client used for every request.
        index: Name of the index all methods operate on ("products").
    """

    def __init__(self, hosts="http://localhost:9200"):
        """Create a client for ``hosts`` and target the "products" index."""
        self.es = Elasticsearch(hosts)
        self.index = "products"

    @staticmethod
    def _sources(response):
        """Return the ``_source`` of every hit in a search response."""
        return [hit["_source"] for hit in response["hits"]["hits"]]

    def create_index(self):
        """Create the index with the product field mapping if it is missing."""
        mappings = {
            "properties": {
                "name": {"type": "text", "analyzer": "ik_max_word"},
                "price": {"type": "float"},
                "category": {"type": "keyword"},
                "brand": {"type": "keyword"},
                "stock": {"type": "integer"},
                "is_available": {"type": "boolean"},
                "created_at": {"type": "date"},
            }
        }
        if not self.es.indices.exists(index=self.index):
            # Use the dedicated `mappings` keyword, matching the keyword-style
            # calls (`query=`, `aggs=`, `document=`) in the other methods,
            # instead of a raw `body`.
            self.es.indices.create(index=self.index, mappings=mappings)

    def index_product(self, product):
        """Index one product document and return the client's response."""
        return self.es.index(index=self.index, document=product)

    def bulk_index(self, products):
        """Index many products through the bulk helper.

        Args:
            products: Iterable of product documents.

        Returns:
            Whatever the ``bulk`` helper returns.
        """
        # A generator avoids materialising one action dict per product
        # before the helper starts sending them.
        actions = ({"_index": self.index, "_source": p} for p in products)
        return bulk(self.es, actions)

    def search(self, query, filters=None, size=10):
        """Search products by name.

        Args:
            query: Text matched against the ``name`` field.
            filters: Optional list of filter clauses added to the bool query.
            size: Maximum number of hits to return.

        Returns:
            List with the ``_source`` of each hit.
        """
        bool_query = {"must": [{"match": {"name": query}}]}
        if filters:
            bool_query["filter"] = filters
        response = self.es.search(
            index=self.index,
            query={"bool": bool_query},
            size=size,
        )
        return self._sources(response)

    def search_by_price_range(self, min_price, max_price, size=10):
        """Search products whose price lies in ``[min_price, max_price]``.

        Args:
            min_price: Inclusive lower price bound.
            max_price: Inclusive upper price bound.
            size: Maximum number of hits to return; the default of 10
                equals Elasticsearch's own default page size.

        Returns:
            List with the ``_source`` of each hit.
        """
        response = self.es.search(
            index=self.index,
            query={"range": {"price": {"gte": min_price, "lte": max_price}}},
            size=size,
        )
        return self._sources(response)

    def get_stats(self):
        """Return the average price and the top-10 brand buckets.

        Returns:
            The ``aggregations`` section of the search response.
        """
        response = self.es.search(
            index=self.index,
            size=0,
            aggs={
                "avg_price": {"avg": {"field": "price"}},
                "brands": {"terms": {"field": "brand", "size": 10}},
            },
        )
        return response["aggregations"]
# Usage example
if __name__ == "__main__":
    search = ProductSearch()
    search.create_index()

    # Index one product.
    search.index_product({
        "name": "iPhone 15 Pro Max",
        "price": 9999,
        "category": "手机",
        "brand": "Apple",
        "stock": 100,
        "is_available": True,
    })

    # A newly indexed document only becomes searchable after a refresh, so
    # force one here; otherwise the search below can come back empty.
    search.es.indices.refresh(index=search.index)

    # Search by name with a price filter and pretty-print the hits.
    results = search.search("iPhone", filters=[
        {"range": {"price": {"lte": 10000}}},
    ])
    print(json.dumps(results, ensure_ascii=False, indent=2))
小结¶
本章介绍了 Elasticsearch Python 客户端的使用:
- 连接配置:认证、连接池、超时设置
- 索引操作:创建、检查、删除
- 文档操作:CRUD、批量操作
- 查询操作:全文、精确、复合查询
- 聚合操作:统计、分组、范围聚合
- 异步客户端:async/await 支持
- 错误处理:异常捕获
下一章我们将学习聚合分析。