第六章:Python 集成¶
PyMongo 安装¶
# 安装 PyMongo
pip install pymongo
# 安装带可选依赖的版本(snappy 压缩、GSSAPI/Kerberos 认证、SRV 连接串、TLS)
pip install "pymongo[snappy,gssapi,srv,tls]"
# 异步驱动
pip install motor
连接 MongoDB¶
基本连接¶
from pymongo import MongoClient
# 本地连接
client = MongoClient('localhost', 27017)
# 连接字符串
client = MongoClient('mongodb://localhost:27017/')
# 认证连接
client = MongoClient(
'mongodb://admin:password@localhost:27017/',
authSource='admin'
)
# 副本集连接
client = MongoClient(
'mongodb://host1:27017,host2:27017,host3:27017/',
replicaSet='rs0'
)
# DNS Seedlist 连接
client = MongoClient('mongodb+srv://cluster.example.com/mydb')
# 获取数据库
db = client['mydb']
# 或
db = client.mydb
# 获取集合
collection = db['users']
连接选项¶
client = MongoClient(
'mongodb://localhost:27017/',
maxPoolSize=100, # 连接池大小
minPoolSize=10,
connectTimeoutMS=5000, # 连接超时
socketTimeoutMS=30000, # Socket 超时
serverSelectionTimeoutMS=5000,
retryWrites=True, # 重试写入
retryReads=True,
w='majority', # 写关注
readPreference='secondaryPreferred' # 读偏好
)
连接上下文管理¶
from pymongo import MongoClient
# 使用上下文管理器
with MongoClient('mongodb://localhost:27017/') as client:
db = client.mydb
# 操作数据库
pass
# 自动关闭连接
CRUD 操作¶
插入文档¶
# 插入单个文档
result = db.users.insert_one({
'name': '张三',
'email': 'zhangsan@example.com',
'age': 25
})
print(f'插入 ID: {result.inserted_id}')
# 插入多个文档
users = [
{'name': '李四', 'email': 'lisi@example.com', 'age': 30},
{'name': '王五', 'email': 'wangwu@example.com', 'age': 28},
{'name': '赵六', 'email': 'zhaoliu@example.com', 'age': 35}
]
result = db.users.insert_many(users)
print(f'插入 {len(result.inserted_ids)} 个文档')
查询文档¶
# 查询单个文档
user = db.users.find_one({'name': '张三'})
print(user)
# 查询所有文档
for user in db.users.find():
print(user['name'])
# 条件查询
from bson.objectid import ObjectId
from datetime import datetime
# 比较查询
users = db.users.find({'age': {'$gte': 25, '$lte': 35}})
# 逻辑查询
users = db.users.find({
'$or': [
{'department': '研发部'},
{'age': {'$lt': 30}}
]
})
# 数组查询
users = db.users.find({'tags': 'python'})
# 正则查询
import re
users = db.users.find({'name': re.compile(r'^张')})
# 投影
users = db.users.find(
{'age': {'$gte': 25}},
{'name': 1, 'email': 1, '_id': 0}
)
# 排序和分页
users = db.users.find({}).sort('age', -1).skip(10).limit(10)
# 计数
count = db.users.count_documents({'department': '研发部'})
更新文档¶
# 更新单个文档
result = db.users.update_one(
{'name': '张三'},
{'$set': {'age': 26, 'department': '技术部'}}
)
print(f'匹配: {result.matched_count}, 修改: {result.modified_count}')
# 更新多个文档
result = db.users.update_many(
{'department': '研发部'},
{'$set': {'bonus': 1000}}
)
# 替换文档
result = db.users.replace_one(
{'name': '张三'},
{'name': '张三', 'email': 'zhangsan_new@example.com', 'age': 26}
)
# upsert
result = db.users.update_one(
{'name': '新用户'},
{'$set': {'email': 'new@example.com', 'age': 20}},
upsert=True
)
# 数组操作
db.users.update_one(
{'name': '张三'},
{'$push': {'tags': 'mongodb'}}
)
db.users.update_one(
{'name': '张三'},
{'$addToSet': {'tags': 'python'}} # 不重复添加
)
db.users.update_one(
{'name': '张三'},
{'$pull': {'tags': 'java'}}
)
删除文档¶
# 删除单个文档
result = db.users.delete_one({'name': '张三'})
print(f'删除: {result.deleted_count}')
# 删除多个文档
result = db.users.delete_many({'department': '研发部'})
# 删除所有文档
result = db.users.delete_many({})
聚合查询¶
# 基本聚合
pipeline = [
{'$match': {'status': 'completed'}},
{'$group': {
'_id': '$department',
'total': {'$sum': '$amount'},
'count': {'$sum': 1},
'avg': {'$avg': '$amount'}
}},
{'$sort': {'total': -1}},
{'$limit': 10}
]
results = db.orders.aggregate(pipeline)
for doc in results:
print(doc)
# 关联查询
pipeline = [
{
'$lookup': {
'from': 'users',
'localField': 'user_id',
'foreignField': '_id',
'as': 'user'
}
},
{'$unwind': '$user'},
{
'$project': {
'order_id': 1,
'amount': 1,
'user_name': '$user.name'
}
}
]
批量操作¶
from pymongo import UpdateOne, InsertOne, DeleteOne
# Bulk write: send a mix of inserts, updates and deletes in a single call.
operations = [
    InsertOne({'name': '用户1', 'age': 20}),
    InsertOne({'name': '用户2', 'age': 22}),
    UpdateOne({'name': '张三'}, {'$set': {'age': 26}}),
    DeleteOne({'name': '已删除用户'})
]
result = db.users.bulk_write(operations)
print(f'插入: {result.inserted_count}')
print(f'更新: {result.modified_count}')
print(f'删除: {result.deleted_count}')

