第七章:Python 集成¶
PostgreSQL 与 Python 的集成非常成熟,本章将介绍多种连接方式、ORM 框架以及最佳实践。
7.1 连接方式概述¶
Python 连接 PostgreSQL 有多种方式:
| 方式 | 特点 | 适用场景 |
|---|---|---|
| psycopg2 | 底层同步驱动,成熟稳定 | 高性能应用 |
| psycopg3 | 新版本,异步支持 | 现代应用 |
| asyncpg | 纯异步驱动 | 异步应用 |
| SQLAlchemy | ORM 框架 | 复杂业务 |
| SQLModel | 现代 ORM | FastAPI 项目 |
7.2 psycopg2¶
安装与连接¶
import psycopg2
from psycopg2 import sql
from psycopg2.extras import RealDictCursor
# Basic connection using keyword arguments.
# NOTE: the three connect() calls below are alternatives -- use only one of
# them. Running all three rebinds `conn` each time and leaves the earlier
# connections open.
conn = psycopg2.connect(
    host="localhost",
    port=5432,
    database="mydb",
    user="postgres",
    password="password",
)

# Alternative: a single DSN / connection URI string.
conn = psycopg2.connect("postgresql://postgres:password@localhost:5432/mydb")

# Alternative: read the DSN from the environment.
import os

# Index os.environ directly so a missing variable fails with a clear
# KeyError('DATABASE_URL') instead of passing None to connect().
conn = psycopg2.connect(os.environ["DATABASE_URL"])

# Default cursor.
cursor = conn.cursor()

# Dict cursor: rows are returned as dicts instead of tuples.
# NOTE: this rebinds `cursor`, so code that follows must index rows by
# column name.
cursor = conn.cursor(cursor_factory=RealDictCursor)
基本操作¶
# Create the table (no-op if it already exists).
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
email VARCHAR(255) UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")

# Insert one row; values are passed separately from the SQL text.
cursor.execute(
    "INSERT INTO users (name, email) VALUES (%s, %s)",
    ("张三", "zhangsan@example.com"),
)

# Insert and read back the generated primary key via RETURNING.
cursor.execute(
    "INSERT INTO users (name, email) VALUES (%s, %s) RETURNING id",
    ("李四", "lisi@example.com"),
)
# `cursor` is the RealDictCursor created in the connection section, so the
# row is a dict and must be indexed by column name (row[0] would raise
# KeyError). With a plain tuple cursor use fetchone()[0] instead.
user_id = cursor.fetchone()["id"]
print(f"插入的用户 ID: {user_id}")

# Insert several rows with one call.
users = [
    ("王五", "wangwu@example.com"),
    ("赵六", "zhaoliu@example.com"),
    ("钱七", "qianqi@example.com"),
]
cursor.executemany(
    "INSERT INTO users (name, email) VALUES (%s, %s)",
    users,
)

# Nothing is persisted until the transaction is committed.
conn.commit()
查询数据¶
# Fetch a single row.
cursor.execute("SELECT * FROM users WHERE id = %s", (1,))
user = cursor.fetchone()
print(user)

# Fetch several rows.
cursor.execute("SELECT * FROM users LIMIT 10")
users = cursor.fetchall()
for user in users:
    print(user)

# Dict cursor: access columns by name.
cursor = conn.cursor(cursor_factory=RealDictCursor)
cursor.execute("SELECT * FROM users WHERE name LIKE %s", ("张%",))
users = cursor.fetchall()
for user in users:
    print(user['name'], user['email'])

# Pagination with LIMIT / OFFSET.
page = 1
per_page = 10
offset = (page - 1) * per_page
cursor.execute(
    "SELECT * FROM users ORDER BY id LIMIT %s OFFSET %s",
    (per_page, offset),
)
users = cursor.fetchall()

# Large result sets: use a *named* (server-side) cursor. A regular cursor
# transfers the whole result set to the client on execute(), so fetchmany()
# alone does not bound memory use. A named cursor must be used inside a
# transaction (the default, non-autocommit mode).
with conn.cursor(name="users_stream") as stream_cursor:
    stream_cursor.execute("SELECT * FROM users")
    while True:
        rows = stream_cursor.fetchmany(100)  # 100 rows per round trip
        if not rows:
            break
        for row in rows:
            process(row)
