跳转至

第七章:Python 集成

PostgreSQL 与 Python 的集成非常成熟,本章将介绍多种连接方式、ORM 框架以及最佳实践。

7.1 连接方式概述

Python 连接 PostgreSQL 有多种方式:

| 方式 | 特点 | 适用场景 |
| --- | --- | --- |
| psycopg2 | 底层驱动,性能最好 | 高性能应用 |
| psycopg3 | 新版本,异步支持 | 现代应用 |
| asyncpg | 纯异步驱动 | 异步应用 |
| SQLAlchemy | ORM 框架 | 复杂业务 |
| SQLModel | 现代 ORM | FastAPI 项目 |

7.2 psycopg2

安装与连接

# 安装
pip install psycopg2-binary  # 预编译版本
# 或
pip install psycopg2  # 需要从源码编译,推荐用于生产环境
import psycopg2
from psycopg2 import sql
from psycopg2.extras import RealDictCursor

# Basic connection using keyword arguments
conn = psycopg2.connect(
    host="localhost",
    port=5432,
    database="mydb",
    user="postgres",
    password="password"
)

# Connect using a DSN / connection URI string
conn = psycopg2.connect("postgresql://postgres:password@localhost:5432/mydb")

# Read the connection string from an environment variable
# NOTE(review): os.environ.get returns None when DATABASE_URL is unset — confirm it is always set
import os
conn = psycopg2.connect(os.environ.get("DATABASE_URL"))

# Create a cursor
cursor = conn.cursor()

# Use a dict cursor (rows are returned as dicts instead of tuples)
cursor = conn.cursor(cursor_factory=RealDictCursor)

基本操作

# Create the table
cursor.execute("""
    CREATE TABLE IF NOT EXISTS users (
        id SERIAL PRIMARY KEY,
        name VARCHAR(100) NOT NULL,
        email VARCHAR(255) UNIQUE NOT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
""")

# Insert a row (values are passed separately as query parameters)
cursor.execute(
    "INSERT INTO users (name, email) VALUES (%s, %s)",
    ("张三", "zhangsan@example.com")
)

# Get the ID of the inserted row via RETURNING
cursor.execute(
    "INSERT INTO users (name, email) VALUES (%s, %s) RETURNING id",
    ("李四", "lisi@example.com")
)
# NOTE(review): indexing with [0] assumes a tuple-returning cursor; a
# RealDictCursor row would need ["id"] instead — confirm which cursor is in use
user_id = cursor.fetchone()[0]
print(f"插入的用户 ID: {user_id}")

# Bulk insert: one parameter tuple per row
users = [
    ("王五", "wangwu@example.com"),
    ("赵六", "zhaoliu@example.com"),
    ("钱七", "qianqi@example.com"),
]
cursor.executemany(
    "INSERT INTO users (name, email) VALUES (%s, %s)",
    users
)

# Commit the transaction
conn.commit()

查询数据

# Fetch a single row
cursor.execute("SELECT * FROM users WHERE id = %s", (1,))
user = cursor.fetchone()
print(user)

# Fetch multiple rows
cursor.execute("SELECT * FROM users LIMIT 10")
users = cursor.fetchall()
for user in users:
    print(user)

# Use a dict cursor so columns can be read by name
cursor = conn.cursor(cursor_factory=RealDictCursor)
cursor.execute("SELECT * FROM users WHERE name LIKE %s", ("张%",))
users = cursor.fetchall()
for user in users:
    print(user['name'], user['email'])

# Paginated query: page numbers are 1-based, so page 1 has offset 0
page = 1
per_page = 10
offset = (page - 1) * per_page
cursor.execute(
    "SELECT * FROM users ORDER BY id LIMIT %s OFFSET %s",
    (per_page, offset)
)
users = cursor.fetchall()

# Stream a large result set with a server-side (named) cursor.
# A default client-side cursor transfers the entire result to the client when
# execute() runs, so calling fetchmany() on it does not bound memory use.
# Passing name= makes psycopg2 declare a server-side cursor and pull rows
# from the server only as they are fetched.
with conn.cursor(name="users_stream") as stream_cursor:
    stream_cursor.execute("SELECT * FROM users")
    while True:
        rows = stream_cursor.fetchmany(100)  # fetch 100 rows per round trip
        if not rows:
            break
        for row in rows:
            process(row)