# Unordered execution: with ordered=False the server keeps processing the
# remaining operations after one of them fails.
# Use a fresh operation list here: bulk_write() adds an `_id` to every
# inserted document in place, so re-submitting the InsertOne objects above
# would fail with duplicate-key errors.
unordered_operations = [
    InsertOne({'name': '用户3', 'age': 24}),
    InsertOne({'name': '用户4', 'age': 26}),
    UpdateOne({'name': '李四'}, {'$set': {'age': 31}}),
    DeleteOne({'name': '已删除用户'})
]
result = db.users.bulk_write(unordered_operations, ordered=False)
索引操作¶
# 创建索引
from pymongo import ASCENDING, DESCENDING, TEXT, GEO2D, GEO2DSPHERE
# 单字段索引
db.users.create_index([('name', ASCENDING)])
# 复合索引
db.users.create_index([
('department', ASCENDING),
('age', DESCENDING)
])
# 唯一索引
db.users.create_index([('email', ASCENDING)], unique=True)
# 文本索引
db.articles.create_index([('title', TEXT), ('content', TEXT)])
# 地理空间索引
db.places.create_index([('location', GEO2DSPHERE)])
# TTL 索引
db.sessions.create_index(
[('created_at', ASCENDING)],
expireAfterSeconds=3600
)
# 部分索引
db.orders.create_index(
[('status', ASCENDING)],
partialFilterExpression={'status': 'completed'}
)
# 查看索引
for index in db.users.list_indexes():
print(index)
# 删除索引
db.users.drop_index('name_1')
事务支持¶
from pymongo import MongoClient
# Transactions need a replica set (or sharded cluster) deployment.
client = MongoClient('mongodb://localhost:27017/?replicaSet=rs0')
# A session may only be used with the client that started it, so take the
# database handle from this client instead of reusing an earlier `db`.
db = client.mydb

# Using `with` statements: the transaction is committed when the inner block
# exits normally and aborted if it raises; the session is ended automatically.
with client.start_session() as session:
    with session.start_transaction():
        db.accounts.update_one(
            {'name': '账户A'},
            {'$inc': {'balance': -100}},
            session=session
        )
        db.accounts.update_one(
            {'name': '账户B'},
            {'$inc': {'balance': 100}},
            session=session
        )

# Manual control
session = client.start_session()
session.start_transaction()
try:
    db.orders.insert_one({'order_id': '001'}, session=session)
    db.inventory.update_one(
        {'product_id': 'P001'},
        {'$inc': {'stock': -1}},
        session=session
    )
    session.commit_transaction()
except Exception as e:
    # Only abort while the transaction is still open: if the failure came
    # from commit_transaction() itself, calling abort would raise again and
    # hide the original error.
    if session.in_transaction:
        session.abort_transaction()
    print(f'事务回滚: {e}')
finally:
    # Always release the server session, whether or not the commit succeeded.
    session.end_session()
变更流¶
# Watch a collection for changes.
# Update events carry no fullDocument by default; full_document='updateLookup'
# asks the server to attach the current version of the updated document.
with db.users.watch(full_document='updateLookup') as stream:
    for change in stream:
        print(f'操作类型: {change["operationType"]}')
        # Delete events (and other non-document events) have no
        # "fullDocument" key, so use .get() to avoid a KeyError.
        print(f'文档: {change.get("fullDocument")}')

