跳转至

第六章:高级特性

PostgreSQL 提供了许多高级特性,本章将介绍事务隔离级别、锁机制、存储过程、触发器、视图等核心功能。

6.1 事务与隔离级别

事务基础

-- 开始事务
BEGIN;

-- 或
START TRANSACTION;

-- 提交事务
COMMIT;

-- 回滚事务
ROLLBACK;

-- 保存点
BEGIN;
INSERT INTO orders (customer_id, total) VALUES (1, 100);
SAVEPOINT order_created;

INSERT INTO order_items (order_id, product_id, quantity) VALUES (1, 1, 5);
-- 如果出错,可以回滚到保存点
ROLLBACK TO order_created;

COMMIT;

隔离级别

PostgreSQL 支持四种隔离级别:

-- 查看当前隔离级别
SHOW default_transaction_isolation;

-- 设置隔离级别
SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED;

-- 单个事务设置
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;

隔离级别对比:

隔离级别 脏读 不可重复读 幻读
READ UNCOMMITTED 可能 可能 可能
READ COMMITTED 不会 可能 可能
REPEATABLE READ 不会 不会 可能*
SERIALIZABLE 不会 不会 不会

*PostgreSQL 的 REPEATABLE READ 通过快照隔离实际上也防止了幻读。

READ COMMITTED(默认)

-- 会话 A
BEGIN;
SELECT * FROM orders WHERE id = 1;  -- total = 100

-- 会话 B(同时执行)
UPDATE orders SET total = 200 WHERE id = 1;
COMMIT;

-- 会话 A 继续查询
SELECT * FROM orders WHERE id = 1;  -- total = 200(看到了其他事务的提交)
COMMIT;

REPEATABLE READ

-- 会话 A
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SELECT * FROM orders WHERE id = 1;  -- total = 100

-- 会话 B(同时执行)
UPDATE orders SET total = 200 WHERE id = 1;
COMMIT;

-- 会话 A 继续查询
SELECT * FROM orders WHERE id = 1;  -- total = 100(仍然是旧值)
COMMIT;

-- 再次查询才能看到新值
SELECT * FROM orders WHERE id = 1;  -- total = 200

SERIALIZABLE

-- 最高隔离级别,完全串行化执行
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;

-- 可能出现序列化失败
-- ERROR: could not serialize access due to concurrent update

-- 需要重试机制

6.2 锁机制

表级锁

-- ACCESS SHARE:SELECT 语句自动获取
SELECT * FROM orders;

-- ROW SHARE:SELECT FOR UPDATE/SHARE 自动获取
SELECT * FROM orders WHERE id = 1 FOR UPDATE;

-- ROW EXCLUSIVE:INSERT/UPDATE/DELETE 自动获取
UPDATE orders SET total = 200 WHERE id = 1;

-- SHARE:阻止写入,允许读取
LOCK TABLE orders IN SHARE MODE;

-- EXCLUSIVE:阻止读写
LOCK TABLE orders IN EXCLUSIVE MODE;

-- ACCESS EXCLUSIVE:完全锁表(DDL 操作)
LOCK TABLE orders IN ACCESS EXCLUSIVE MODE;
ALTER TABLE orders ADD COLUMN notes text;

行级锁

-- FOR UPDATE:排他行锁
BEGIN;
SELECT * FROM orders WHERE id = 1 FOR UPDATE;
-- 其他事务无法修改这行
UPDATE orders SET total = 200 WHERE id = 1;
COMMIT;

-- FOR SHARE:共享行锁
BEGIN;
SELECT * FROM orders WHERE id = 1 FOR SHARE;
-- 其他事务可以读取,但不能修改
COMMIT;

-- FOR NO KEY UPDATE:不锁定外键引用
SELECT * FROM orders WHERE id = 1 FOR NO KEY UPDATE;

-- FOR KEY SHARE:允许 FOR NO KEY UPDATE
SELECT * FROM orders WHERE id = 1 FOR KEY SHARE;

