跳转至

第六章:Python 集成

安装客户端

pip install elasticsearch

连接 Elasticsearch

基本连接

from elasticsearch import Elasticsearch

# Connect to an Elasticsearch node running on this machine
es = Elasticsearch("http://localhost:9200")

# ping() returns a boolean, so report the outcome in a single expression
print("连接成功" if es.ping() else "连接失败")

认证连接

from elasticsearch import Elasticsearch

# Username/password pair shared by the basic-auth examples below
credentials = ("elastic", "your_password")

# Basic authentication
es = Elasticsearch(["http://localhost:9200"], basic_auth=credentials)

# API key authentication
es = Elasticsearch(["http://localhost:9200"], api_key="your_api_key")

# HTTPS connection: verify the server certificate against a custom CA bundle
tls_options = {"verify_certs": True, "ca_certs": "/path/to/ca.crt"}
es = Elasticsearch(
    ["https://localhost:9200"],
    basic_auth=credentials,
    **tls_options,
)

连接池配置

from elasticsearch import Elasticsearch

es = Elasticsearch(
    ["http://localhost:9200", "http://localhost:9201"],
    # Size of the HTTP connection pool kept per node.
    # (`maxsize` is the 7.x name and is deprecated in the 8.x client.)
    connections_per_node=25,
    # Per-request timeout, in seconds
    request_timeout=30,
    # Maximum number of retries for a failed request
    max_retries=3,
    # HTTP status codes that trigger a retry
    retry_on_status=(502, 503, 504)
)

索引操作

创建索引

# Create an index with default settings and dynamic mapping
es.indices.create(index="products")

# An index name can only be created once, so drop the default-settings index
# before recreating it with an explicit mapping.
es.indices.delete(index="products")

# Create the index with an explicit mapping and settings.
# NOTE: the "ik_max_word" analyzer is not built in; it is provided by the
# IK analysis plugin, which must be installed on every node.
mapping = {
    "mappings": {
        "properties": {
            "name": {"type": "text", "analyzer": "ik_max_word"},
            "price": {"type": "float"},
            "category": {"type": "keyword"},
            "brand": {"type": "keyword"},
            "stock": {"type": "integer"},
            "is_available": {"type": "boolean"},
            "created_at": {"type": "date"}
        }
    },
    "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1
    }
}
# Pass mappings/settings as keyword arguments, matching the keyword style
# (query=, document=, aggs=) used by the other calls in this chapter.
es.indices.create(
    index="products",
    mappings=mapping["mappings"],
    settings=mapping["settings"],
)

检查索引

index_name = "products"

# Report whether the index exists
index_present = es.indices.exists(index=index_name)
if index_present:
    print("索引存在")

# Fetch and show the index settings
settings = es.indices.get_settings(index=index_name)
print(settings)

# Fetch and show the index mapping
mapping = es.indices.get_mapping(index=index_name)
print(mapping)

删除索引

# Delete a single index
es.indices.delete(index="products")

# Delete several indices at once. "products" was already removed above, so
# ignore_unavailable=True keeps the call from failing on the missing index.
es.indices.delete(index=["products", "orders"], ignore_unavailable=True)

文档操作

创建文档

# Index a document under an explicit ID
doc = dict(
    name="iPhone 15",
    price=7999,
    category="手机",
    brand="Apple",
    stock=100,
    is_available=True,
    created_at="2024-01-15",
)
es.index(index="products", id=1, document=doc)

# Index the same document again, letting Elasticsearch generate the ID
es.index(index="products", document=doc)

# Bulk creation through the helpers module
from elasticsearch.helpers import bulk

# One bulk action per product, with IDs 1..100
actions = [
    dict(
        _index="products",
        _id=n,
        _source={
            "name": f"Product {n}",
            "price": 100 * n,
            "category": "电子产品",
        },
    )
    for n in range(1, 101)
]
bulk(es, actions)

获取文档

# Fetch a single document (raises NotFoundError if the ID does not exist)
response = es.get(index="products", id=1)
print(response["_source"])

# Check whether a document exists without fetching it
if es.exists(index="products", id=1):
    print("文档存在")

# Fetch several documents in one request
response = es.mget(
    index="products",
    ids=[1, 2, 3]
)
for doc in response["docs"]:
    # mget returns an entry for every requested ID; entries for missing
    # documents have "found": False and carry no "_source" key.
    if doc.get("found"):
        print(doc["_source"])

更新文档

# Partial update: only the listed fields are sent
changed_fields = {"price": 7499, "stock": 80}
es.update(index="products", id=1, doc=changed_fields)

