第十章:实战项目¶
本章将通过一个完整的电商订单系统,综合应用 PostgreSQL 的各项特性。
10.1 项目需求¶
构建一个电商订单系统,需要实现:
- 用户管理(注册、登录、权限)
- 商品管理(分类、库存、价格)
- 购物车功能
- 订单管理(创建、支付、发货)
- 数据统计与分析
10.2 数据库设计¶
ER 图¶
users ──┬── orders ──── order_items ──── products
│ │
│ └── payments
│
└── cart_items ──── products
products ──── categories
└── inventory_logs
表结构¶
-- 用户表
CREATE TABLE users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(255) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
role VARCHAR(20) DEFAULT 'customer' CHECK (role IN ('customer', 'admin', 'staff')),
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 商品分类表
CREATE TABLE categories (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
parent_id INTEGER REFERENCES categories(id),
sort_order INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 商品表
CREATE TABLE products (
id SERIAL PRIMARY KEY,
name VARCHAR(200) NOT NULL,
description TEXT,
price DECIMAL(10, 2) NOT NULL CHECK (price >= 0),
cost_price DECIMAL(10, 2) CHECK (cost_price >= 0),
stock INTEGER DEFAULT 0 CHECK (stock >= 0),
category_id INTEGER REFERENCES categories(id),
sku VARCHAR(50) UNIQUE,
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 购物车表
CREATE TABLE cart_items (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
product_id INTEGER NOT NULL REFERENCES products(id),
quantity INTEGER NOT NULL DEFAULT 1 CHECK (quantity > 0),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE (user_id, product_id)
);
-- 订单表
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
order_no VARCHAR(50) UNIQUE NOT NULL,
user_id INTEGER NOT NULL REFERENCES users(id),
status VARCHAR(20) DEFAULT 'pending' CHECK (status IN ('pending', 'paid', 'shipped', 'delivered', 'cancelled')),
total_amount DECIMAL(12, 2) NOT NULL,
shipping_address TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 订单明细表
CREATE TABLE order_items (
id SERIAL PRIMARY KEY,
order_id INTEGER NOT NULL REFERENCES orders(id) ON DELETE CASCADE,
product_id INTEGER NOT NULL REFERENCES products(id),
product_name VARCHAR(200) NOT NULL,
price DECIMAL(10, 2) NOT NULL,
quantity INTEGER NOT NULL CHECK (quantity > 0),
subtotal DECIMAL(12, 2) NOT NULL
);
-- 支付记录表
CREATE TABLE payments (
id SERIAL PRIMARY KEY,
order_id INTEGER NOT NULL REFERENCES orders(id),
amount DECIMAL(12, 2) NOT NULL,
payment_method VARCHAR(20) NOT NULL,
status VARCHAR(20) DEFAULT 'pending' CHECK (status IN ('pending', 'success', 'failed', 'refunded')),
transaction_id VARCHAR(100),
paid_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 库存变更日志
CREATE TABLE inventory_logs (
id SERIAL PRIMARY KEY,
product_id INTEGER NOT NULL REFERENCES products(id),
change_type VARCHAR(20) NOT NULL CHECK (change_type IN ('purchase', 'sale', 'adjustment', 'return')),
quantity INTEGER NOT NULL,
before_stock INTEGER NOT NULL,
after_stock INTEGER NOT NULL,
reference_id INTEGER,
remark TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 创建索引
CREATE INDEX idx_products_category ON products(category_id);
CREATE INDEX idx_products_sku ON products(sku);
CREATE INDEX idx_orders_user ON orders(user_id);
CREATE INDEX idx_orders_status ON orders(status);
CREATE INDEX idx_orders_created ON orders(created_at);
CREATE INDEX idx_order_items_order ON order_items(order_id);
CREATE INDEX idx_payments_order ON payments(order_id);
10.3 触发器与函数¶
自动更新 updated_at¶
CREATE OR REPLACE FUNCTION update_updated_at()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = CURRENT_TIMESTAMP;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER users_updated_at
BEFORE UPDATE ON users
FOR EACH ROW EXECUTE FUNCTION update_updated_at();
CREATE TRIGGER products_updated_at
BEFORE UPDATE ON products
FOR EACH ROW EXECUTE FUNCTION update_updated_at();
CREATE TRIGGER orders_updated_at
BEFORE UPDATE ON orders
FOR EACH ROW EXECUTE FUNCTION update_updated_at();
库存管理¶
-- 扣减库存
CREATE OR REPLACE FUNCTION deduct_inventory(
p_product_id INTEGER,
p_quantity INTEGER,
p_order_id INTEGER
) RETURNS BOOLEAN AS $$
DECLARE
v_current_stock INTEGER;
BEGIN
-- 获取当前库存并锁定
SELECT stock INTO v_current_stock
FROM products WHERE id = p_product_id
FOR UPDATE;
-- 检查库存
IF v_current_stock < p_quantity THEN
RAISE EXCEPTION '库存不足: 产品 % 当前库存 %,需要 %',
p_product_id, v_current_stock, p_quantity;
END IF;
-- 扣减库存
UPDATE products
SET stock = stock - p_quantity
WHERE id = p_product_id;
-- 记录日志
INSERT INTO inventory_logs (product_id, change_type, quantity, before_stock, after_stock, reference_id)
VALUES (p_product_id, 'sale', -p_quantity, v_current_stock, v_current_stock - p_quantity, p_order_id);
RETURN true;
END;
$$ LANGUAGE plpgsql;
-- 增加库存
CREATE OR REPLACE FUNCTION add_inventory(
p_product_id INTEGER,
p_quantity INTEGER,
p_change_type VARCHAR DEFAULT 'purchase',
p_remark TEXT DEFAULT NULL
) RETURNS BOOLEAN AS $$
DECLARE
v_current_stock INTEGER;
BEGIN
SELECT stock INTO v_current_stock
FROM products WHERE id = p_product_id
FOR UPDATE;
UPDATE products
SET stock = stock + p_quantity
WHERE id = p_product_id;
INSERT INTO inventory_logs (product_id, change_type, quantity, before_stock, after_stock, remark)
VALUES (p_product_id, p_change_type, p_quantity, v_current_stock, v_current_stock + p_quantity, p_remark);
RETURN true;
END;
$$ LANGUAGE plpgsql;
订单号生成¶
CREATE OR REPLACE FUNCTION generate_order_no()
RETURNS VARCHAR AS $$
DECLARE
v_date VARCHAR := to_char(CURRENT_DATE, 'YYYYMMDD');
v_seq INTEGER;
v_order_no VARCHAR;
BEGIN
-- 获取当日序列号
SELECT COALESCE(MAX(CAST(SUBSTRING(order_no FROM 9 FOR 6) AS INTEGER)), 0) + 1
INTO v_seq
FROM orders
WHERE order_no LIKE v_date || '%';
v_order_no := v_date || LPAD(v_seq::TEXT, 6, '0');
RETURN v_order_no;
END;
$$ LANGUAGE plpgsql;
10.4 视图¶
-- 商品详情视图
CREATE VIEW product_details AS
SELECT
p.*,
c.name AS category_name,
(SELECT COUNT(*) FROM order_items WHERE product_id = p.id) AS sales_count
FROM products p
LEFT JOIN categories c ON p.category_id = c.id;
-- 订单详情视图
CREATE VIEW order_details AS
SELECT
o.*,
u.username,
u.email,
(SELECT JSON_AGG(
JSON_BUILD_OBJECT(
'product_name', oi.product_name,
'price', oi.price,
'quantity', oi.quantity,
'subtotal', oi.subtotal
)
) FROM order_items oi WHERE oi.order_id = o.id) AS items
FROM orders o
JOIN users u ON o.user_id = u.id;
-- 销售统计视图
CREATE VIEW sales_stats AS
SELECT
DATE(created_at) AS sale_date,
COUNT(*) AS order_count,
SUM(total_amount) AS total_sales,
AVG(total_amount) AS avg_order_amount
FROM orders
WHERE status != 'cancelled'
GROUP BY DATE(created_at);
10.5 存储过程¶
创建订单¶
CREATE OR REPLACE PROCEDURE create_order(
p_user_id INTEGER,
p_shipping_address TEXT,
OUT p_order_id INTEGER,
OUT p_order_no VARCHAR
)
LANGUAGE plpgsql
AS $$
DECLARE
v_cart_item RECORD;
v_total DECIMAL(12, 2) := 0;
v_subtotal DECIMAL(12, 2);
BEGIN
-- 生成订单号
p_order_no := generate_order_no();
-- 计算总价
FOR v_cart_item IN
SELECT ci.product_id, ci.quantity, p.price, p.name
FROM cart_items ci
JOIN products p ON ci.product_id = p.id
WHERE ci.user_id = p_user_id AND p.is_active = true
LOOP
-- 检查库存
IF (SELECT stock FROM products WHERE id = v_cart_item.product_id) < v_cart_item.quantity THEN
RAISE EXCEPTION '商品 % 库存不足', v_cart_item.name;
END IF;
v_total := v_total + v_cart_item.price * v_cart_item.quantity;
END LOOP;
-- 创建订单
INSERT INTO orders (order_no, user_id, total_amount, shipping_address)
VALUES (p_order_no, p_user_id, v_total, p_shipping_address)
RETURNING id INTO p_order_id;
-- 创建订单明细并扣减库存
FOR v_cart_item IN
SELECT ci.product_id, ci.quantity, p.price, p.name
FROM cart_items ci
JOIN products p ON ci.product_id = p.id
WHERE ci.user_id = p_user_id
LOOP
v_subtotal := v_cart_item.price * v_cart_item.quantity;
INSERT INTO order_items (order_id, product_id, product_name, price, quantity, subtotal)
VALUES (p_order_id, v_cart_item.product_id, v_cart_item.name, v_cart_item.price, v_cart_item.quantity, v_subtotal);
-- 扣减库存
PERFORM deduct_inventory(v_cart_item.product_id, v_cart_item.quantity, p_order_id);
END LOOP;
-- 清空购物车
DELETE FROM cart_items WHERE user_id = p_user_id;
COMMIT;
END;
$$;
取消订单¶
CREATE OR REPLACE PROCEDURE cancel_order(p_order_id INTEGER)
LANGUAGE plpgsql
AS $$
DECLARE
v_item RECORD;
BEGIN
-- 检查订单状态
IF (SELECT status FROM orders WHERE id = p_order_id) NOT IN ('pending', 'paid') THEN
RAISE EXCEPTION '订单状态不允许取消';
END IF;
-- 恢复库存
FOR v_item IN SELECT * FROM order_items WHERE order_id = p_order_id
LOOP
PERFORM add_inventory(v_item.product_id, v_item.quantity, 'return', '订单取消: ' || p_order_id);
END LOOP;
-- 更新订单状态
UPDATE orders SET status = 'cancelled' WHERE id = p_order_id;
-- 如果已支付,创建退款记录
IF EXISTS (SELECT 1 FROM payments WHERE order_id = p_order_id AND status = 'success') THEN
UPDATE payments SET status = 'refunded' WHERE order_id = p_order_id AND status = 'success';
END IF;
COMMIT;
END;
$$;
10.6 Python 应用集成¶
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from typing import Optional, List
import asyncpg
from datetime import datetime
app = FastAPI(title="电商订单系统")
# 数据库连接池
async def get_db():
pool = await asyncpg.create_pool(
"postgresql://postgres:password@localhost/ecommerce",
min_size=5, max_size=20
)
return pool
# 模型
class ProductCreate(BaseModel):
name: str
description: Optional[str] = None
price: float
cost_price: Optional[float] = None
stock: int = 0
category_id: Optional[int] = None
sku: Optional[str] = None
class OrderCreate(BaseModel):
user_id: int
shipping_address: str
# API 路由
@app.post("/products")
async def create_product(product: ProductCreate, db = Depends(get_db)):
async with db.acquire() as conn:
product_id = await conn.fetchval(
"""INSERT INTO products (name, description, price, cost_price, stock, category_id, sku)
VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id""",
product.name, product.description, product.price,
product.cost_price, product.stock, product.category_id, product.sku
)
return {"id": product_id, "message": "商品创建成功"}
@app.get("/products/{product_id}")
async def get_product(product_id: int, db = Depends(get_db)):
async with db.acquire() as conn:
product = await conn.fetchrow(
"SELECT * FROM product_details WHERE id = $1", product_id
)
if not product:
raise HTTPException(status_code=404, detail="商品不存在")
return dict(product)
@app.post("/orders")
async def create_order(order: OrderCreate, db = Depends(get_db)):
async with db.acquire() as conn:
try:
result = await conn.callproc(
'create_order',
[order.user_id, order.shipping_address]
)
return {
"order_id": result['p_order_id'],
"order_no": result['p_order_no'],
"message": "订单创建成功"
}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/orders/{order_id}")
async def get_order(order_id: int, db = Depends(get_db)):
async with db.acquire() as conn:
order = await conn.fetchrow(
"SELECT * FROM order_details WHERE id = $1", order_id
)
if not order:
raise HTTPException(status_code=404, detail="订单不存在")
return dict(order)
@app.get("/stats/sales")
async def get_sales_stats(
start_date: Optional[str] = None,
end_date: Optional[str] = None,
db = Depends(get_db)
):
async with db.acquire() as conn:
query = "SELECT * FROM sales_stats WHERE 1=1"
params = []
if start_date:
query += f" AND sale_date >= ${len(params)+1}"
params.append(start_date)
if end_date:
query += f" AND sale_date <= ${len(params)+1}"
params.append(end_date)
rows = await conn.fetch(query, *params)
return [dict(row) for row in rows]
10.7 性能优化¶
索引优化¶
-- 分析慢查询
SELECT query, calls, total_time, mean_time
FROM pg_stat_statements
ORDER BY total_time DESC
LIMIT 10;
-- 为常用查询创建索引
CREATE INDEX CONCURRENTLY idx_orders_user_status ON orders(user_id, status);
CREATE INDEX CONCURRENTLY idx_products_active ON products(is_active) WHERE is_active = true;
-- 使用覆盖索引
CREATE INDEX idx_order_items_covering ON order_items(order_id) INCLUDE (product_name, price, quantity, subtotal);
分区表¶
-- 按月分区订单表
CREATE TABLE orders_partitioned (
LIKE orders INCLUDING ALL
) PARTITION BY RANGE (created_at);
CREATE TABLE orders_2024_01 PARTITION OF orders_partitioned
FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
CREATE TABLE orders_2024_02 PARTITION OF orders_partitioned
FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
-- 自动创建分区(使用 pg_partman 扩展)
10.8 小结¶
本章通过电商订单系统实战,综合应用了:
- 数据库设计:规范化设计、索引策略
- 触发器:自动更新时间戳、数据校验
- 存储过程:业务逻辑封装、事务控制
- 视图:简化查询、数据抽象
- Python 集成:FastAPI + asyncpg
- 性能优化:索引、分区
至此,PostgreSQL 教程全部完成。你已经掌握了从基础到高级的 PostgreSQL 技能,可以应用于实际项目开发中。