第四章:聚合框架¶
聚合管道概述¶
MongoDB 聚合框架(Aggregation Pipeline)是一种强大的数据处理工具,类似于 Linux 管道,将多个阶段串联起来处理数据。
常用阶段¶
$match - 过滤¶
// 筛选条件
db.orders.aggregate([
{ $match: { status: "completed" } }
])
// 多条件筛选
db.orders.aggregate([
{
$match: {
status: "completed",
amount: { $gt: 100 }
}
}
])
// 放在管道开头可以利用索引
db.orders.aggregate([
{ $match: { created_at: { $gte: ISODate("2024-01-01") } } },
{ $group: { _id: "$status", total: { $sum: "$amount" } } }
])
$project - 投影¶
// 选择字段
db.users.aggregate([
{
$project: {
name: 1,
email: 1,
_id: 0
}
}
])
// 重命名字段
db.users.aggregate([
{
$project: {
username: "$name",
userEmail: "$email"
}
}
])
// 计算字段
db.orders.aggregate([
{
$project: {
item: 1,
total: { $multiply: ["$price", "$quantity"] }
}
}
])
// 嵌套字段
db.users.aggregate([
{
$project: {
name: 1,
city: "$address.city"
}
}
])
$group - 分组¶
// 基本分组
db.orders.aggregate([
{
$group: {
_id: "$status",
count: { $sum: 1 }
}
}
])
// 多字段分组
db.orders.aggregate([
{
$group: {
_id: {
year: { $year: "$created_at" },
month: { $month: "$created_at" }
},
total: { $sum: "$amount" }
}
}
])
// 聚合函数
db.orders.aggregate([
{
$group: {
_id: "$department",
totalAmount: { $sum: "$amount" },
avgAmount: { $avg: "$amount" },
minAmount: { $min: "$amount" },
maxAmount: { $max: "$amount" },
firstOrder: { $first: "$order_id" },
lastOrder: { $last: "$order_id" },
uniqueItems: { $addToSet: "$item" },
allItems: { $push: "$item" }
}
}
])
$sort - 排序¶
// 升序
db.users.aggregate([
{ $sort: { age: 1 } }
])
// 降序
db.users.aggregate([
{ $sort: { created_at: -1 } }
])
// 多字段排序
db.users.aggregate([
{ $sort: { department: 1, age: -1 } }
])
// 与 $limit 配合使用
db.users.aggregate([
{ $sort: { score: -1 } },
{ $limit: 10 } // Top 10
])
$limit 和 $skip - 分页¶
// 分页
db.users.aggregate([
{ $sort: { created_at: -1 } },
{ $skip: 20 }, // 跳过前 20 条
{ $limit: 10 } // 取 10 条
])
$unwind - 展开数组¶
// 展开数组
db.orders.aggregate([
{ $unwind: "$items" }
])
// 保留空数组
db.orders.aggregate([
{
$unwind: {
path: "$items",
preserveNullAndEmptyArrays: true
}
}
])
// 包含索引
db.orders.aggregate([
{
$unwind: {
path: "$tags",
includeArrayIndex: "index"
}
}
])
$lookup - 关联查询¶
// 左连接
db.orders.aggregate([
{
$lookup: {
from: "users",
localField: "user_id",
foreignField: "_id",
as: "user_info"
}
}
])
// 嵌套展开
db.orders.aggregate([
{
$lookup: {
from: "users",
localField: "user_id",
foreignField: "_id",
as: "user_info"
}
},
{ $unwind: "$user_info" },
{
$project: {
order_id: 1,
amount: 1,
user_name: "$user_info.name"
}
}
])
// 管道关联(MongoDB 3.6+)
db.orders.aggregate([
{
$lookup: {
from: "users",
let: { userId: "$user_id" },
pipeline: [
{ $match: { $expr: { $eq: ["$_id", "$$userId"] } } },
{ $project: { name: 1, email: 1 } }
],
as: "user_info"
}
}
])
$addFields - 添加字段¶
db.orders.aggregate([
{
$addFields: {
total: { $multiply: ["$price", "$quantity"] },
year: { $year: "$created_at" }
}
}
])
$replaceRoot - 替换根文档¶
$count - 计数¶
$facet - 多管道并行¶
db.products.aggregate([
{
$facet: {
byCategory: [
{ $group: { _id: "$category", count: { $sum: 1 } } }
],
byPrice: [
{
$bucket: {
groupBy: "$price",
boundaries: [0, 100, 500, 1000, Infinity],
output: { count: { $sum: 1 } }
}
}
],
totalProducts: [
{ $count: "count" }
]
}
}
])
聚合表达式¶
算术表达式¶
{
$add: ["$price", 10] } // 加法
$subtract: ["$price", 5] } // 减法
$multiply: ["$price", "$quantity"] } // 乘法
$divide: ["$total", 2] } // 除法
$mod: ["$number", 2] } // 取模
$abs: "$number" } // 绝对值
$ceil: "$number" } // 向上取整
$floor: "$number" } // 向下取整
$round: ["$number", 2] } // 四舍五入
}
字符串表达式¶
{
$concat: ["$firstName", " ", "$lastName"] } // 拼接
$substr: ["$name", 0, 3] } // 子串
$toLower: "$name" } // 转小写
$toUpper: "$name" } // 转大写
$trim: { input: "$name" } } // 去空格
$split: ["$tags", ","] } // 分割
$strLenCP: "$name" } // 字符串长度
$regexMatch: { input: "$email", regex: /@/ } } // 正则匹配
}
日期表达式¶
{
$year: "$created_at" } // 年
$month: "$created_at" } // 月
$dayOfMonth: "$created_at" } // 日
$hour: "$created_at" } // 时
$minute: "$created_at" } // 分
$second: "$created_at" } // 秒
$dayOfWeek: "$created_at" } // 星期几 (1-7)
$dayOfYear: "$created_at" } // 一年中第几天
$week: "$created_at" } // 一年中第几周
$dateToString: { format: "%Y-%m-%d", date: "$created_at" } }
}
条件表达式¶
// $cond
{
$cond: {
if: { $gte: ["$score", 60] },
then: "pass",
else: "fail"
}
}
// $ifNull
{
$ifNull: ["$nickname", "$username"]
}
// $switch
{
$switch: {
branches: [
{ case: { $eq: ["$status", "pending"] }, then: "待处理" },
{ case: { $eq: ["$status", "processing"] }, then: "处理中" },
{ case: { $eq: ["$status", "completed"] }, then: "已完成" }
],
default: "未知状态"
}
}
数组表达式¶
{
$size: "$tags" } // 数组长度
$arrayElemAt: ["$items", 0] } // 获取元素
$first: "$items" } // 第一个元素
$last: "$items" } // 最后一个元素
$reverseArray: "$items" } // 反转数组
$sortArray: { input: "$items", sortBy: 1 } } // 排序
$slice: ["$items", 0, 5] } // 切片
$concatArrays: ["$arr1", "$arr2"] } // 合并数组
$setUnion: ["$arr1", "$arr2"] } // 并集
$setIntersection: ["$arr1", "$arr2"] } // 交集
$setDifference: ["$arr1", "$arr2"] } // 差集
}
实战案例¶
销售报表¶
// 每月销售统计
db.orders.aggregate([
{ $match: { status: "completed" } },
{
$group: {
_id: {
year: { $year: "$created_at" },
month: { $month: "$created_at" }
},
totalSales: { $sum: "$amount" },
orderCount: { $sum: 1 },
avgOrderValue: { $avg: "$amount" }
}
},
{ $sort: { "_id.year": -1, "_id.month": -1 } }
])
用户活跃度分析¶
// 用户登录统计
db.login_logs.aggregate([
{
$group: {
_id: "$user_id",
loginCount: { $sum: 1 },
lastLogin: { $max: "$login_time" },
firstLogin: { $min: "$login_time" }
}
},
{
$lookup: {
from: "users",
localField: "_id",
foreignField: "_id",
as: "user"
}
},
{ $unwind: "$user" },
{
$project: {
username: "$user.name",
loginCount: 1,
lastLogin: 1,
daysSinceFirstLogin: {
$divide: [
{ $subtract: ["$lastLogin", "$firstLogin"] },
86400000 // 毫秒转天
]
}
}
}
])
商品推荐¶
// 基于购买记录推荐
db.orders.aggregate([
{ $match: { user_id: ObjectId("...") } },
{ $unwind: "$items" },
{
$group: {
_id: "$items.category",
purchaseCount: { $sum: 1 }
}
},
{ $sort: { purchaseCount: -1 } },
{ $limit: 3 },
{
$lookup: {
from: "products",
localField: "_id",
foreignField: "category",
as: "recommended"
}
},
{ $unwind: "$recommended" },
{
$project: {
category: "$_id",
product: "$recommended.name",
price: "$recommended.price"
}
}
])
性能优化¶
使用索引¶
// 确保 $match 使用索引
db.orders.createIndex({ status: 1, created_at: -1 })
db.orders.aggregate([
{ $match: { status: "completed" } }, // 使用索引
// ... 其他阶段
])
早期过滤¶
// 好:早期过滤
db.orders.aggregate([
{ $match: { status: "completed" } }, // 先过滤
{ $unwind: "$items" }, // 再展开
{ $group: { _id: "$items.category", total: { $sum: "$items.price" } } }
])
// 差:后期过滤
db.orders.aggregate([
{ $unwind: "$items" }, // 先展开所有
{ $match: { status: "completed" } }, // 后过滤
{ $group: { _id: "$items.category", total: { $sum: "$items.price" } } }
])
allowDiskUse¶
小结¶
本章学习了:
- ✅ 聚合管道概念
- ✅ 常用阶段(\(match、\)group、$project 等)
- ✅ 聚合表达式
- ✅ 实战案例
- ✅ 性能优化
下一章¶
第五章:索引优化 - 学习 MongoDB 索引设计和优化。