# Scripted update: decrement the stock by a parameterised amount
decrement_stock = {
    "source": "ctx._source.stock -= params.count",
    "params": {"count": 5},
}
es.update(index="products", id=1, script=decrement_stock)

# Upsert: `doc` is applied when the document exists, otherwise the
# `upsert` body is inserted as a new document
initial_document = {
    "name": "iPhone 15",
    "price": 7999,
    "category": "手机",
}
es.update(
    index="products",
    id=1,
    doc={"price": 7999},
    upsert=initial_document,
)

删除文档

# Delete one document by ID
es.delete(index="products", id=1)

# Delete every document matched by a query
apple_products = {"match": {"brand": "Apple"}}
es.delete_by_query(index="products", query=apple_products)

查询操作

基本查询

# Match every document. By default hits.total.value stops counting at
# 10,000; track_total_hits=True makes the reported total exact.
response = es.search(
    index="products",
    query={"match_all": {}},
    track_total_hits=True,
)

# Iterate over the returned hits
for hit in response["hits"]["hits"]:
    print(hit["_source"])

# Total number of matching documents
total = response["hits"]["total"]["value"]
print(f"共 {total} 条记录")

全文查询

# match query: analysed full-text match on a single field
match_query = {"match": {"name": "iPhone"}}
response = es.search(index="products", query=match_query)

# match_phrase query: the terms must appear as a phrase
phrase_query = {"match_phrase": {"name": "iPhone 15"}}
response = es.search(index="products", query=phrase_query)

# multi_match query: the same text searched across several fields
multi_field_query = {
    "multi_match": {
        "query": "iPhone",
        "fields": ["name", "description", "brand"],
    }
}
response = es.search(index="products", query=multi_field_query)

精确查询

# term query: exact match on a single value
term_query = {"term": {"brand": "Apple"}}
response = es.search(index="products", query=term_query)

# terms query: exact match on any of several values
terms_query = {"terms": {"brand": ["Apple", "Samsung"]}}
response = es.search(index="products", query=terms_query)

# range query: price between 1000 and 5000, both bounds inclusive
price_bounds = {"gte": 1000, "lte": 5000}
response = es.search(index="products", query={"range": {"price": price_bounds}})

复合查询

# bool query assembled clause by clause
must_clauses = [{"match": {"name": "iPhone"}}]
filter_clauses = [
    {"range": {"price": {"lte": 10000}}},
    {"term": {"is_available": True}},
]
should_clauses = [{"term": {"brand": "Apple"}}]
must_not_clauses = [{"term": {"status": "discontinued"}}]

response = es.search(
    index="products",
    query={
        "bool": {
            "must": must_clauses,
            "filter": filter_clauses,
            "should": should_clauses,
            "must_not": must_not_clauses,
        }
    },
)

分页和排序

everything = {"match_all": {}}

# Pagination: first page of ten hits
response = es.search(index="products", query=everything, from_=0, size=10)

# Sorting: price descending, then creation date descending
sort_order = [
    {"price": {"order": "desc"}},
    {"created_at": {"order": "desc"}},
]
response = es.search(index="products", query=everything, sort=sort_order)

# Source filtering: return only the listed fields
wanted_fields = ["name", "price"]
response = es.search(index="products", query=everything, source=wanted_fields)

高亮显示

# Highlight matches in the name field, wrapping each one in <em> tags
highlight_spec = {
    "fields": {"name": {}},
    "pre_tags": ["<em>"],
    "post_tags": ["</em>"],
}
response = es.search(
    index="products",
    query={"match": {"name": "iPhone"}},
    highlight=highlight_spec,
)

# Print the stored name next to its first highlighted fragment
matched_hits = response["hits"]["hits"]
for hit in matched_hits:
    print(f"原文: {hit['_source']['name']}")
    print(f"高亮: {hit['highlight']['name'][0]}")

滚动查询

# Scroll through a large result set, 1000 hits per batch; each request
# keeps the scroll context alive for another two minutes.
response = es.search(
    index="products",
    query={"match_all": {}},
    size=1000,
    scroll="2m"
)

scroll_id = response["_scroll_id"]
try:
    hits = response["hits"]["hits"]

    # An empty batch means the scroll is exhausted
    while hits:
        for hit in hits:
            print(hit["_source"])

        # Fetch the next batch; the scroll id may change between requests
        response = es.scroll(scroll_id=scroll_id, scroll="2m")
        scroll_id = response["_scroll_id"]
        hits = response["hits"]["hits"]
finally:
    # Release the server-side scroll context even if processing failed
    es.clear_scroll(scroll_id=scroll_id)

聚合操作

基本聚合

