第三章:并发模式¶
生产者-消费者模式¶
import asyncio
import random
async def producer(queue, producer_id):
for i in range(5):
item = f"生产者{producer_id}-商品{i}"
await queue.put(item)
print(f"生产: {item}")
await asyncio.sleep(random.random())
async def consumer(queue, consumer_id):
while True:
item = await queue.get()
print(f"消费者{consumer_id} 处理: {item}")
await asyncio.sleep(random.random())
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=10)
# 启动生产者和消费者
producers = [producer(queue, i) for i in range(2)]
consumers = [consumer(queue, i) for i in range(3)]
await asyncio.gather(*producers)
await queue.join() # 等待所有任务完成
工作池模式¶
async def worker(name, queue):
while True:
task = await queue.get()
try:
print(f"{name} 处理: {task}")
await process_task(task)
finally:
queue.task_done()
async def process_task(task):
await asyncio.sleep(1)
return f"完成: {task}"
async def main():
queue = asyncio.Queue()
# 添加任务
for i in range(20):
await queue.put(f"任务{i}")
# 创建工作池
workers = [worker(f"Worker-{i}", queue) for i in range(5)]
# 等待所有任务完成
await queue.join()
# 取消工作协程
for w in asyncio.all_tasks():
if w is not asyncio.current_task():
w.cancel()
限流模式¶
import asyncio
class RateLimiter:
def __init__(self, rate, period=1.0):
self.rate = rate
self.period = period
self.tokens = rate
self.last_time = asyncio.get_event_loop().time()
async def acquire(self):
now = asyncio.get_event_loop().time()
elapsed = now - self.last_time
self.tokens = min(self.rate, self.tokens + elapsed * self.rate / self.period)
self.last_time = now
if self.tokens < 1:
wait_time = (1 - self.tokens) * self.period / self.rate
await asyncio.sleep(wait_time)
self.tokens = 0
else:
self.tokens -= 1
async def main():
limiter = RateLimiter(rate=5) # 每秒5个请求
async def request(i):
await limiter.acquire()
print(f"请求 {i} 执行于 {time.time()}")
await asyncio.gather(*[request(i) for i in range(20)])
超时和取消¶
async def with_timeout(coro, timeout):
try:
return await asyncio.wait_for(coro, timeout)
except asyncio.TimeoutError:
print("操作超时")
return None
async def cancellable_task():
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
print("任务被取消")
raise
async def main():
task = asyncio.create_task(cancellable_task())
await asyncio.sleep(1)
task.cancel() # 取消任务
try:
await task
except asyncio.CancelledError:
print("任务已取消")
错误处理¶
async def safe_gather(*tasks):
results = await asyncio.gather(*tasks, return_exceptions=True)
successes = []
failures = []
for i, result in enumerate(results):
if isinstance(result, Exception):
failures.append((i, result))
else:
successes.append((i, result))
return successes, failures
async def main():
results = await safe_gather(
task1(),
task2(),
task3()
)
异步 HTTP 客户端¶
import aiohttp
async def fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
async def fetch_all(urls):
tasks = [fetch(url) for url in urls]
return await asyncio.gather(*tasks)
异步数据库¶
import asyncpg
async def query_db():
conn = await asyncpg.connect("postgresql://user:pass@localhost/db")
rows = await conn.fetch("SELECT * FROM users")
await conn.close()
return rows
小结¶
本章学习了:
- ✅ 生产者-消费者模式
- ✅ 工作池模式
- ✅ 限流模式
- ✅ 超时和取消
- ✅ 错误处理
- ✅ 异步 HTTP 客户端
- ✅ 异步数据库
恭喜完成异步编程教程! 🎉