跳转至

第十章:实战项目

本章将通过一个完整的电商订单系统,综合应用 PostgreSQL 的各项特性。

10.1 项目需求

构建一个电商订单系统,需要实现:

  • 用户管理(注册、登录、权限)
  • 商品管理(分类、库存、价格)
  • 购物车功能
  • 订单管理(创建、支付、发货)
  • 数据统计与分析

10.2 数据库设计

ER 图

users ──┬── orders ──── order_items ──── products
        │       │
        │       └── payments
        └── cart_items ──── products

products ──── categories
        └── inventory_logs

表结构

-- 用户表
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(50) UNIQUE NOT NULL,
    email VARCHAR(255) UNIQUE NOT NULL,
    password_hash VARCHAR(255) NOT NULL,
    role VARCHAR(20) DEFAULT 'customer' CHECK (role IN ('customer', 'admin', 'staff')),
    is_active BOOLEAN DEFAULT true,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 商品分类表
CREATE TABLE categories (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    parent_id INTEGER REFERENCES categories(id),
    sort_order INTEGER DEFAULT 0,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 商品表
CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    name VARCHAR(200) NOT NULL,
    description TEXT,
    price DECIMAL(10, 2) NOT NULL CHECK (price >= 0),
    cost_price DECIMAL(10, 2) CHECK (cost_price >= 0),
    stock INTEGER DEFAULT 0 CHECK (stock >= 0),
    category_id INTEGER REFERENCES categories(id),
    sku VARCHAR(50) UNIQUE,
    is_active BOOLEAN DEFAULT true,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 购物车表
CREATE TABLE cart_items (
    id SERIAL PRIMARY KEY,
    user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    product_id INTEGER NOT NULL REFERENCES products(id),
    quantity INTEGER NOT NULL DEFAULT 1 CHECK (quantity > 0),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE (user_id, product_id)
);

-- 订单表
CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    order_no VARCHAR(50) UNIQUE NOT NULL,
    user_id INTEGER NOT NULL REFERENCES users(id),
    status VARCHAR(20) DEFAULT 'pending' CHECK (status IN ('pending', 'paid', 'shipped', 'delivered', 'cancelled')),
    total_amount DECIMAL(12, 2) NOT NULL,
    shipping_address TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 订单明细表
CREATE TABLE order_items (
    id SERIAL PRIMARY KEY,
    order_id INTEGER NOT NULL REFERENCES orders(id) ON DELETE CASCADE,
    product_id INTEGER NOT NULL REFERENCES products(id),
    product_name VARCHAR(200) NOT NULL,
    price DECIMAL(10, 2) NOT NULL,
    quantity INTEGER NOT NULL CHECK (quantity > 0),
    subtotal DECIMAL(12, 2) NOT NULL
);

-- 支付记录表
CREATE TABLE payments (
    id SERIAL PRIMARY KEY,
    order_id INTEGER NOT NULL REFERENCES orders(id),
    amount DECIMAL(12, 2) NOT NULL,
    payment_method VARCHAR(20) NOT NULL,
    status VARCHAR(20) DEFAULT 'pending' CHECK (status IN ('pending', 'success', 'failed', 'refunded')),
    transaction_id VARCHAR(100),
    paid_at TIMESTAMP,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 库存变更日志
CREATE TABLE inventory_logs (
    id SERIAL PRIMARY KEY,
    product_id INTEGER NOT NULL REFERENCES products(id),
    change_type VARCHAR(20) NOT NULL CHECK (change_type IN ('purchase', 'sale', 'adjustment', 'return')),
    quantity INTEGER NOT NULL,
    before_stock INTEGER NOT NULL,
    after_stock INTEGER NOT NULL,
    reference_id INTEGER,
    remark TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 创建索引
CREATE INDEX idx_products_category ON products(category_id);
CREATE INDEX idx_products_sku ON products(sku);
CREATE INDEX idx_orders_user ON orders(user_id);
CREATE INDEX idx_orders_status ON orders(status);
CREATE INDEX idx_orders_created ON orders(created_at);
CREATE INDEX idx_order_items_order ON order_items(order_id);
CREATE INDEX idx_payments_order ON payments(order_id);

10.3 触发器与函数

自动更新 updated_at

CREATE OR REPLACE FUNCTION update_updated_at()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = CURRENT_TIMESTAMP;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER users_updated_at
    BEFORE UPDATE ON users
    FOR EACH ROW EXECUTE FUNCTION update_updated_at();

CREATE TRIGGER products_updated_at
    BEFORE UPDATE ON products
    FOR EACH ROW EXECUTE FUNCTION update_updated_at();

CREATE TRIGGER orders_updated_at
    BEFORE UPDATE ON orders
    FOR EACH ROW EXECUTE FUNCTION update_updated_at();

库存管理

-- 扣减库存
CREATE OR REPLACE FUNCTION deduct_inventory(
    p_product_id INTEGER,
    p_quantity INTEGER,
    p_order_id INTEGER
) RETURNS BOOLEAN AS $$
DECLARE
    v_current_stock INTEGER;
BEGIN
    -- 获取当前库存并锁定
    SELECT stock INTO v_current_stock
    FROM products WHERE id = p_product_id
    FOR UPDATE;

    -- 检查库存
    IF v_current_stock < p_quantity THEN
        RAISE EXCEPTION '库存不足: 产品 % 当前库存 %,需要 %', 
            p_product_id, v_current_stock, p_quantity;
    END IF;

    -- 扣减库存
    UPDATE products 
    SET stock = stock - p_quantity 
    WHERE id = p_product_id;

    -- 记录日志
    INSERT INTO inventory_logs (product_id, change_type, quantity, before_stock, after_stock, reference_id)
    VALUES (p_product_id, 'sale', -p_quantity, v_current_stock, v_current_stock - p_quantity, p_order_id);

    RETURN true;
END;
$$ LANGUAGE plpgsql;

-- 增加库存
CREATE OR REPLACE FUNCTION add_inventory(
    p_product_id INTEGER,
    p_quantity INTEGER,
    p_change_type VARCHAR DEFAULT 'purchase',
    p_remark TEXT DEFAULT NULL
) RETURNS BOOLEAN AS $$
DECLARE
    v_current_stock INTEGER;
BEGIN
    SELECT stock INTO v_current_stock
    FROM products WHERE id = p_product_id
    FOR UPDATE;

    UPDATE products 
    SET stock = stock + p_quantity 
    WHERE id = p_product_id;

    INSERT INTO inventory_logs (product_id, change_type, quantity, before_stock, after_stock, remark)
    VALUES (p_product_id, p_change_type, p_quantity, v_current_stock, v_current_stock + p_quantity, p_remark);

    RETURN true;
END;
$$ LANGUAGE plpgsql;

订单号生成

CREATE OR REPLACE FUNCTION generate_order_no()
RETURNS VARCHAR AS $$
DECLARE
    v_date VARCHAR := to_char(CURRENT_DATE, 'YYYYMMDD');
    v_seq INTEGER;
    v_order_no VARCHAR;
BEGIN
    -- 获取当日序列号
    SELECT COALESCE(MAX(CAST(SUBSTRING(order_no FROM 9 FOR 6) AS INTEGER)), 0) + 1
    INTO v_seq
    FROM orders
    WHERE order_no LIKE v_date || '%';

    v_order_no := v_date || LPAD(v_seq::TEXT, 6, '0');
    RETURN v_order_no;
END;
$$ LANGUAGE plpgsql;

10.4 视图

-- 商品详情视图
CREATE VIEW product_details AS
SELECT 
    p.*,
    c.name AS category_name,
    (SELECT COUNT(*) FROM order_items WHERE product_id = p.id) AS sales_count
FROM products p
LEFT JOIN categories c ON p.category_id = c.id;

-- 订单详情视图
CREATE VIEW order_details AS
SELECT 
    o.*,
    u.username,
    u.email,
    (SELECT JSON_AGG(
        JSON_BUILD_OBJECT(
            'product_name', oi.product_name,
            'price', oi.price,
            'quantity', oi.quantity,
            'subtotal', oi.subtotal
        )
    ) FROM order_items oi WHERE oi.order_id = o.id) AS items
FROM orders o
JOIN users u ON o.user_id = u.id;

-- 销售统计视图
CREATE VIEW sales_stats AS
SELECT 
    DATE(created_at) AS sale_date,
    COUNT(*) AS order_count,
    SUM(total_amount) AS total_sales,
    AVG(total_amount) AS avg_order_amount
FROM orders
WHERE status != 'cancelled'
GROUP BY DATE(created_at);

10.5 存储过程

创建订单

CREATE OR REPLACE PROCEDURE create_order(
    p_user_id INTEGER,
    p_shipping_address TEXT,
    OUT p_order_id INTEGER,
    OUT p_order_no VARCHAR
)
LANGUAGE plpgsql
AS $$
DECLARE
    v_cart_item RECORD;
    v_total DECIMAL(12, 2) := 0;
    v_subtotal DECIMAL(12, 2);
BEGIN
    -- 生成订单号
    p_order_no := generate_order_no();

    -- 计算总价
    FOR v_cart_item IN 
        SELECT ci.product_id, ci.quantity, p.price, p.name
        FROM cart_items ci
        JOIN products p ON ci.product_id = p.id
        WHERE ci.user_id = p_user_id AND p.is_active = true
    LOOP
        -- 检查库存
        IF (SELECT stock FROM products WHERE id = v_cart_item.product_id) < v_cart_item.quantity THEN
            RAISE EXCEPTION '商品 % 库存不足', v_cart_item.name;
        END IF;

        v_total := v_total + v_cart_item.price * v_cart_item.quantity;
    END LOOP;

    -- 创建订单
    INSERT INTO orders (order_no, user_id, total_amount, shipping_address)
    VALUES (p_order_no, p_user_id, v_total, p_shipping_address)
    RETURNING id INTO p_order_id;

    -- 创建订单明细并扣减库存
    FOR v_cart_item IN 
        SELECT ci.product_id, ci.quantity, p.price, p.name
        FROM cart_items ci
        JOIN products p ON ci.product_id = p.id
        WHERE ci.user_id = p_user_id
    LOOP
        v_subtotal := v_cart_item.price * v_cart_item.quantity;

        INSERT INTO order_items (order_id, product_id, product_name, price, quantity, subtotal)
        VALUES (p_order_id, v_cart_item.product_id, v_cart_item.name, v_cart_item.price, v_cart_item.quantity, v_subtotal);

        -- 扣减库存
        PERFORM deduct_inventory(v_cart_item.product_id, v_cart_item.quantity, p_order_id);
    END LOOP;

    -- 清空购物车
    DELETE FROM cart_items WHERE user_id = p_user_id;

    COMMIT;
END;
$$;

取消订单

CREATE OR REPLACE PROCEDURE cancel_order(p_order_id INTEGER)
LANGUAGE plpgsql
AS $$
DECLARE
    v_item RECORD;
BEGIN
    -- 检查订单状态
    IF (SELECT status FROM orders WHERE id = p_order_id) NOT IN ('pending', 'paid') THEN
        RAISE EXCEPTION '订单状态不允许取消';
    END IF;

    -- 恢复库存
    FOR v_item IN SELECT * FROM order_items WHERE order_id = p_order_id
    LOOP
        PERFORM add_inventory(v_item.product_id, v_item.quantity, 'return', '订单取消: ' || p_order_id);
    END LOOP;

    -- 更新订单状态
    UPDATE orders SET status = 'cancelled' WHERE id = p_order_id;

    -- 如果已支付,创建退款记录
    IF EXISTS (SELECT 1 FROM payments WHERE order_id = p_order_id AND status = 'success') THEN
        UPDATE payments SET status = 'refunded' WHERE order_id = p_order_id AND status = 'success';
    END IF;

    COMMIT;
END;
$$;

10.6 Python 应用集成

from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from typing import Optional, List
import asyncpg
from datetime import datetime

app = FastAPI(title="电商订单系统")

# 数据库连接池
async def get_db():
    pool = await asyncpg.create_pool(
        "postgresql://postgres:password@localhost/ecommerce",
        min_size=5, max_size=20
    )
    return pool

# 模型
class ProductCreate(BaseModel):
    name: str
    description: Optional[str] = None
    price: float
    cost_price: Optional[float] = None
    stock: int = 0
    category_id: Optional[int] = None
    sku: Optional[str] = None

class OrderCreate(BaseModel):
    user_id: int
    shipping_address: str

# API 路由
@app.post("/products")
async def create_product(product: ProductCreate, db = Depends(get_db)):
    async with db.acquire() as conn:
        product_id = await conn.fetchval(
            """INSERT INTO products (name, description, price, cost_price, stock, category_id, sku)
               VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id""",
            product.name, product.description, product.price,
            product.cost_price, product.stock, product.category_id, product.sku
        )
    return {"id": product_id, "message": "商品创建成功"}

@app.get("/products/{product_id}")
async def get_product(product_id: int, db = Depends(get_db)):
    async with db.acquire() as conn:
        product = await conn.fetchrow(
            "SELECT * FROM product_details WHERE id = $1", product_id
        )
        if not product:
            raise HTTPException(status_code=404, detail="商品不存在")
        return dict(product)

@app.post("/orders")
async def create_order(order: OrderCreate, db = Depends(get_db)):
    async with db.acquire() as conn:
        try:
            result = await conn.callproc(
                'create_order',
                [order.user_id, order.shipping_address]
            )
            return {
                "order_id": result['p_order_id'],
                "order_no": result['p_order_no'],
                "message": "订单创建成功"
            }
        except Exception as e:
            raise HTTPException(status_code=400, detail=str(e))

@app.get("/orders/{order_id}")
async def get_order(order_id: int, db = Depends(get_db)):
    async with db.acquire() as conn:
        order = await conn.fetchrow(
            "SELECT * FROM order_details WHERE id = $1", order_id
        )
        if not order:
            raise HTTPException(status_code=404, detail="订单不存在")
        return dict(order)

@app.get("/stats/sales")
async def get_sales_stats(
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    db = Depends(get_db)
):
    async with db.acquire() as conn:
        query = "SELECT * FROM sales_stats WHERE 1=1"
        params = []
        if start_date:
            query += f" AND sale_date >= ${len(params)+1}"
            params.append(start_date)
        if end_date:
            query += f" AND sale_date <= ${len(params)+1}"
            params.append(end_date)

        rows = await conn.fetch(query, *params)
        return [dict(row) for row in rows]

10.7 性能优化

索引优化

-- 分析慢查询
SELECT query, calls, total_time, mean_time
FROM pg_stat_statements
ORDER BY total_time DESC
LIMIT 10;

-- 为常用查询创建索引
CREATE INDEX CONCURRENTLY idx_orders_user_status ON orders(user_id, status);
CREATE INDEX CONCURRENTLY idx_products_active ON products(is_active) WHERE is_active = true;

-- 使用覆盖索引
CREATE INDEX idx_order_items_covering ON order_items(order_id) INCLUDE (product_name, price, quantity, subtotal);

分区表

-- 按月分区订单表
CREATE TABLE orders_partitioned (
    LIKE orders INCLUDING ALL
) PARTITION BY RANGE (created_at);

CREATE TABLE orders_2024_01 PARTITION OF orders_partitioned
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');

CREATE TABLE orders_2024_02 PARTITION OF orders_partitioned
    FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');

-- 自动创建分区(使用 pg_partman 扩展)

10.8 小结

本章通过电商订单系统实战,综合应用了:

  1. 数据库设计:规范化设计、索引策略
  2. 触发器:自动更新时间戳、数据校验
  3. 存储过程:业务逻辑封装、事务控制
  4. 视图:简化查询、数据抽象
  5. Python 集成:FastAPI + asyncpg
  6. 性能优化:索引、分区

至此,PostgreSQL 教程全部完成。你已经掌握了从基础到高级的 PostgreSQL 技能,可以应用于实际项目开发中。