事务处理

# Automatic transaction: commit on success, roll back on any failure
try:
    cursor.execute("INSERT INTO orders (customer_id, total) VALUES (%s, %s)", (1, 100))
    cursor.execute("UPDATE customers SET balance = balance - %s WHERE id = %s", (100, 1))
    conn.commit()
except Exception as e:
    conn.rollback()
    print(f"事务失败: {e}")

# Manual transaction control
conn.autocommit = False

# Using context managers
with conn:
    with conn.cursor() as cursor:
        cursor.execute("INSERT INTO orders (customer_id, total) VALUES (%s, %s)", (1, 100))
        cursor.execute("UPDATE customers SET balance = balance - %s WHERE id = %s", (100, 1))
# Committed or rolled back automatically when the with-block exits

# Savepoints: psycopg2 connections have no savepoint() method, so issue the
# SAVEPOINT / ROLLBACK TO SAVEPOINT statements through the cursor.
with conn:
    cursor = conn.cursor()
    cursor.execute("INSERT INTO orders (customer_id, total) VALUES (%s, %s)", (1, 100))

    cursor.execute("SAVEPOINT before_order_items")
    try:
        cursor.execute("INSERT INTO order_items (order_id, product_id) VALUES (%s, %s)", (1, 999))
    except psycopg2.Error:
        # Undo only the failed statement; the outer transaction continues
        cursor.execute("ROLLBACK TO SAVEPOINT before_order_items")
    else:
        # The savepoint is no longer needed once the statement succeeded
        cursor.execute("RELEASE SAVEPOINT before_order_items")

SQL 注入防护

# ❌ Dangerous: building SQL by string interpolation
name = "张三'; DROP TABLE users; --"
cursor.execute(f"SELECT * FROM users WHERE name = '{name}'")  # SQL injection!

# ✅ Safe: use a parameterized query
cursor.execute("SELECT * FROM users WHERE name = %s", (name,))

# ✅ Safe: build dynamic SQL with the sql module
from psycopg2 import sql

# Table and column names go through sql.Identifier; the value stays a %s parameter
table_name = "users"
column_name = "name"
query = sql.SQL("SELECT * FROM {} WHERE {} = %s").format(
    sql.Identifier(table_name),
    sql.Identifier(column_name)
)
cursor.execute(query, ("张三",))

# IN query: each id is rendered as a sql.Literal and joined with commas
ids = [1, 2, 3, 4, 5]
cursor.execute(
    sql.SQL("SELECT * FROM users WHERE id IN ({})").format(
        sql.SQL(",").join(map(sql.Literal, ids))
    )
)

# Or use ANY with the whole list passed as a single parameter
cursor.execute("SELECT * FROM users WHERE id = ANY(%s)", (ids,))

连接池

from psycopg2 import pool

# Create the connection pool (between 1 and 10 connections)
connection_pool = pool.ThreadedConnectionPool(
    minconn=1,
    maxconn=10,
    host="localhost",
    database="mydb",
    user="postgres",
    password="password"
)

# Borrow a connection from the pool
conn = connection_pool.getconn()
try:
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users")
    users = cursor.fetchall()
finally:
    # Always return the connection to the pool, even on error
    connection_pool.putconn(conn)

# 使用上下文管理器
class ConnectionPool:
    """Context manager that borrows one connection from a pool.

    On entry a connection is taken with ``pool.getconn()`` and returned to
    the caller. On exit the transaction is committed when the block finished
    normally, or rolled back when it raised; in both cases the connection is
    handed back with ``pool.putconn()``. Exceptions from the block are never
    suppressed.

    Args:
        pool: Any object exposing ``getconn()`` and ``putconn(conn)``.
    """

    def __init__(self, pool):
        self.pool = pool
        self.conn = None

    def __enter__(self):
        self.conn = self.pool.getconn()
        return self.conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            if exc_type is None:
                # Without an explicit commit the work done in the block would
                # be lost when the connection goes back to the pool.
                self.conn.commit()
            else:
                self.conn.rollback()
        finally:
            # Return the connection even if commit/rollback itself fails,
            # otherwise the pool slowly runs out of connections.
            self.pool.putconn(self.conn)