# Metric aggregations over the price field: avg_price, max_price,
# min_price and sum_price. size=0 suppresses the hits themselves.
price_metrics = {
    f"{metric}_price": {metric: {"field": "price"}}
    for metric in ("avg", "max", "min", "sum")
}
response = es.search(index="products", size=0, aggs=price_metrics)

print(f"平均价格: {response['aggregations']['avg_price']['value']}")
print(f"最高价格: {response['aggregations']['max_price']['value']}")

分组聚合

# terms aggregation: document count per brand (top 10 buckets)
brand_counts = {"brands": {"terms": {"field": "brand", "size": 10}}}
response = es.search(index="products", size=0, aggs=brand_counts)

brand_buckets = response["aggregations"]["brands"]["buckets"]
for bucket in brand_buckets:
    print(f"{bucket['key']}: {bucket['doc_count']}")

# Nested aggregation: average price inside each brand bucket
brand_avg_price = {
    "brands": {
        "terms": {"field": "brand"},
        "aggs": {"avg_price": {"avg": {"field": "price"}}},
    }
}
response = es.search(index="products", size=0, aggs=brand_avg_price)

brand_buckets = response["aggregations"]["brands"]["buckets"]
for bucket in brand_buckets:
    print(f"{bucket['key']}: 平均价格 {bucket['avg_price']['value']}")

范围聚合

# Price bands for the range aggregation
price_bands = [
    {"to": 1000},
    {"from": 1000, "to": 5000},
    {"from": 5000, "to": 10000},
    {"from": 10000},
]
response = es.search(
    index="products",
    size=0,
    aggs={
        "price_ranges": {
            "range": {"field": "price", "ranges": price_bands}
        }
    },
)

# One bucket is returned per configured band
range_buckets = response["aggregations"]["price_ranges"]["buckets"]
for bucket in range_buckets:
    print(f"{bucket['key']}: {bucket['doc_count']}")

日期聚合

# Date histogram: document count per calendar month of created_at
monthly_histogram = {
    "date_histogram": {
        "field": "created_at",
        "calendar_interval": "month",
    }
}
response = es.search(
    index="products",
    size=0,
    aggs={"sales_over_time": monthly_histogram},
)

monthly_buckets = response["aggregations"]["sales_over_time"]["buckets"]
for bucket in monthly_buckets:
    print(f"{bucket['key_as_string']}: {bucket['doc_count']}")

异步客户端

from elasticsearch import AsyncElasticsearch
import asyncio

# NOTE: the async client needs the optional async dependencies:
#   pip install "elasticsearch[async]"
es = AsyncElasticsearch("http://localhost:9200")


async def search_products():
    """Run a match query for "iPhone" on the products index and return the raw response."""
    return await es.search(
        index="products",
        query={"match": {"name": "iPhone"}}
    )


async def main():
    """Print the source of every hit, then close the client."""
    try:
        response = await search_products()
        for hit in response["hits"]["hits"]:
            print(hit["_source"])
    finally:
        # Close the client even when the search fails, so the underlying
        # HTTP session is not left open.
        await es.close()


asyncio.run(main())

批量操作助手

from elasticsearch.helpers import bulk, streaming_bulk

# Generator-based source of bulk actions
def generate_docs(count=10000, index="products"):
    """Lazily yield bulk index actions for synthetic products.

    Args:
        count: Number of actions to yield; IDs run from 0 to count - 1.
        index: Name of the target index written into each action.

    Yields:
        One action dict per product, with "_index", "_id" and "_source" keys.
    """
    for i in range(count):
        yield {
            "_index": index,
            "_id": i,
            "_source": {
                "name": f"Product {i}",
                # Prices cycle through 0, 100, ..., 9900
                "price": 100 * (i % 100),
                "category": "电子产品"
            }
        }

# Bulk import. By default the helper raises BulkIndexError on the first
# failing chunk, so the returned error list would always be empty;
# raise_on_error=False collects the failures instead.
success, failed = bulk(es, generate_docs(), raise_on_error=False)
print(f"成功: {success}, 失败: {len(failed)}")

# Streaming bulk import (memory friendly): results are yielded per action.
# raise_on_error=False is required for failed items to reach the loop body.
for ok, response in streaming_bulk(
    es, generate_docs(), chunk_size=500, raise_on_error=False
):
    if not ok:
        print(f"导入失败: {response}")

错误处理

from elasticsearch import Elasticsearch, NotFoundError, RequestError

es = Elasticsearch("http://localhost:9200")

# Look up an ID and report each kind of failure separately
try:
    doc = es.get(index="products", id=999)
except NotFoundError:
    print("文档不存在")
except RequestError as exc:
    print(f"请求错误: {exc}")
except Exception as exc:
    print(f"其他错误: {exc}")

