跳转至

第四章:聚合框架

聚合管道概述

MongoDB 聚合框架(Aggregation Pipeline)是一种强大的数据处理工具,类似于 Linux 管道,将多个阶段串联起来处理数据。

db.collection.aggregate([
    { stage1 },
    { stage2 },
    { stage3 }
])

常用阶段

$match - 过滤

// 筛选条件
db.orders.aggregate([
    { $match: { status: "completed" } }
])

// 多条件筛选
db.orders.aggregate([
    { 
        $match: { 
            status: "completed",
            amount: { $gt: 100 }
        } 
    }
])

// 放在管道开头可以利用索引
db.orders.aggregate([
    { $match: { created_at: { $gte: ISODate("2024-01-01") } } },
    { $group: { _id: "$status", total: { $sum: "$amount" } } }
])

$project - 投影

// 选择字段
db.users.aggregate([
    {
        $project: {
            name: 1,
            email: 1,
            _id: 0
        }
    }
])

// 重命名字段
db.users.aggregate([
    {
        $project: {
            username: "$name",
            userEmail: "$email"
        }
    }
])

// 计算字段
db.orders.aggregate([
    {
        $project: {
            item: 1,
            total: { $multiply: ["$price", "$quantity"] }
        }
    }
])

// 嵌套字段
db.users.aggregate([
    {
        $project: {
            name: 1,
            city: "$address.city"
        }
    }
])

$group - 分组

// 基本分组
db.orders.aggregate([
    {
        $group: {
            _id: "$status",
            count: { $sum: 1 }
        }
    }
])

// 多字段分组
db.orders.aggregate([
    {
        $group: {
            _id: { 
                year: { $year: "$created_at" },
                month: { $month: "$created_at" }
            },
            total: { $sum: "$amount" }
        }
    }
])

// 聚合函数
db.orders.aggregate([
    {
        $group: {
            _id: "$department",
            totalAmount: { $sum: "$amount" },
            avgAmount: { $avg: "$amount" },
            minAmount: { $min: "$amount" },
            maxAmount: { $max: "$amount" },
            firstOrder: { $first: "$order_id" },
            lastOrder: { $last: "$order_id" },
            uniqueItems: { $addToSet: "$item" },
            allItems: { $push: "$item" }
        }
    }
])

$sort - 排序

// 升序
db.users.aggregate([
    { $sort: { age: 1 } }
])

// 降序
db.users.aggregate([
    { $sort: { created_at: -1 } }
])

// 多字段排序
db.users.aggregate([
    { $sort: { department: 1, age: -1 } }
])

// 与 $limit 配合使用
db.users.aggregate([
    { $sort: { score: -1 } },
    { $limit: 10 }  // Top 10
])

$limit 和 $skip - 分页

// 分页
db.users.aggregate([
    { $sort: { created_at: -1 } },
    { $skip: 20 },  // 跳过前 20 条
    { $limit: 10 }  // 取 10 条
])

$unwind - 展开数组

// 展开数组
db.orders.aggregate([
    { $unwind: "$items" }
])

// 保留空数组
db.orders.aggregate([
    { 
        $unwind: { 
            path: "$items", 
            preserveNullAndEmptyArrays: true 
        } 
    }
])

// 包含索引
db.orders.aggregate([
    { 
        $unwind: { 
            path: "$tags", 
            includeArrayIndex: "index" 
        } 
    }
])

$lookup - 关联查询

// 左连接
db.orders.aggregate([
    {
        $lookup: {
            from: "users",
            localField: "user_id",
            foreignField: "_id",
            as: "user_info"
        }
    }
])

// 嵌套展开
db.orders.aggregate([
    {
        $lookup: {
            from: "users",
            localField: "user_id",
            foreignField: "_id",
            as: "user_info"
        }
    },
    { $unwind: "$user_info" },
    {
        $project: {
            order_id: 1,
            amount: 1,
            user_name: "$user_info.name"
        }
    }
])

// 管道关联(MongoDB 3.6+)
db.orders.aggregate([
    {
        $lookup: {
            from: "users",
            let: { userId: "$user_id" },
            pipeline: [
                { $match: { $expr: { $eq: ["$_id", "$$userId"] } } },
                { $project: { name: 1, email: 1 } }
            ],
            as: "user_info"
        }
    }
])

$addFields - 添加字段

db.orders.aggregate([
    {
        $addFields: {
            total: { $multiply: ["$price", "$quantity"] },
            year: { $year: "$created_at" }
        }
    }
])

$replaceRoot - 替换根文档

db.orders.aggregate([
    {
        $replaceRoot: { newRoot: "$details" }
    }
])

$count - 计数

db.users.aggregate([
    { $match: { status: "active" } },
    { $count: "active_users" }
])

$facet - 多管道并行

db.products.aggregate([
    {
        $facet: {
            byCategory: [
                { $group: { _id: "$category", count: { $sum: 1 } } }
            ],
            byPrice: [
                { 
                    $bucket: {
                        groupBy: "$price",
                        boundaries: [0, 100, 500, 1000, Infinity],
                        output: { count: { $sum: 1 } }
                    }
                }
            ],
            totalProducts: [
                { $count: "count" }
            ]
        }
    }
])

聚合表达式

算术表达式

{
    $add: ["$price", 10] }              // 加法
    $subtract: ["$price", 5] }          // 减法
    $multiply: ["$price", "$quantity"] } // 乘法
    $divide: ["$total", 2] }            // 除法
    $mod: ["$number", 2] }              // 取模
    $abs: "$number" }                    // 绝对值
    $ceil: "$number" }                   // 向上取整
    $floor: "$number" }                  // 向下取整
    $round: ["$number", 2] }            // 四舍五入
}