-- SKIP LOCKED:跳过已锁定的行(队列场景)
SELECT * FROM tasks WHERE status = 'pending' FOR UPDATE SKIP LOCKED LIMIT 1;

-- NOWAIT:如果无法立即获取锁则报错
SELECT * FROM orders WHERE id = 1 FOR UPDATE NOWAIT;

死锁检测

-- 死锁示例
-- 会话 A
BEGIN;
UPDATE orders SET total = 100 WHERE id = 1;

-- 会话 B
BEGIN;
UPDATE orders SET total = 200 WHERE id = 2;

-- 会话 A
UPDATE orders SET total = 300 WHERE id = 2;  -- 等待

-- 会话 B
UPDATE orders SET total = 400 WHERE id = 1;  -- 死锁!
-- ERROR: deadlock detected

-- 查看锁等待
SELECT 
    blocked_locks.pid AS blocked_pid,
    blocked_activity.usename AS blocked_user,
    blocking_locks.pid AS blocking_pid,
    blocking_activity.usename AS blocking_user,
    blocked_activity.query AS blocked_statement,
    blocking_activity.query AS blocking_statement
FROM pg_catalog.pg_locks blocked_locks
JOIN pg_catalog.pg_stat_activity blocked_activity ON blocked_activity.pid = blocked_locks.pid
JOIN pg_catalog.pg_locks blocking_locks ON blocking_locks.locktype = blocked_locks.locktype
    AND blocking_locks.relation = blocked_locks.relation
    AND blocking_locks.pid != blocked_locks.pid
JOIN pg_catalog.pg_stat_activity blocking_activity ON blocking_activity.pid = blocking_locks.pid
WHERE NOT blocked_locks.granted;

咨询锁

-- 应用级别的锁,不自动释放
-- 获取锁
SELECT pg_advisory_lock(12345);

-- 尝试获取锁(非阻塞)
SELECT pg_try_advisory_lock(12345);  -- 返回 true/false

-- 释放锁
SELECT pg_advisory_unlock(12345);

-- 会话级别锁(会话结束自动释放)
SELECT pg_advisory_lock_shared(12345);

-- 事务级别锁(事务结束自动释放)
SELECT pg_advisory_xact_lock(12345);

6.3 存储过程与函数

基本函数

-- 简单函数
CREATE OR REPLACE FUNCTION add_numbers(a integer, b integer)
RETURNS integer AS $$
BEGIN
    RETURN a + b;
END;
$$ LANGUAGE plpgsql;

-- 调用函数
SELECT add_numbers(10, 20);

-- 删除函数
DROP FUNCTION IF EXISTS add_numbers(integer, integer);

带参数的函数

-- 带默认参数
CREATE OR REPLACE FUNCTION greet(
    name text,
    greeting text DEFAULT 'Hello'
) RETURNS text AS $$
BEGIN
    RETURN greeting || ', ' || name || '!';
END;
$$ LANGUAGE plpgsql;

SELECT greet('World');           -- Hello, World!
SELECT greet('World', 'Hi');     -- Hi, World!
SELECT greet(name := 'World');   -- Hello, World!

-- OUT 参数
CREATE OR REPLACE FUNCTION get_order_stats(
    p_customer_id integer,
    OUT total_orders bigint,
    OUT total_amount numeric
) AS $$
BEGIN
    SELECT COUNT(*), COALESCE(SUM(total), 0)
    INTO total_orders, total_amount
    FROM orders
    WHERE customer_id = p_customer_id;
END;
$$ LANGUAGE plpgsql;

SELECT * FROM get_order_stats(1);

返回集合

-- 返回多行
CREATE OR REPLACE FUNCTION get_customer_orders(p_customer_id integer)
RETURNS TABLE (
    order_id integer,
    order_date date,
    total numeric
) AS $$
BEGIN
    RETURN QUERY
    SELECT id, order_date, total
    FROM orders
    WHERE customer_id = p_customer_id
    ORDER BY order_date DESC;
END;
$$ LANGUAGE plpgsql;

SELECT * FROM get_customer_orders(1);