# Borrow a pooled connection for the duration of the with-block
with ConnectionPool(connection_pool) as conn:
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users")

7.3 psycopg3 (psycopg)

psycopg3 是 psycopg2 的下一代版本,支持异步:

pip install psycopg[binary,pool]

同步使用

import psycopg

# Connect; the connection and the cursor are both used as context managers
with psycopg.connect("postgresql://postgres:password@localhost/mydb") as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT * FROM users")
        # Iterate the cursor directly to read the rows
        for row in cursor:
            print(row)

# Parameterized queries: psycopg 3 accepts %s (positional) or %(name)s (named)
# placeholders. PostgreSQL-style $1 placeholders are not accepted here.
with psycopg.connect(DSN) as conn:
    with conn.cursor() as cursor:
        # Positional parameters are passed as a sequence
        cursor.execute("SELECT * FROM users WHERE name = %s", ("张三",))
        # Named parameters are passed as a mapping
        cursor.execute(
            "SELECT * FROM users WHERE name = %(name)s AND email = %(email)s",
            {"name": "张三", "email": "test@example.com"},
        )

异步使用

import asyncio
import psycopg

async def main():
    """Open an async psycopg connection, select all users, and print each row."""
    dsn = "postgresql://postgres:password@localhost/mydb"
    connection = await psycopg.AsyncConnection.connect(dsn)
    async with connection as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT * FROM users")
            # The async cursor is consumed row by row
            async for record in cur:
                print(record)

asyncio.run(main())

# Async bulk operation
async def batch_insert():
    """Insert two sample users with a single executemany() call, then commit."""
    insert_sql = "INSERT INTO users (name, email) VALUES (%s, %s)"
    new_users = [("用户1", "user1@example.com"), ("用户2", "user2@example.com")]
    connection = await psycopg.AsyncConnection.connect(DSN)
    async with connection as conn:
        async with conn.cursor() as cur:
            await cur.executemany(insert_sql, new_users)
        await conn.commit()

7.4 asyncpg

asyncpg 是专为异步设计的 PostgreSQL 驱动,性能优秀:

pip install asyncpg
import asyncio
import asyncpg

async def main():
    """Walk through the basic asyncpg calls on a single connection.

    Covers fetch / fetchrow / fetchval, execute, executemany and an explicit
    transaction. The connection is closed in a ``finally`` block so that it
    is released even when one of the statements fails.
    """
    # Connect
    conn = await asyncpg.connect(
        host="localhost",
        port=5432,
        database="mydb",
        user="postgres",
        password="password"
    )

    try:
        # Query: asyncpg uses $1, $2, ... placeholders with positional arguments
        rows = await conn.fetch("SELECT * FROM users WHERE id = $1", 1)
        for row in rows:
            print(dict(row))

        # Fetch a single row
        row = await conn.fetchrow("SELECT * FROM users WHERE id = $1", 1)

        # Fetch a single value
        count = await conn.fetchval("SELECT COUNT(*) FROM users")

        # Execute a statement
        await conn.execute(
            "INSERT INTO users (name, email) VALUES ($1, $2)",
            "张三", "zhangsan@example.com"
        )

        # Bulk insert: one argument tuple per row
        await conn.executemany(
            "INSERT INTO users (name, email) VALUES ($1, $2)",
            [("用户1", "user1@example.com"), ("用户2", "user2@example.com")]
        )

        # Transaction: both statements run inside one transaction block
        async with conn.transaction():
            await conn.execute("INSERT INTO orders (customer_id) VALUES ($1)", 1)
            await conn.execute("UPDATE customers SET order_count = order_count + 1 WHERE id = $1", 1)
    finally:
        # Close the connection even if a statement above raised
        await conn.close()