# With a filter: only insert and update events are delivered.
pipeline = [
    {'$match': {'operationType': {'$in': ['insert', 'update']}}}
]
with db.users.watch(pipeline) as stream:
    for change in stream:
        print(change)
# 从特定时间点开始
from bson.timestamp import Timestamp
resume_token = None
# ... 保存 resume_token 用于恢复
异步操作 (Motor)¶
# 安装: pip install motor
from motor.motor_asyncio import AsyncIOMotorClient
import asyncio
async def main():
    """Run a short Motor demo: insert, find one, iterate a query, aggregate.

    The client is closed in a ``finally`` block so its connections are
    released even when one of the operations raises.
    """
    # Connect
    client = AsyncIOMotorClient('mongodb://localhost:27017/')
    try:
        db = client.mydb

        # Insert
        result = await db.users.insert_one({
            'name': '张三',
            'email': 'zhangsan@example.com'
        })
        print(f'插入 ID: {result.inserted_id}')

        # Find a single document
        user = await db.users.find_one({'name': '张三'})
        print(user)

        # Iterate over a query; the cursor is consumed with `async for`
        async for user in db.users.find({'age': {'$gte': 25}}):
            print(user['name'])

        # Aggregation: count orders per status
        async for doc in db.orders.aggregate([
            {'$group': {'_id': '$status', 'count': {'$sum': 1}}}
        ]):
            print(doc)
    finally:
        # Release the connection pool instead of leaving it to interpreter exit.
        client.close()
asyncio.run(main())
ODM: MongoEngine¶
# 安装: pip install mongoengine
from datetime import datetime, timezone

from mongoengine import connect, Document, StringField, IntField, DateTimeField
# 连接
connect('mydb', host='mongodb://localhost:27017')
# 定义模型
class User(Document):
    """User document stored in the ``users`` collection."""

    name = StringField(required=True, max_length=100)
    email = StringField(required=True, unique=True)
    age = IntField(min_value=0)
    # MongoDB stores datetimes as UTC and the driver treats naive values as
    # UTC, so a naive local datetime.now() would be saved with the wrong
    # instant. Default to a timezone-aware UTC timestamp instead.
    created_at = DateTimeField(default=lambda: datetime.now(timezone.utc))

    meta = {
        'collection': 'users',
        'indexes': ['name', 'email']
    }
# 创建文档
user = User(
name='张三',
email='zhangsan@example.com',
age=25
)
user.save()
# 查询
users = User.objects(age__gte=25)
for user in users:
print(user.name, user.email)
# 更新
User.objects(name='张三').update(age=26)
# 删除
User.objects(name='张三').delete()
最佳实践¶
连接管理¶
# 单例模式
class MongoDB:
    """Holder for one lazily created MongoClient shared by all callers."""

    # Shared client; stays None until get_client() is called for the first time.
    _client = None

    @classmethod
    def get_client(cls):
        """Return the shared client, creating it on the first call."""
        if cls._client is not None:
            return cls._client
        cls._client = MongoClient(
            'mongodb://localhost:27017/',
            maxPoolSize=100
        )
        return cls._client

    @classmethod
    def get_db(cls):
        """Return the ``mydb`` database obtained from the shared client."""
        shared_client = cls.get_client()
        return shared_client['mydb']
# 使用
db = MongoDB.get_db()
错误处理¶
from pymongo.errors import (
ConnectionFailure,
OperationFailure,
DuplicateKeyError,
BulkWriteError
)
try:
db.users.insert_one({'email': 'existing@example.com'})
except DuplicateKeyError:
print('邮箱已存在')
except ConnectionFailure:
print('连接失败')
except OperationFailure as e:
print(f'操作失败: {e.details}')
性能优化¶
# 批量操作代替循环
# 差
for user in users:
db.users.insert_one(user)
# 好
db.users.insert_many(users)
# 使用投影减少数据传输
db.users.find({}, {'name': 1, 'email': 1, '_id': 0})
# 使用批量写入
operations = [UpdateOne(...) for ...]
db.users.bulk_write(operations)
小结¶
本章学习了:
- ✅ PyMongo 连接和配置
- ✅ CRUD 操作
- ✅ 聚合查询
- ✅ 批量操作
- ✅ 索引操作
- ✅ 事务支持
- ✅ 变更流
- ✅ 异步操作 (Motor)
- ✅ ODM (MongoEngine)
下一章¶
第七章:复制集 - 学习 MongoDB 高可用复制集部署。