字符串表达式

{
    $concat: ["$firstName", " ", "$lastName"] }  // 拼接
    $substr: ["$name", 0, 3] }                    // 子串
    $toLower: "$name" }                           // 转小写
    $toUpper: "$name" }                           // 转大写
    $trim: { input: "$name" } }                   // 去空格
    $split: ["$tags", ","] }                      // 分割
    $strLenCP: "$name" }                          // 字符串长度
    $regexMatch: { input: "$email", regex: /@/ } } // 正则匹配
}

日期表达式

{
    $year: "$created_at" }         // 年
    $month: "$created_at" }        // 月
    $dayOfMonth: "$created_at" }   // 日
    $hour: "$created_at" }         // 时
    $minute: "$created_at" }       // 分
    $second: "$created_at" }       // 秒
    $dayOfWeek: "$created_at" }    // 星期几 (1-7)
    $dayOfYear: "$created_at" }    // 一年中第几天
    $week: "$created_at" }         // 一年中第几周
    $dateToString: { format: "%Y-%m-%d", date: "$created_at" } }
}

条件表达式

// $cond
{
    $cond: {
        if: { $gte: ["$score", 60] },
        then: "pass",
        else: "fail"
    }
}

// $ifNull
{
    $ifNull: ["$nickname", "$username"]
}

// $switch
{
    $switch: {
        branches: [
            { case: { $eq: ["$status", "pending"] }, then: "待处理" },
            { case: { $eq: ["$status", "processing"] }, then: "处理中" },
            { case: { $eq: ["$status", "completed"] }, then: "已完成" }
        ],
        default: "未知状态"
    }
}

数组表达式

{
    $size: "$tags" }                    // 数组长度
    $arrayElemAt: ["$items", 0] }       // 获取元素
    $first: "$items" }                  // 第一个元素
    $last: "$items" }                   // 最后一个元素
    $reverseArray: "$items" }           // 反转数组
    $sortArray: { input: "$items", sortBy: 1 } }  // 排序
    $slice: ["$items", 0, 5] }          // 切片
    $concatArrays: ["$arr1", "$arr2"] } // 合并数组
    $setUnion: ["$arr1", "$arr2"] }     // 并集
    $setIntersection: ["$arr1", "$arr2"] } // 交集
    $setDifference: ["$arr1", "$arr2"] }   // 差集
}

实战案例

销售报表

// 每月销售统计
db.orders.aggregate([
    { $match: { status: "completed" } },
    {
        $group: {
            _id: {
                year: { $year: "$created_at" },
                month: { $month: "$created_at" }
            },
            totalSales: { $sum: "$amount" },
            orderCount: { $sum: 1 },
            avgOrderValue: { $avg: "$amount" }
        }
    },
    { $sort: { "_id.year": -1, "_id.month": -1 } }
])

用户活跃度分析

// 用户登录统计
db.login_logs.aggregate([
    {
        $group: {
            _id: "$user_id",
            loginCount: { $sum: 1 },
            lastLogin: { $max: "$login_time" },
            firstLogin: { $min: "$login_time" }
        }
    },
    {
        $lookup: {
            from: "users",
            localField: "_id",
            foreignField: "_id",
            as: "user"
        }
    },
    { $unwind: "$user" },
    {
        $project: {
            username: "$user.name",
            loginCount: 1,
            lastLogin: 1,
            daysSinceFirstLogin: {
                $divide: [
                    { $subtract: ["$lastLogin", "$firstLogin"] },
                    86400000  // 毫秒转天
                ]
            }
        }
    }
])

商品推荐

// 基于购买记录推荐
db.orders.aggregate([
    { $match: { user_id: ObjectId("...") } },
    { $unwind: "$items" },
    {
        $group: {
            _id: "$items.category",
            purchaseCount: { $sum: 1 }
        }
    },
    { $sort: { purchaseCount: -1 } },
    { $limit: 3 },
    {
        $lookup: {
            from: "products",
            localField: "_id",
            foreignField: "category",
            as: "recommended"
        }
    },
    { $unwind: "$recommended" },
    {
        $project: {
            category: "$_id",
            product: "$recommended.name",
            price: "$recommended.price"
        }
    }
])

性能优化

使用索引

// 确保 $match 使用索引
db.orders.createIndex({ status: 1, created_at: -1 })

db.orders.aggregate([
    { $match: { status: "completed" } },  // 使用索引
    // ... 其他阶段
])

早期过滤

// 好:早期过滤
db.orders.aggregate([
    { $match: { status: "completed" } },  // 先过滤
    { $unwind: "$items" },                // 再展开
    { $group: { _id: "$items.category", total: { $sum: "$items.price" } } }
])

// 差:后期过滤
db.orders.aggregate([
    { $unwind: "$items" },                // 先展开所有
    { $match: { status: "completed" } },  // 后过滤
    { $group: { _id: "$items.category", total: { $sum: "$items.price" } } }
])

allowDiskUse

// 大数据量聚合
db.large_collection.aggregate([
    // ... 复杂聚合
], { allowDiskUse: true })

小结

本章学习了:

  • ✅ 聚合管道概念
  • ✅ 常用阶段(\(match、\)group、$project 等)
  • ✅ 聚合表达式
  • ✅ 实战案例
  • ✅ 性能优化

下一章

第五章:索引优化 - 学习 MongoDB 索引设计和优化。