-- 返回 SETOF
CREATE OR REPLACE FUNCTION get_top_products(limit_count integer DEFAULT 10)
RETURNS SETOF products AS $$
BEGIN
    RETURN QUERY
    SELECT p.*
    FROM products p
    JOIN order_items oi ON p.id = oi.product_id
    GROUP BY p.id
    ORDER BY SUM(oi.quantity) DESC
    LIMIT limit_count;
END;
$$ LANGUAGE plpgsql;

流程控制

CREATE OR REPLACE FUNCTION calculate_discount(
    p_total numeric,
    p_customer_type text
) RETURNS numeric AS $$
DECLARE
    v_discount numeric;
BEGIN
    -- IF-THEN-ELSE
    IF p_customer_type = 'vip' THEN
        v_discount := 0.2;
    ELSIF p_customer_type = 'regular' THEN
        v_discount := 0.1;
    ELSE
        v_discount := 0;
    END IF;

    -- CASE 语句
    v_discount := CASE 
        WHEN p_total > 10000 THEN v_discount + 0.05
        WHEN p_total > 5000 THEN v_discount + 0.02
        ELSE v_discount
    END;

    RETURN p_total * v_discount;
END;
$$ LANGUAGE plpgsql;

-- 循环
CREATE OR REPLACE FUNCTION generate_series_custom(
    p_start integer,
    p_end integer
) RETURNS SETOF integer AS $$
DECLARE
    i integer;
BEGIN
    i := p_start;
    LOOP
        EXIT WHEN i > p_end;
        RETURN NEXT i;
        i := i + 1;
    END LOOP;
    RETURN;
END;
$$ LANGUAGE plpgsql;

-- FOR 循环
CREATE OR REPLACE FUNCTION process_orders()
RETURNS void AS $$
DECLARE
    order_rec RECORD;
BEGIN
    FOR order_rec IN 
        SELECT * FROM orders WHERE status = 'pending'
    LOOP
        -- 处理每个订单
        UPDATE orders SET status = 'processing' WHERE id = order_rec.id;
        -- 其他逻辑...
    END LOOP;
END;
$$ LANGUAGE plpgsql;

-- WHILE 循环
CREATE OR REPLACE FUNCTION while_example()
RETURNS integer AS $$
DECLARE
    i integer := 0;
    sum integer := 0;
BEGIN
    WHILE i < 10 LOOP
        sum := sum + i;
        i := i + 1;
    END LOOP;
    RETURN sum;
END;
$$ LANGUAGE plpgsql;

异常处理

CREATE OR REPLACE FUNCTION safe_divide(
    a numeric,
    b numeric
) RETURNS numeric AS $$
DECLARE
    result numeric;
BEGIN
    BEGIN
        result := a / b;
    EXCEPTION
        WHEN division_by_zero THEN
            RAISE NOTICE '除数不能为零,返回 NULL';
            RETURN NULL;
        WHEN others THEN
            RAISE NOTICE '发生错误: %', SQLERRM;
            RETURN NULL;
    END;

    RETURN result;
END;
$$ LANGUAGE plpgsql;

-- 自定义异常
CREATE OR REPLACE FUNCTION validate_order(
    p_customer_id integer,
    p_total numeric
) RETURNS void AS $$
BEGIN
    IF p_total <= 0 THEN
        RAISE EXCEPTION '订单金额必须大于零';
    END IF;

    IF NOT EXISTS (SELECT 1 FROM customers WHERE id = p_customer_id) THEN
        RAISE EXCEPTION '客户 % 不存在', p_customer_id
            USING HINT = '请检查客户ID是否正确',
                  ERRCODE = '23503';
    END IF;
END;
$$ LANGUAGE plpgsql;

6.4 存储过程(PROCEDURE)

PostgreSQL 11+ 支持存储过程,可以包含事务控制:

-- 创建存储过程
CREATE OR REPLACE PROCEDURE transfer_money(
    p_from_account integer,
    p_to_account integer,
    p_amount numeric
) AS $$
BEGIN
    -- 检查余额
    IF (SELECT balance FROM accounts WHERE id = p_from_account) < p_amount THEN
        RAISE EXCEPTION '余额不足';
    END IF;

    -- 扣款
    UPDATE accounts SET balance = balance - p_amount WHERE id = p_from_account;

    -- 入账
    UPDATE accounts SET balance = balance + p_amount WHERE id = p_to_account;

    -- 可以在存储过程中控制事务
    COMMIT;
END;
$$ LANGUAGE plpgsql;

-- 调用存储过程
CALL transfer_money(1, 2, 1000);

-- 带输出参数的存储过程
CREATE OR REPLACE PROCEDURE get_customer_info(
    p_customer_id integer,
    OUT p_name text,
    OUT p_email text
) AS $$
BEGIN
    SELECT name, email INTO p_name, p_email
    FROM customers WHERE id = p_customer_id;
END;
$$ LANGUAGE plpgsql;

CALL get_customer_info(1, NULL, NULL);

6.5 触发器

基本触发器

-- 创建触发器函数
CREATE OR REPLACE FUNCTION update_updated_at()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = CURRENT_TIMESTAMP;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- 创建触发器
CREATE TRIGGER orders_updated_at
    BEFORE UPDATE ON orders
    FOR EACH ROW
    EXECUTE FUNCTION update_updated_at();

-- 测试
UPDATE orders SET total = 500 WHERE id = 1;
-- updated_at 会自动更新

触发器类型

-- BEFORE 触发器:在操作前执行
CREATE TRIGGER validate_order_before_insert
    BEFORE INSERT ON orders
    FOR EACH ROW
    EXECUTE FUNCTION validate_order_func();

-- AFTER 触发器:在操作后执行
CREATE TRIGGER log_order_changes
    AFTER INSERT OR UPDATE OR DELETE ON orders
    FOR EACH ROW
    EXECUTE FUNCTION log_order_func();

-- INSTEAD OF 触发器:用于视图
CREATE TRIGGER instead_of_insert
    INSTEAD OF INSERT ON order_view
    FOR EACH ROW
    EXECUTE FUNCTION insert_order_func();

-- 语句级触发器(每条 SQL 执行一次)
CREATE TRIGGER audit_orders
    AFTER INSERT OR UPDATE OR DELETE ON orders
    FOR EACH STATEMENT
    EXECUTE FUNCTION audit_orders_func();

触发器变量

CREATE OR REPLACE FUNCTION audit_changes()
RETURNS TRIGGER AS $$
BEGIN
    IF TG_OP = 'INSERT' THEN
        INSERT INTO audit_log (table_name, operation, new_data, changed_at)
        VALUES (TG_TABLE_NAME, 'INSERT', row_to_json(NEW), NOW());
        RETURN NEW;

    ELSIF TG_OP = 'UPDATE' THEN
        INSERT INTO audit_log (table_name, operation, old_data, new_data, changed_at)
        VALUES (TG_TABLE_NAME, 'UPDATE', row_to_json(OLD), row_to_json(NEW), NOW());
        RETURN NEW;

    ELSIF TG_OP = 'DELETE' THEN
        INSERT INTO audit_log (table_name, operation, old_data, changed_at)
        VALUES (TG_TABLE_NAME, 'DELETE', row_to_json(OLD), NOW());
        RETURN OLD;
    END IF;

    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

/*
触发器变量:
- NEW: 新行数据(INSERT/UPDATE)
- OLD: 旧行数据(UPDATE/DELETE)
- TG_OP: 操作类型(INSERT/UPDATE/DELETE/TRUNCATE)
- TG_TABLE_NAME: 表名
- TG_WHEN: 触发时机(BEFORE/AFTER/INSTEAD OF)
- TG_LEVEL: 触发级别(ROW/STATEMENT)
*/

条件触发器

-- 只在特定条件下触发
CREATE TRIGGER check_order_status
    BEFORE UPDATE OF status ON orders
    FOR EACH ROW
    WHEN (OLD.status IS DISTINCT FROM NEW.status)
    EXECUTE FUNCTION handle_status_change();