事务处理¶
# Explicit commit / rollback around a group of statements.
try:
    cursor.execute("INSERT INTO orders (customer_id, total) VALUES (%s, %s)", (1, 100))
    cursor.execute("UPDATE customers SET balance = balance - %s WHERE id = %s", (100, 1))
    conn.commit()
except Exception as e:
    conn.rollback()
    print(f"事务失败: {e}")

# Manual transaction control (autocommit off).
conn.autocommit = False

# Connection as a context manager: commits when the block succeeds and
# rolls back when it raises.
with conn:
    with conn.cursor() as cursor:
        cursor.execute("INSERT INTO orders (customer_id, total) VALUES (%s, %s)", (1, 100))
        cursor.execute("UPDATE customers SET balance = balance - %s WHERE id = %s", (100, 1))

# Savepoints: psycopg2 connections have no savepoint() method and
# rollback() takes no argument, so savepoints are issued as plain SQL.
with conn:
    cursor = conn.cursor()
    cursor.execute("INSERT INTO orders (customer_id, total) VALUES (%s, %s)", (1, 100))
    cursor.execute("SAVEPOINT before_items")
    try:
        cursor.execute("INSERT INTO order_items (order_id, product_id) VALUES (%s, %s)", (1, 999))
    except psycopg2.Error:
        # Undo only the failed statement; the outer transaction continues.
        cursor.execute("ROLLBACK TO SAVEPOINT before_items")
    else:
        cursor.execute("RELEASE SAVEPOINT before_items")
SQL 注入防护¶
# UNSAFE: user input interpolated straight into the SQL text.
name = "张三'; DROP TABLE users; --"
unsafe_query = f"SELECT * FROM users WHERE name = '{name}'"  # SQL injection!
# The string above is deliberately NOT executed: passing it to
# cursor.execute() would also run the injected DROP TABLE statement.

# SAFE: parameterized query; the driver handles quoting.
cursor.execute("SELECT * FROM users WHERE name = %s", (name,))

# SAFE: build dynamic identifiers with the sql module.
from psycopg2 import sql

table_name = "users"
column_name = "name"
query = sql.SQL("SELECT * FROM {} WHERE {} = %s").format(
    sql.Identifier(table_name),
    sql.Identifier(column_name),
)
cursor.execute(query, ("张三",))

# IN query built from literals. An empty `ids` list would render "IN ()",
# which is a syntax error -- guard for it or use the ANY form below.
ids = [1, 2, 3, 4, 5]
cursor.execute(
    sql.SQL("SELECT * FROM users WHERE id IN ({})").format(
        sql.SQL(",").join(map(sql.Literal, ids))
    )
)

# Or pass the list as a single array parameter with ANY.
cursor.execute("SELECT * FROM users WHERE id = ANY(%s)", (ids,))
连接池¶
from psycopg2 import pool
# Connection settings forwarded to every pooled connection.
pool_settings = {
    "host": "localhost",
    "database": "mydb",
    "user": "postgres",
    "password": "password",
}

# Pool holding between 1 and 10 connections.
connection_pool = pool.ThreadedConnectionPool(1, 10, **pool_settings)

# Borrow a connection, use it, and always hand it back.
conn = connection_pool.getconn()
try:
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users")
    users = cursor.fetchall()
finally:
    # Return the connection even when the query raised.
    connection_pool.putconn(conn)
