跳转至

第六章:Python 集成

PyMongo 安装

# Install PyMongo
pip install pymongo

# Install with optional extras. The extras named here are snappy, gssapi, srv
# and tls. NOTE(review): these look like compression / Kerberos / SRV-URI / TLS
# helpers rather than client-side encryption (presumably "pymongo[encryption]")
# — confirm against the PyMongo installation docs for the version in use.
pip install "pymongo[snappy,gssapi,srv,tls]"

# Async driver
pip install motor

连接 MongoDB

基本连接

from pymongo import MongoClient

# 本地连接
client = MongoClient('localhost', 27017)

# 连接字符串
client = MongoClient('mongodb://localhost:27017/')

# 认证连接
client = MongoClient(
    'mongodb://admin:password@localhost:27017/',
    authSource='admin'
)

# 副本集连接
client = MongoClient(
    'mongodb://host1:27017,host2:27017,host3:27017/',
    replicaSet='rs0'
)

# DNS Seedlist 连接
client = MongoClient('mongodb+srv://cluster.example.com/mydb')

# 获取数据库
db = client['mydb']
# 或
db = client.mydb

# 获取集合
collection = db['users']

连接选项

client = MongoClient(
    'mongodb://localhost:27017/',
    maxPoolSize=100,           # 连接池大小
    minPoolSize=10,
    connectTimeoutMS=5000,     # 连接超时
    socketTimeoutMS=30000,     # Socket 超时
    serverSelectionTimeoutMS=5000,
    retryWrites=True,          # 重试写入
    retryReads=True,
    w='majority',              # 写关注
    readPreference='secondaryPreferred'  # 读偏好
)

连接上下文管理

from pymongo import MongoClient

# 使用上下文管理器
with MongoClient('mongodb://localhost:27017/') as client:
    db = client.mydb
    # 操作数据库
    pass
# 自动关闭连接

CRUD 操作

插入文档

# 插入单个文档
result = db.users.insert_one({
    'name': '张三',
    'email': 'zhangsan@example.com',
    'age': 25
})
print(f'插入 ID: {result.inserted_id}')

# 插入多个文档
users = [
    {'name': '李四', 'email': 'lisi@example.com', 'age': 30},
    {'name': '王五', 'email': 'wangwu@example.com', 'age': 28},
    {'name': '赵六', 'email': 'zhaoliu@example.com', 'age': 35}
]
result = db.users.insert_many(users)
print(f'插入 {len(result.inserted_ids)} 个文档')

查询文档

# 查询单个文档
user = db.users.find_one({'name': '张三'})
print(user)

# 查询所有文档
for user in db.users.find():
    print(user['name'])

# 条件查询
from bson.objectid import ObjectId
from datetime import datetime

# 比较查询
users = db.users.find({'age': {'$gte': 25, '$lte': 35}})

# 逻辑查询
users = db.users.find({
    '$or': [
        {'department': '研发部'},
        {'age': {'$lt': 30}}
    ]
})

# 数组查询
users = db.users.find({'tags': 'python'})

# 正则查询
import re
users = db.users.find({'name': re.compile(r'^张')})

# 投影
users = db.users.find(
    {'age': {'$gte': 25}},
    {'name': 1, 'email': 1, '_id': 0}
)

# 排序和分页
users = db.users.find({}).sort('age', -1).skip(10).limit(10)

# 计数
count = db.users.count_documents({'department': '研发部'})

更新文档

# 更新单个文档
result = db.users.update_one(
    {'name': '张三'},
    {'$set': {'age': 26, 'department': '技术部'}}
)
print(f'匹配: {result.matched_count}, 修改: {result.modified_count}')

# 更新多个文档
result = db.users.update_many(
    {'department': '研发部'},
    {'$set': {'bonus': 1000}}
)

# 替换文档
result = db.users.replace_one(
    {'name': '张三'},
    {'name': '张三', 'email': 'zhangsan_new@example.com', 'age': 26}
)

# upsert
result = db.users.update_one(
    {'name': '新用户'},
    {'$set': {'email': 'new@example.com', 'age': 20}},
    upsert=True
)

# 数组操作
db.users.update_one(
    {'name': '张三'},
    {'$push': {'tags': 'mongodb'}}
)

db.users.update_one(
    {'name': '张三'},
    {'$addToSet': {'tags': 'python'}}  # 不重复添加
)

db.users.update_one(
    {'name': '张三'},
    {'$pull': {'tags': 'java'}}
)

删除文档

# 删除单个文档
result = db.users.delete_one({'name': '张三'})
print(f'删除: {result.deleted_count}')

# 删除多个文档
result = db.users.delete_many({'department': '研发部'})

# 删除所有文档
result = db.users.delete_many({})

聚合查询

# 基本聚合
pipeline = [
    {'$match': {'status': 'completed'}},
    {'$group': {
        '_id': '$department',
        'total': {'$sum': '$amount'},
        'count': {'$sum': 1},
        'avg': {'$avg': '$amount'}
    }},
    {'$sort': {'total': -1}},
    {'$limit': 10}
]