完整示例

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import json

class ProductSearch:
    """Small convenience wrapper around an Elasticsearch client for the "products" index.

    The client is stored in ``self.es`` and the index name in ``self.index``.
    """

    def __init__(self, hosts="http://localhost:9200"):
        """Create the client.

        Args:
            hosts: Value passed straight to the ``Elasticsearch`` constructor.
        """
        self.es = Elasticsearch(hosts)
        self.index = "products"

    def create_index(self):
        """Create the index with an explicit mapping unless it already exists."""
        # NOTE: "ik_max_word" is provided by the IK analysis plugin and is
        # not available on a stock Elasticsearch installation.
        properties = {
            "name": {"type": "text", "analyzer": "ik_max_word"},
            "price": {"type": "float"},
            "category": {"type": "keyword"},
            "brand": {"type": "keyword"},
            "stock": {"type": "integer"},
            "is_available": {"type": "boolean"},
            "created_at": {"type": "date"}
        }
        if not self.es.indices.exists(index=self.index):
            # Keyword argument instead of body=, matching the keyword style
            # (query=, document=, aggs=) of the other calls in this class.
            self.es.indices.create(
                index=self.index,
                mappings={"properties": properties},
            )

    def index_product(self, product):
        """Index one product document and return the client's response.

        Args:
            product: Dict sent as the document body; the ID is auto-generated.
        """
        return self.es.index(index=self.index, document=product)

    def bulk_index(self, products):
        """Index many products through the bulk helper.

        Args:
            products: Iterable of product dicts.

        Returns:
            Whatever ``elasticsearch.helpers.bulk`` returns.
        """
        actions = [
            {"_index": self.index, "_source": p}
            for p in products
        ]
        return bulk(self.es, actions)

    def search(self, query, filters=None, size=10):
        """Search products by name, optionally narrowed by filter clauses.

        Args:
            query: Text matched against the ``name`` field.
            filters: Optional list of query clauses placed in the bool
                query's ``filter`` section.
            size: Maximum number of hits to return.

        Returns:
            List of the ``_source`` dicts of the hits.
        """
        bool_query = {"must": [{"match": {"name": query}}]}

        if filters:
            bool_query["filter"] = filters

        response = self.es.search(
            index=self.index,
            query={"bool": bool_query},
            size=size
        )
        return [hit["_source"] for hit in response["hits"]["hits"]]

    def search_by_price_range(self, min_price, max_price, size=10):
        """Search products whose price lies in ``[min_price, max_price]``.

        Args:
            min_price: Inclusive lower bound (``gte``).
            max_price: Inclusive upper bound (``lte``).
            size: Maximum number of hits to return. The default of 10 equals
                the server default, so existing callers see no change.

        Returns:
            List of the ``_source`` dicts of the hits.
        """
        response = self.es.search(
            index=self.index,
            query={
                "range": {
                    "price": {
                        "gte": min_price,
                        "lte": max_price
                    }
                }
            },
            size=size
        )
        return [hit["_source"] for hit in response["hits"]["hits"]]

    def get_stats(self):
        """Return the average price and the top 10 brand buckets.

        Returns:
            The ``aggregations`` section of the search response, holding the
            ``avg_price`` and ``brands`` aggregations.
        """
        response = self.es.search(
            index=self.index,
            size=0,  # aggregations only, no hits
            aggs={
                "avg_price": {"avg": {"field": "price"}},
                "brands": {"terms": {"field": "brand", "size": 10}}
            }
        )
        return response["aggregations"]

# Usage example
if __name__ == "__main__":
    search = ProductSearch()
    search.create_index()

    # Index one product
    search.index_product({
        "name": "iPhone 15 Pro Max",
        "price": 9999,
        "category": "手机",
        "brand": "Apple",
        "stock": 100,
        "is_available": True
    })

    # Newly indexed documents only become searchable after a refresh.
    # Force one so the search below can see the product just indexed.
    search.es.indices.refresh(index=search.index)

    # Search by name, keeping only products priced at 10000 or less
    results = search.search("iPhone", filters=[
        {"range": {"price": {"lte": 10000}}}
    ])
    print(json.dumps(results, ensure_ascii=False, indent=2))

小结

本章介绍了 Elasticsearch Python 客户端的使用:

  • 连接配置:认证、连接池、超时设置
  • 索引操作:创建、检查、删除
  • 文档操作:CRUD、批量操作
  • 查询操作:全文、精确、复合查询,以及分页排序、高亮显示和滚动查询
  • 聚合操作:统计、分组、范围、日期聚合
  • 异步客户端:async/await 支持
  • 错误处理:异常捕获

下一章我们将学习聚合分析。