# Context-manager wrapper around a connection pool
class ConnectionPool:
    """Borrow a connection from ``pool`` for the duration of a ``with`` block.

    On a clean exit the transaction is committed; if the block raised, it is
    rolled back. In both cases the connection is handed back to the pool,
    even when the commit or rollback itself fails.
    """

    def __init__(self, pool):
        """Store the pool; ``pool`` must provide ``getconn()`` and ``putconn()``."""
        self.pool = pool
        self.conn = None

    def __enter__(self):
        """Take a connection from the pool and return it."""
        self.conn = self.pool.getconn()
        return self.conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Finish the transaction and return the connection to the pool.

        Returns None, so an exception raised inside the block propagates.
        """
        try:
            if exc_type is None:
                # Without this commit, work done inside the block is lost.
                self.conn.commit()
            else:
                self.conn.rollback()
        finally:
            # Always give the connection back so the pool cannot be drained
            # by failing commits or rollbacks.
            self.pool.putconn(self.conn)
# Borrow a pooled connection for the duration of the block.
with ConnectionPool(connection_pool) as conn:
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users")
7.3 psycopg3 (psycopg)¶
psycopg3 是 psycopg2 的下一代版本,支持异步:
同步使用¶
import psycopg
# Connect and iterate over the result rows.
with psycopg.connect("postgresql://postgres:password@localhost/mydb") as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT * FROM users")
        for row in cursor:
            print(row)

# Parameterized queries: psycopg 3 uses %s (positional) and %(name)s (named)
# placeholders. PostgreSQL-native $1 placeholders are not accepted by the
# default cursor classes.
with psycopg.connect(DSN) as conn:
    with conn.cursor() as cursor:
        # Positional parameters
        cursor.execute("SELECT * FROM users WHERE name = %s", ("张三",))
        # Named parameters
        cursor.execute(
            "SELECT * FROM users WHERE name = %(name)s AND email = %(email)s",
            {"name": "张三", "email": "test@example.com"},
        )
异步使用¶
import asyncio
import psycopg
async def main():
    """Open an async connection and print every row of the users table."""
    connection = await psycopg.AsyncConnection.connect(
        "postgresql://postgres:password@localhost/mydb"
    )
    async with connection as conn:
        async with conn.cursor() as cursor:
            await cursor.execute("SELECT * FROM users")
            async for row in cursor:
                print(row)


asyncio.run(main())
# Async batch insert
async def batch_insert():
    """Insert two sample users with a single executemany() call, then commit."""
    insert_sql = "INSERT INTO users (name, email) VALUES (%s, %s)"
    new_users = [("用户1", "user1@example.com"), ("用户2", "user2@example.com")]
    connection = await psycopg.AsyncConnection.connect(DSN)
    async with connection as conn:
        async with conn.cursor() as cursor:
            await cursor.executemany(insert_sql, new_users)
        await conn.commit()
7.4 asyncpg¶
asyncpg 是专为异步设计的 PostgreSQL 驱动,性能优秀:
import asyncio
import asyncpg
async def main():
    """Walk through the core asyncpg calls on a single connection.

    The connection is closed in a ``finally`` block so it is released even
    when one of the statements raises.
    """
    conn = await asyncpg.connect(
        host="localhost",
        port=5432,
        database="mydb",
        user="postgres",
        password="password",
    )
    try:
        # Fetch all matching rows; asyncpg uses $1, $2, ... placeholders.
        rows = await conn.fetch("SELECT * FROM users WHERE id = $1", 1)
        for row in rows:
            print(dict(row))

        # Fetch a single row.
        row = await conn.fetchrow("SELECT * FROM users WHERE id = $1", 1)

        # Fetch a single value.
        count = await conn.fetchval("SELECT COUNT(*) FROM users")

        # Execute a statement; arguments are passed positionally.
        await conn.execute(
            "INSERT INTO users (name, email) VALUES ($1, $2)",
            "张三", "zhangsan@example.com"
        )

        # Batch insert: one argument tuple per row.
        await conn.executemany(
            "INSERT INTO users (name, email) VALUES ($1, $2)",
            [("用户1", "user1@example.com"), ("用户2", "user2@example.com")]
        )

        # Run both statements inside one transaction.
        async with conn.transaction():
            await conn.execute("INSERT INTO orders (customer_id) VALUES ($1)", 1)
            await conn.execute("UPDATE customers SET order_count = order_count + 1 WHERE id = $1", 1)
    finally:
        # Close the connection on success and on failure alike.
        await conn.close()


asyncio.run(main())
连接池¶
import asyncpg
async def with_pool():
    """Create a pool, run one query on a pooled connection, close the pool.

    The pool is used as an async context manager so it is closed even when
    the query raises.
    """
    async with asyncpg.create_pool(
        host="localhost",
        database="mydb",
        user="postgres",
        password="password",
        min_size=5,
        max_size=20,
    ) as pool:
        # acquire() hands the connection back to the pool on exit.
        async with pool.acquire() as conn:
            rows = await conn.fetch("SELECT * FROM users")
# Create the pool at application startup and release it at shutdown
class Database:
    """Holder for one asyncpg connection pool shared by the application."""

    def __init__(self):
        # No pool until connect() is awaited.
        self.pool = None

    async def connect(self):
        """Create the connection pool from ``DSN``."""
        self.pool = await asyncpg.create_pool(DSN)

    async def disconnect(self):
        """Close the pool if one is open; safe to call when not connected."""
        if self.pool is None:
            return
        # Clear the attribute first so a second call is a no-op.
        pool, self.pool = self.pool, None
        await pool.close()

    async def get_users(self):
        """Return all rows of the users table.

        Raises:
            RuntimeError: if connect() has not been awaited yet.
        """
        if self.pool is None:
            raise RuntimeError("Database is not connected; call connect() first")
        async with self.pool.acquire() as conn:
            return await conn.fetch("SELECT * FROM users")


db = Database()
# FastAPI integration
from contextlib import asynccontextmanager

from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Open the database pool before serving and close it on shutdown.

    Replaces the deprecated ``@app.on_event("startup"/"shutdown")`` hooks.
    """
    await db.connect()
    try:
        yield
    finally:
        await db.disconnect()