asyncio.run(main())

连接池

import asyncpg

async def with_pool():
    """Create a pool, run one query on a pooled connection, then close the pool.

    The pool is used as an async context manager so it is closed even when
    the query raises.
    """
    # Create the connection pool (5 to 20 connections)
    async with asyncpg.create_pool(
        host="localhost",
        database="mydb",
        user="postgres",
        password="password",
        min_size=5,
        max_size=20
    ) as pool:
        # Borrow a connection; it is released back to the pool on exit
        async with pool.acquire() as conn:
            rows = await conn.fetch("SELECT * FROM users")
    # Leaving the outer block closes the pool

# Create the pool at application startup and release it at shutdown
class Database:
    """Holder for an application-wide asyncpg connection pool.

    ``connect()`` creates the pool, ``disconnect()`` closes it, and
    ``get_users()`` runs a query on a pooled connection.
    """

    def __init__(self):
        self.pool = None

    async def connect(self):
        """Create the pool; does nothing if a pool already exists."""
        if self.pool is None:
            self.pool = await asyncpg.create_pool(DSN)

    async def disconnect(self):
        """Close the pool; safe to call before connect() or more than once."""
        if self.pool is None:
            return
        # Clear the attribute first so a repeated call cannot close twice
        pool, self.pool = self.pool, None
        await pool.close()

    async def get_users(self):
        """Return all rows of the users table.

        Raises:
            RuntimeError: If connect() has not been called yet.
        """
        if self.pool is None:
            raise RuntimeError("Database.connect() must be called before get_users()")
        async with self.pool.acquire() as conn:
            return await conn.fetch("SELECT * FROM users")

# Single shared Database instance used by the handlers below
db = Database()

# FastAPI integration
from fastapi import FastAPI

app = FastAPI()

# NOTE(review): on_event hooks appear to be deprecated in newer FastAPI
# releases in favor of lifespan handlers — confirm against the installed version
@app.on_event("startup")
async def startup():
    # Create the connection pool when the application starts
    await db.connect()

@app.on_event("shutdown")
async def shutdown():
    # Close the connection pool when the application stops
    await db.disconnect()

@app.get("/users")
async def list_users():
    # Returns whatever db.get_users() produces
    return await db.get_users()

7.5 SQLAlchemy

SQLAlchemy 是 Python 最流行的 ORM 框架:

pip install sqlalchemy psycopg2-binary

基本使用

from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime

# Connect; echo=True is passed to the engine
# (presumably to log the emitted SQL — verify against the SQLAlchemy docs)
engine = create_engine("postgresql://postgres:password@localhost/mydb", echo=True)
Session = sessionmaker(bind=engine)
session = Session()
Base = declarative_base()

# Define the model
class User(Base):
    # ORM model mapped to the "users" table
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    email = Column(String(255), unique=True, nullable=False)
    # The callable is passed uncalled, so it is evaluated per insert
    created_at = Column(DateTime, default=datetime.utcnow)

    def __repr__(self):
        return f"<User(id={self.id}, name='{self.name}')>"

# Create the tables for all models registered on Base
Base.metadata.create_all(engine)

# CRUD operations
# Create
user = User(name="张三", email="zhangsan@example.com")
session.add(user)
session.commit()

# Bulk create
users = [
    User(name="李四", email="lisi@example.com"),
    User(name="王五", email="wangwu@example.com"),
]
session.add_all(users)
session.commit()

# Query
user = session.query(User).filter_by(id=1).first()
users = session.query(User).filter(User.name.like("张%")).all()

# Update: load by primary key with Session.get (Query.get is the legacy form)
user = session.get(User, 1)
user.name = "张三丰"
session.commit()

# Bulk update: a single UPDATE statement, no objects are loaded
session.query(User).filter(User.name == "张三").update({"name": "张三丰"})
session.commit()

# Delete
user = session.get(User, 1)
session.delete(user)
session.commit()

# Bulk delete: a single DELETE statement
session.query(User).filter(User.name.like("测试%")).delete()
session.commit()

关系映射