管理触发器

-- 查看触发器
SELECT * FROM pg_trigger WHERE tgname = 'orders_updated_at';

-- 禁用触发器
ALTER TABLE orders DISABLE TRIGGER orders_updated_at;

-- 启用触发器
ALTER TABLE orders ENABLE TRIGGER orders_updated_at;

-- 删除触发器
DROP TRIGGER IF EXISTS orders_updated_at ON orders;

6.6 视图

基本视图

-- 创建视图
CREATE VIEW customer_orders_view AS
SELECT 
    c.id AS customer_id,
    c.name AS customer_name,
    o.id AS order_id,
    o.order_date,
    o.total
FROM customers c
LEFT JOIN orders o ON c.id = o.customer_id;

-- 使用视图
SELECT * FROM customer_orders_view WHERE customer_id = 1;

-- 修改视图
CREATE OR REPLACE VIEW customer_orders_view AS
SELECT 
    c.id AS customer_id,
    c.name AS customer_name,
    c.email,
    o.id AS order_id,
    o.order_date,
    o.total
FROM customers c
LEFT JOIN orders o ON c.id = o.customer_id;

-- 删除视图
DROP VIEW IF EXISTS customer_orders_view;

可更新视图

-- 简单视图可以自动更新
CREATE VIEW active_customers AS
SELECT id, name, email FROM customers WHERE is_active = true;

-- 可以直接插入
INSERT INTO active_customers (name, email) VALUES ('Test', 'test@example.com');

-- 复杂视图需要规则或触发器
CREATE VIEW order_summary AS
SELECT 
    customer_id,
    COUNT(*) AS order_count,
    SUM(total) AS total_amount
FROM orders
GROUP BY customer_id;

-- 无法直接更新,需要 INSTEAD OF 触发器
CREATE FUNCTION update_order_summary()
RETURNS TRIGGER AS $$
BEGIN
    -- 实现更新逻辑
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER update_order_summary_trigger
    INSTEAD OF UPDATE ON order_summary
    FOR EACH ROW
    EXECUTE FUNCTION update_order_summary();

物化视图

-- 创建物化视图(存储查询结果)
CREATE MATERIALIZED VIEW sales_summary AS
SELECT 
    DATE_TRUNC('month', order_date) AS month,
    COUNT(*) AS order_count,
    SUM(total) AS total_sales
FROM orders
GROUP BY DATE_TRUNC('month', order_date);

-- 刷新物化视图
REFRESH MATERIALIZED VIEW sales_summary;

-- 并发刷新(不锁表)
REFRESH MATERIALIZED VIEW CONCURRENTLY sales_summary;

-- 创建索引
CREATE INDEX idx_sales_summary_month ON sales_summary(month);

-- 删除
DROP MATERIALIZED VIEW IF EXISTS sales_summary;

6.7 规则系统

-- 创建规则(重写查询)
CREATE RULE log_delete AS
    ON DELETE TO orders
    DO ALSO
        INSERT INTO orders_deleted (order_id, deleted_at)
        VALUES (OLD.id, NOW());

-- 替代查询
CREATE RULE protect_orders AS
    ON DELETE TO orders
    DO INSTEAD NOTHING;

-- 查看规则
SELECT * FROM pg_rules WHERE tablename = 'orders';

-- 删除规则
DROP RULE log_delete ON orders;

6.8 小结

本章学习了 PostgreSQL 的高级特性:

  1. 事务隔离级别:READ COMMITTED、REPEATABLE READ、SERIALIZABLE
  2. 锁机制:表级锁、行级锁、死锁检测、咨询锁
  3. 存储过程与函数:参数、返回值、流程控制、异常处理
  4. 触发器:BEFORE/AFTER/INSTEAD OF,行级/语句级
  5. 视图:普通视图、可更新视图、物化视图
  6. 规则系统:查询重写

下一章将学习 PostgreSQL 与 Python 的集成。