app = FastAPI(lifespan=lifespan)


@app.get("/users")
async def list_users():
    """Return all users fetched through the shared pool."""
    return await db.get_users()
7.5 SQLAlchemy¶
SQLAlchemy 是 Python 最流行的 ORM 框架:
基本使用¶
from datetime import datetime, timezone

from sqlalchemy import create_engine, Column, Integer, String, DateTime
# declarative_base lives in sqlalchemy.orm; the sqlalchemy.ext.declarative
# location is deprecated.
from sqlalchemy.orm import declarative_base, sessionmaker

# Engine and session factory; echo=True logs the emitted SQL.
engine = create_engine("postgresql://postgres:password@localhost/mydb", echo=True)
Session = sessionmaker(bind=engine)
session = Session()
Base = declarative_base()


def _utcnow():
    """Return the current UTC time as a naive datetime.

    Replaces the deprecated datetime.utcnow() while producing the same kind
    of value (naive, UTC) for the timezone-less DateTime column.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


# Model definition
class User(Base):
    """ORM mapping for the ``users`` table."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    email = Column(String(255), unique=True, nullable=False)
    created_at = Column(DateTime, default=_utcnow)

    def __repr__(self):
        return f"<User(id={self.id}, name='{self.name}')>"


# Create the tables for all models registered on Base.
Base.metadata.create_all(engine)
# CRUD operations
# Create
user = User(name="张三", email="zhangsan@example.com")
session.add(user)
session.commit()

# Bulk create
users = [
    User(name="李四", email="lisi@example.com"),
    User(name="王五", email="wangwu@example.com"),
]
session.add_all(users)
session.commit()

# Read
user = session.query(User).filter_by(id=1).first()
users = session.query(User).filter(User.name.like("张%")).all()

# Update: Session.get() replaces the legacy Query.get(); it returns None
# when no row has that primary key, so guard before touching attributes.
user = session.get(User, 1)
if user is not None:
    user.name = "张三丰"
    session.commit()

# Bulk update
session.query(User).filter(User.name == "张三").update({"name": "张三丰"})
session.commit()

# Delete (same None guard as for the update)
user = session.get(User, 1)
if user is not None:
    session.delete(user)
    session.commit()

# Bulk delete
session.query(User).filter(User.name.like("测试%")).delete()
session.commit()
关系映射¶
# Numeric is required by Order.total below; the original import omitted it,
# which raised NameError when the class body was evaluated.
from sqlalchemy import ForeignKey, Numeric
from sqlalchemy.orm import relationship