# Numeric is required by Order.total below
from sqlalchemy import ForeignKey, Numeric
from sqlalchemy.orm import relationship

class Customer(Base):
    """Customer row; ``orders`` is the one-to-many side of the relationship."""

    __tablename__ = "customers"

    id = Column(Integer, primary_key=True)
    name = Column(String(100))
    email = Column(String(255))
    # Paired with Order.customer through back_populates
    orders = relationship("Order", back_populates="customer")

class Order(Base):
    """Order row that belongs to one customer through ``customer_id``."""

    __tablename__ = "orders"

    id = Column(Integer, primary_key=True)
    customer_id = Column(Integer, ForeignKey("customers.id"))
    # Fixed-point amount: 10 digits in total, 2 after the decimal point
    total = Column(Numeric(10, 2))
    # Paired with Customer.orders through back_populates
    customer = relationship("Customer", back_populates="orders")

# Use the relationship: attach the orders to the customer, then add the customer
customer = Customer(name="张三", email="zhangsan@example.com")
order1 = Order(total=100)
order2 = Order(total=200)
customer.orders = [order1, order2]
session.add(customer)
session.commit()

# Query through the relationship
customer = session.query(Customer).first()
for order in customer.orders:
    print(order.total)

异步 SQLAlchemy

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy import select

# Async engine; the URL selects the asyncpg driver
engine = create_async_engine(
    "postgresql+asyncpg://postgres:password@localhost/mydb",
    echo=True
)

# Factory for AsyncSession objects bound to the engine above
AsyncSessionLocal = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

async def get_users():
    """Return all User rows as a list of ORM objects."""
    query = select(User)
    async with AsyncSessionLocal() as session:
        rows = await session.execute(query)
        return rows.scalars().all()

async def create_user(name: str, email: str):
    """Insert a new user, commit, refresh it from the database, and return it."""
    async with AsyncSessionLocal() as db_session:
        new_user = User(name=name, email=email)
        db_session.add(new_user)
        await db_session.commit()
        # Reload the row so database-generated values are present on the object
        await db_session.refresh(new_user)
        return new_user

# FastAPI 集成
from fastapi import Depends

async def get_db():
    """Yield one AsyncSession per request, for use with FastAPI ``Depends``.

    The ``async with`` block already closes the session when the generator
    finishes or is torn down after an error, so no extra ``finally`` is needed.
    """
    async with AsyncSessionLocal() as session:
        yield session

@app.post("/users")
async def create_user_api(
    name: str,
    email: str,
    db: AsyncSession = Depends(get_db)
):
    """Create a user from the given name and email and return the stored row."""
    created = User(name=name, email=email)
    db.add(created)
    await db.commit()
    # Reload so database-generated columns are populated on the returned object
    await db.refresh(created)
    return created

7.6 SQLModel

SQLModel 是结合 SQLAlchemy 和 Pydantic 的现代 ORM,特别适合 FastAPI:

pip install sqlmodel asyncpg psycopg2-binary
from sqlmodel import SQLModel, Field, Session, create_engine, select
from typing import Optional
from datetime import datetime

# Define the model (it is also a Pydantic model)
class User(SQLModel, table=True):
    # id defaults to None on new objects
    # (presumably assigned by the database on insert — confirm)
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str = Field(max_length=100)
    email: str = Field(max_length=255, unique=True)
    # default_factory calls datetime.utcnow for each new object
    created_at: datetime = Field(default_factory=datetime.utcnow)

# Create the engine
engine = create_engine("postgresql://postgres:password@localhost/mydb")

# Create the tables for all SQLModel table models
SQLModel.metadata.create_all(engine)

# CRUD
def create_user(name: str, email: str) -> User:
    """Insert a user, commit, refresh it from the database, and return it."""
    new_user = User(name=name, email=email)
    with Session(engine) as db_session:
        db_session.add(new_user)
        db_session.commit()
        # Reload so database-generated columns are populated
        db_session.refresh(new_user)
    return new_user

def get_users() -> list[User]:
    """Return all users."""
    with Session(engine) as db_session:
        found = db_session.exec(select(User)).all()
    return found