results = db.orders.aggregate(pipeline)
for doc in results:
    print(doc)

# 关联查询
pipeline = [
    {
        '$lookup': {
            'from': 'users',
            'localField': 'user_id',
            'foreignField': '_id',
            'as': 'user'
        }
    },
    {'$unwind': '$user'},
    {
        '$project': {
            'order_id': 1,
            'amount': 1,
            'user_name': '$user.name'
        }
    }
]

批量操作

from pymongo import UpdateOne, InsertOne, DeleteOne

# Bulk write: send several different write operations in a single request.
def build_operations():
    """Return a fresh list of write requests for ``bulk_write``.

    A new list is built for every call because PyMongo adds a generated
    ``_id`` to the document held by each ``InsertOne`` when it is executed.
    Re-submitting the same ``InsertOne`` objects would therefore try to insert
    the same ``_id`` values again and fail with duplicate-key errors.
    """
    return [
        InsertOne({'name': '用户1', 'age': 20}),
        InsertOne({'name': '用户2', 'age': 22}),
        UpdateOne({'name': '张三'}, {'$set': {'age': 26}}),
        DeleteOne({'name': '已删除用户'})
    ]


operations = build_operations()

# Ordered execution (the default): operations run in sequence and stop at the
# first failure.
result = db.users.bulk_write(operations)
print(f'插入: {result.inserted_count}')
print(f'更新: {result.modified_count}')
print(f'删除: {result.deleted_count}')

# Unordered execution: the server may run operations in any order and keeps
# going after individual failures. Uses a fresh request list (see above).
result = db.users.bulk_write(build_operations(), ordered=False)

索引操作

# 创建索引
from pymongo import ASCENDING, DESCENDING, TEXT, GEO2D, GEO2DSPHERE

# 单字段索引
db.users.create_index([('name', ASCENDING)])

# 复合索引
db.users.create_index([
    ('department', ASCENDING),
    ('age', DESCENDING)
])

# 唯一索引
db.users.create_index([('email', ASCENDING)], unique=True)

# 文本索引
db.articles.create_index([('title', TEXT), ('content', TEXT)])

# 地理空间索引
db.places.create_index([('location', GEO2DSPHERE)])

# TTL 索引
db.sessions.create_index(
    [('created_at', ASCENDING)],
    expireAfterSeconds=3600
)

# 部分索引
db.orders.create_index(
    [('status', ASCENDING)],
    partialFilterExpression={'status': 'completed'}
)

# 查看索引
for index in db.users.list_indexes():
    print(index)

# 删除索引
db.users.drop_index('name_1')

事务支持

from pymongo import MongoClient

# Transactions require a replica set (or sharded cluster) deployment.
client = MongoClient('mongodb://localhost:27017/?replicaSet=rs0')
# A session may only be used with the MongoClient that started it, so the
# database handle must come from this client rather than from an earlier one.
db = client.mydb

# Using `with` statements: the transaction is committed when the inner block
# exits normally and aborted if it raises; the session is ended on exit of the
# outer block.
with client.start_session() as session:
    with session.start_transaction():
        db.accounts.update_one(
            {'name': '账户A'},
            {'$inc': {'balance': -100}},
            session=session
        )
        db.accounts.update_one(
            {'name': '账户B'},
            {'$inc': {'balance': 100}},
            session=session
        )

# Manual control: commit/abort and session cleanup are handled explicitly.
session = client.start_session()
session.start_transaction()
try:
    db.orders.insert_one({'order_id': '001'}, session=session)
    db.inventory.update_one(
        {'product_id': 'P001'},
        {'$inc': {'stock': -1}},
        session=session
    )
    session.commit_transaction()
except Exception as e:
    # Only abort while the transaction is still active: calling
    # abort_transaction() after commit_transaction() has been attempted raises
    # InvalidOperation and would hide the original error.
    if session.in_transaction:
        session.abort_transaction()
    print(f'事务回滚: {e}')
finally:
    session.end_session()

变更流

# Watch a collection for changes (iteration blocks until the stream is closed).
# full_document='updateLookup' asks the server to attach the current version of
# the document to update events. Delete events (among others) never carry a
# fullDocument field, so it is read with .get() instead of indexing to avoid a
# KeyError.
with db.users.watch(full_document='updateLookup') as stream:
    for change in stream:
        print(f'操作类型: {change["operationType"]}')
        print(f'文档: {change.get("fullDocument")}')

# With a filter: only insert and update events are delivered.
pipeline = [
    {'$match': {'operationType': {'$in': ['insert', 'update']}}}
]
with db.users.watch(pipeline) as stream:
    for change in stream:
        print(change)

# Starting from a specific point:
#   - to resume after an interruption, persist `stream.resume_token` while
#     iterating and pass it back as `db.users.watch(resume_after=resume_token)`;
#   - to start from a cluster time, pass
#     `start_at_operation_time=Timestamp(seconds, increment)`.
from bson.timestamp import Timestamp
resume_token = None
# ... save resume_token so the stream can be resumed later