class Customer(Base):
    """ORM mapping for the ``customers`` table; one customer has many orders."""

    __tablename__ = "customers"

    id = Column(Integer, primary_key=True)
    name = Column(String(100))
    email = Column(String(255))
    # Paired with Order.customer through back_populates.
    orders = relationship("Order", back_populates="customer")


class Order(Base):
    """ORM mapping for the ``orders`` table; each order references a customer."""

    __tablename__ = "orders"

    id = Column(Integer, primary_key=True)
    customer_id = Column(Integer, ForeignKey("customers.id"))
    total = Column(Numeric(10, 2))
    # Paired with Customer.orders through back_populates.
    customer = relationship("Customer", back_populates="orders")
# Build a customer together with two orders; adding the customer to the
# session is enough for the attached orders to be saved with it.
order1, order2 = Order(total=100), Order(total=200)
customer = Customer(
    name="张三",
    email="zhangsan@example.com",
    orders=[order1, order2],
)
session.add(customer)
session.commit()

# Walk the relationship from the customer side.
customer = session.query(Customer).first()
for order in customer.orders:
    print(order.total)
异步 SQLAlchemy¶
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy import select
# Async engine using the asyncpg driver; echo=True logs the emitted SQL.
ASYNC_DATABASE_URL = "postgresql+asyncpg://postgres:password@localhost/mydb"
engine = create_async_engine(ASYNC_DATABASE_URL, echo=True)

# Factory for AsyncSession objects bound to the engine above.
AsyncSessionLocal = async_sessionmaker(
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False,
)
async def get_users():
    """Return all User rows as a list of ORM objects."""
    async with AsyncSessionLocal() as session:
        query_result = await session.execute(select(User))
        return query_result.scalars().all()


async def create_user(name: str, email: str):
    """Insert a user, commit, refresh it from the database and return it."""
    async with AsyncSessionLocal() as session:
        new_user = User(name=name, email=email)
        session.add(new_user)
        await session.commit()
        # Reload the row so database-generated values are populated.
        await session.refresh(new_user)
        return new_user
# FastAPI 集成
from fastapi import Depends
async def get_db():
    """FastAPI dependency that yields one AsyncSession per request.

    The ``async with`` block already closes the session when the request
    finishes (or fails), so no extra try/finally close is needed.
    """
    async with AsyncSessionLocal() as session:
        yield session
@app.post("/users")
async def create_user_api(
    name: str,
    email: str,
    db: AsyncSession = Depends(get_db),
):
    """Create a user from the given name and email and return the stored row."""
    new_user = User(name=name, email=email)
    db.add(new_user)
    await db.commit()
    # Reload so database-generated values are present in the response.
    await db.refresh(new_user)
    return new_user
7.6 SQLModel¶
SQLModel 是结合 SQLAlchemy 和 Pydantic 的现代 ORM,特别适合 FastAPI:
from datetime import datetime, timezone
from typing import Optional

from sqlmodel import SQLModel, Field, Session, create_engine, select