def get_user(user_id: int) -> Optional[User]:
    """Return the result of looking up a user by primary key."""
    with Session(engine) as db_session:
        match = db_session.get(User, user_id)
    return match

# Async version
# Build the async engine with SQLAlchemy's create_async_engine instead of
# wrapping a synchronous engine by hand. AsyncEngine is imported from
# SQLAlchemy, where it is defined, so the name stays available.
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from sqlmodel.ext.asyncio.session import AsyncSession

async_engine = create_async_engine(
    "postgresql+asyncpg://postgres:password@localhost/mydb"
)

async def create_user_async(name: str, email: str) -> User:
    """Async counterpart of create_user: insert, commit, refresh, and return."""
    async with AsyncSession(async_engine) as db_session:
        new_user = User(name=name, email=email)
        db_session.add(new_user)
        await db_session.commit()
        # Reload so database-generated columns are populated
        await db_session.refresh(new_user)
    return new_user

# FastAPI integration
from fastapi import FastAPI, HTTPException

app = FastAPI()

@app.post("/users", response_model=User)
async def create_user_api(user: User):
    # Only name and email from the request model are passed on
    return await create_user_async(user.name, user.email)

@app.get("/users/{user_id}", response_model=User)
async def get_user_api(user_id: int):
    """Return the user with the given id, or respond with 404 if none exists.

    The lookup goes through an AsyncSession: the synchronous ``get_user``
    helper returns a plain object, so it cannot be awaited.
    """
    async with AsyncSession(async_engine) as session:
        user = await session.get(User, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

7.7 最佳实践

配置管理

# Manage configuration with pydantic-settings
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    """Database settings, with values also loaded from a ``.env`` file."""

    # pydantic v2 configuration; replaces the deprecated inner `class Config`
    model_config = SettingsConfigDict(env_file=".env")

    database_url: str = "postgresql://postgres:password@localhost/mydb"
    database_pool_size: int = 10
    database_max_overflow: int = 20

settings = Settings()

# Connection pool configuration, driven by the settings object
engine = create_engine(
    settings.database_url,
    pool_size=settings.database_pool_size,
    max_overflow=settings.database_max_overflow,
    pool_pre_ping=True,  # check that a connection is still valid
    pool_recycle=3600,   # connection recycle time
)

连接管理

# Use a context manager
from contextlib import contextmanager

# Session factory used by get_db_session; bound to the engine configured above
SessionLocal = sessionmaker(bind=engine)

@contextmanager
def get_db_session():
    """Yield a session that commits on success and rolls back on error.

    The session is always closed when the with-block ends. Exceptions raised
    inside the block are re-raised after the rollback.
    """
    session = SessionLocal()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

# Usage: the session is committed or rolled back, then closed, on exit
with get_db_session() as session:
    user = session.query(User).first()

错误处理

from psycopg2 import errors
from sqlalchemy.exc import IntegrityError, OperationalError

try:
    # name is NOT NULL on the model, so it is supplied here; otherwise the
    # insert would fail on the missing name rather than on the duplicate email
    user = User(name="张三", email="existing@example.com")
    session.add(user)
    session.commit()
except IntegrityError as e:
    session.rollback()
    # e.orig is the underlying psycopg2 exception. Checking its class is more
    # reliable than matching the error message text, which can vary.
    if isinstance(e.orig, errors.UniqueViolation):
        raise ValueError("邮箱已存在") from e
    raise
except OperationalError as e:
    session.rollback()
    # Chain the original error so the root cause stays in the traceback
    raise ConnectionError("数据库连接失败") from e

7.8 小结

本章学习了 PostgreSQL 与 Python 的集成:

  1. psycopg2:经典同步驱动,性能稳定
  2. psycopg3:新一代驱动,支持异步
  3. asyncpg:纯异步驱动,性能优秀
  4. SQLAlchemy:功能强大的 ORM
  5. SQLModel:现代 ORM,适合 FastAPI
  6. 最佳实践:连接池、事务管理、错误处理

下一章将学习 PostgreSQL 的备份与恢复。