异步操作 (Motor)

# 安装: pip install motor
from motor.motor_asyncio import AsyncIOMotorClient
import asyncio

async def main():
    """Run a short Motor demo: insert, find one, iterate a query, aggregate.

    The client is closed in a ``finally`` clause so its connections are
    released even if one of the operations raises.
    """
    # Connect
    client = AsyncIOMotorClient('mongodb://localhost:27017/')
    try:
        db = client.mydb

        # Insert
        result = await db.users.insert_one({
            'name': '张三',
            'email': 'zhangsan@example.com'
        })
        print(f'插入 ID: {result.inserted_id}')

        # Find a single document
        user = await db.users.find_one({'name': '张三'})
        print(user)

        # Iterate over a query; the cursor is consumed with `async for`
        async for user in db.users.find({'age': {'$gte': 25}}):
            print(user['name'])

        # Aggregate: count orders per status
        async for doc in db.orders.aggregate([
            {'$group': {'_id': '$status', 'count': {'$sum': 1}}}
        ]):
            print(doc)
    finally:
        client.close()

asyncio.run(main())

ODM: MongoEngine

# 安装: pip install mongoengine
from mongoengine import connect, Document, StringField, IntField, DateTimeField
from datetime import datetime

# 连接
connect('mydb', host='mongodb://localhost:27017')

# 定义模型
class User(Document):
    """MongoEngine document describing a user; mapped to the ``users`` collection."""

    name = StringField(required=True, max_length=100)
    email = StringField(required=True, unique=True)
    age = IntField(min_value=0)
    # The function object itself is passed (not its result), presumably so the
    # default is computed when a document is created rather than at import.
    # NOTE(review): datetime.now returns a naive local-time value — confirm
    # that local time, not UTC, is what should be stored.
    created_at = DateTimeField(default=datetime.now)

    # Collection name and the fields to index.
    meta = {
        'collection': 'users',
        'indexes': ['name', 'email']
    }

# 创建文档
user = User(
    name='张三',
    email='zhangsan@example.com',
    age=25
)
user.save()

# 查询
users = User.objects(age__gte=25)
for user in users:
    print(user.name, user.email)

# 更新
User.objects(name='张三').update(age=26)

# 删除
User.objects(name='张三').delete()

最佳实践

连接管理

# 单例模式
class MongoDB:
    """Holder for a single, lazily created MongoClient shared via the class."""

    _client = None  # stays None until get_client() is first called

    @classmethod
    def get_client(cls):
        """Return the shared client, creating it on first use."""
        existing = cls._client
        if existing is not None:
            return existing
        cls._client = MongoClient(
            'mongodb://localhost:27017/',
            maxPoolSize=100
        )
        return cls._client

    @classmethod
    def get_db(cls):
        """Return the ``mydb`` database from the shared client."""
        shared_client = cls.get_client()
        return shared_client['mydb']

# 使用
db = MongoDB.get_db()

错误处理

from pymongo.errors import (
    ConnectionFailure,
    OperationFailure,
    DuplicateKeyError,
    BulkWriteError
)

# Handle the most specific failure first. NOTE(review): in PyMongo's exception
# hierarchy DuplicateKeyError appears to derive from OperationFailure, so the
# DuplicateKeyError clause has to stay above the OperationFailure clause —
# confirm against pymongo.errors.
try:
    db.users.insert_one({'email': 'existing@example.com'})
except DuplicateKeyError:
    print('邮箱已存在')
except ConnectionFailure:
    print('连接失败')
except OperationFailure as e:
    # e.details is printed as the server-side error information.
    print(f'操作失败: {e.details}')

性能优化

# Prefer batch operations over loops. The "bad" and "good" forms below are
# alternatives shown side by side — run only one of them on a given list.
# Bad: one round trip to the server per document.
for user in users:
    db.users.insert_one(user)

# Good: a single request for the whole list.
db.users.insert_many(users)

# Use a projection to reduce the amount of data transferred.
db.users.find({}, {'name': 1, 'email': 1, '_id': 0})

# Use bulk writes for many updates. Assumes each item in `users` is a dict
# with 'email' and 'age' keys, as in the insert examples — adapt the filter
# and update document to the real schema.
operations = [
    UpdateOne({'email': user['email']}, {'$set': {'age': user['age']}})
    for user in users
]
# bulk_write() raises InvalidOperation when given an empty request list.
if operations:
    db.users.bulk_write(operations)

小结

本章学习了:

  • ✅ PyMongo 连接和配置
  • ✅ CRUD 操作
  • ✅ 聚合查询
  • ✅ 批量操作
  • ✅ 索引操作
  • ✅ 事务支持
  • ✅ 变更流
  • ✅ 异步操作 (Motor)
  • ✅ ODM (MongoEngine)

下一章

第七章:复制集 - 学习 MongoDB 高可用复制集部署。