def _utcnow() -> datetime:
    """Return the current UTC time as a naive datetime.

    Replaces the deprecated datetime.utcnow() while producing the same kind
    of value (naive, UTC).
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


# Model definition (the class is also a Pydantic model)
class User(SQLModel, table=True):
    """User table model."""

    # None until the database assigns the primary key.
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str = Field(max_length=100)
    email: str = Field(max_length=255, unique=True)
    created_at: datetime = Field(default_factory=_utcnow)


# Engine
engine = create_engine("postgresql://postgres:password@localhost/mydb")

# Create the tables for all registered table models.
SQLModel.metadata.create_all(engine)
# CRUD helpers; each one opens its own short-lived session.
def create_user(name: str, email: str) -> User:
    """Insert a user and return it with database-generated fields loaded."""
    new_user = User(name=name, email=email)
    with Session(engine) as session:
        session.add(new_user)
        session.commit()
        # Refresh inside the session so the attributes are loaded before
        # the session closes.
        session.refresh(new_user)
    return new_user


def get_users() -> list[User]:
    """Return every user."""
    with Session(engine) as session:
        return session.exec(select(User)).all()


def get_user(user_id: int) -> Optional[User]:
    """Return the user with the given primary key, or None if absent."""
    with Session(engine) as session:
        found = session.get(User, user_id)
    return found
# Async variant
# Build the engine with SQLAlchemy's create_async_engine() rather than
# wrapping a synchronous create_engine() result in AsyncEngine by hand;
# AsyncEngine is also not a public export of sqlmodel.ext.asyncio.session.
from sqlalchemy.ext.asyncio import create_async_engine
from sqlmodel.ext.asyncio.session import AsyncSession

async_engine = create_async_engine(
    "postgresql+asyncpg://postgres:password@localhost/mydb"
)


async def create_user_async(name: str, email: str) -> User:
    """Insert a user through an async session and return the refreshed row."""
    async with AsyncSession(async_engine) as session:
        user = User(name=name, email=email)
        session.add(user)
        await session.commit()
        # Reload so database-generated values are populated.
        await session.refresh(user)
        return user
# FastAPI 集成
from fastapi import FastAPI, HTTPException
app = FastAPI()


@app.post("/users", response_model=User)
async def create_user_api(user: User):
    """Create a user from the request body and return the stored row."""
    created = await create_user_async(user.name, user.email)
    return created
@app.get("/users/{user_id}", response_model=User)
def get_user_api(user_id: int):
    """Return one user by primary key, or respond with 404 if it is absent.

    get_user() is a synchronous function, so it must not be awaited. The
    endpoint is therefore a plain ``def``; FastAPI runs such endpoints in a
    worker thread, which keeps the blocking query off the event loop.
    """
    user = get_user(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user
7.7 最佳实践¶
配置管理¶
# Manage configuration with pydantic-settings
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    """Database settings with defaults; values are also read from ``.env``."""

    # pydantic-settings is configured through model_config; the inner
    # `class Config` is the deprecated pydantic v1 style.
    model_config = SettingsConfigDict(env_file=".env")

    database_url: str = "postgresql://postgres:password@localhost/mydb"
    database_pool_size: int = 10
    database_max_overflow: int = 20


settings = Settings()
# Connection pool options for the engine
engine_options = {
    "pool_size": settings.database_pool_size,
    "max_overflow": settings.database_max_overflow,
    "pool_pre_ping": True,  # check that a connection is alive before use
    "pool_recycle": 3600,   # connection recycle time
}
engine = create_engine(settings.database_url, **engine_options)
连接管理¶
# Session handling through a context manager
from contextlib import contextmanager

from sqlalchemy.orm import sessionmaker

# Session factory used by get_db_session(). It is defined here because no
# earlier snippet in this chapter defines SessionLocal.
# NOTE(review): drop this line if the project already provides SessionLocal.
SessionLocal = sessionmaker(bind=engine)


@contextmanager
def get_db_session():
    """Yield a session; commit on success, roll back on error, always close.

    Exceptions raised inside the ``with`` body are re-raised after the
    rollback.
    """
    session = SessionLocal()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


# Usage
with get_db_session() as session:
    user = session.query(User).first()
错误处理¶
from psycopg2 import errors
from sqlalchemy.exc import IntegrityError, OperationalError
try:
    # `name` is NOT NULL, so it is supplied here; otherwise the insert would
    # fail on the not-null constraint instead of the duplicate email.
    user = User(name="张三", email="existing@example.com")
    session.add(user)
    session.commit()
except IntegrityError as e:
    session.rollback()
    # e.orig is the underlying driver exception; checking its class is more
    # reliable than searching the message text.
    if isinstance(e.orig, errors.UniqueViolation):
        raise ValueError("邮箱已存在") from e
    raise
except OperationalError as e:
    session.rollback()
    # Chain the original error so the root cause stays in the traceback.
    raise ConnectionError("数据库连接失败") from e
7.8 小结¶
本章学习了 PostgreSQL 与 Python 的集成:
- psycopg2:经典同步驱动,性能稳定
- psycopg3:新一代驱动,支持异步
- asyncpg:纯异步驱动,性能优秀
- SQLAlchemy:功能强大的 ORM
- SQLModel:现代 ORM,适合 FastAPI
- 最佳实践:连接池、事务管理、错误处理
下一章将学习 PostgreSQL 的备份与恢复。