跳转至

第三章:并发模式

生产者-消费者模式

import asyncio
import random

async def producer(queue, producer_id):
    """Put five labelled items on ``queue``, pausing briefly after each one.

    Args:
        queue: Queue whose awaitable ``put`` receives every item.
        producer_id: Identifier embedded in each item label.
    """
    labels = (f"生产者{producer_id}-商品{index}" for index in range(5))
    for label in labels:
        await queue.put(label)
        print(f"生产: {label}")
        # Random pause in [0, 1) seconds before producing the next item.
        pause = random.random()
        await asyncio.sleep(pause)

async def consumer(queue, consumer_id):
    """Take items from ``queue`` forever, reporting and acknowledging each.

    The loop never exits on its own; the caller is expected to cancel the
    task running this coroutine.

    Args:
        queue: Queue providing awaitable ``get`` and ``task_done``.
        consumer_id: Identifier shown in the progress message.
    """
    while True:
        goods = await queue.get()
        print(f"消费者{consumer_id} 处理: {goods}")
        # Random pause in [0, 1) seconds stands in for the processing time.
        delay = random.random()
        await asyncio.sleep(delay)
        # Acknowledge the item so that queue.join() can eventually return.
        queue.task_done()

async def main():
    """Run two producers against three consumers sharing one bounded queue.

    Returns once every produced item has been consumed and acknowledged.
    """
    queue = asyncio.Queue(maxsize=10)

    # Consumers loop forever, so they have to run as background tasks. A bare
    # coroutine object is never scheduled, and queue.join() would then block
    # forever waiting for items nobody processes.
    consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]

    try:
        # Run the producers to completion, then wait until every queued item
        # has been acknowledged with task_done().
        await asyncio.gather(*(producer(queue, i) for i in range(2)))
        await queue.join()
    finally:
        # The consumers never exit by themselves: cancel them and wait for
        # the cancellations to be delivered.
        for task in consumers:
            task.cancel()
        await asyncio.gather(*consumers, return_exceptions=True)

工作池模式

async def worker(name, queue):
    """Pull tasks from ``queue`` forever and hand each to ``process_task``.

    Every task is acknowledged with ``task_done`` even if processing raises.
    The loop never exits on its own; the caller cancels the worker.

    Args:
        name: Label shown in the progress message.
        queue: Queue providing awaitable ``get`` and ``task_done``.
    """
    while True:
        job = await queue.get()
        try:
            announcement = f"{name} 处理: {job}"
            print(announcement)
            await process_task(job)
        finally:
            # Always acknowledge, otherwise queue.join() could never return.
            queue.task_done()

async def process_task(task):
    """Simulate one second of work on ``task`` and return a completion message.

    Args:
        task: Value interpolated into the returned message.

    Returns:
        The completion message string for ``task``.
    """
    duration = 1  # seconds of simulated work
    await asyncio.sleep(duration)
    outcome = f"完成: {task}"
    return outcome

async def main():
    """Process twenty queued tasks with a pool of five workers.

    Returns once every task has been acknowledged and the workers have been
    shut down.
    """
    queue = asyncio.Queue()

    # Enqueue the work items.
    for i in range(20):
        await queue.put(f"任务{i}")

    # Build the worker pool. Each worker must be wrapped in a task: a bare
    # coroutine object is never scheduled, so queue.join() would hang.
    workers = [
        asyncio.create_task(worker(f"Worker-{i}", queue)) for i in range(5)
    ]

    try:
        # Wait until every queued task has been marked done.
        await queue.join()
    finally:
        # Cancel only the workers created here, not every task on the loop,
        # so unrelated tasks are left untouched. Then wait for the
        # cancellations to be delivered.
        for w in workers:
            w.cancel()
        await asyncio.gather(*workers, return_exceptions=True)

限流模式

import asyncio

class RateLimiter:
    """Token-bucket rate limiter for asyncio code.

    The bucket holds at most ``rate`` tokens and refills continuously at
    ``rate`` tokens per ``period`` seconds. Each ``acquire`` consumes one
    token, waiting first if none is available.
    """

    def __init__(self, rate, period=1.0):
        """Create a limiter with a full bucket.

        Args:
            rate: Number of acquisitions allowed per ``period``; also the
                bucket capacity.
            period: Length of the accounting window in seconds.

        Raises:
            ValueError: If ``rate`` or ``period`` is not positive.
        """
        if rate <= 0 or period <= 0:
            raise ValueError("rate and period must be positive")
        self.rate = rate
        self.period = period
        self.tokens = rate
        # The bucket starts full, so time elapsed before the first acquire
        # cannot add tokens. Reading the clock lazily lets the limiter be
        # constructed before any event loop is running.
        self.last_time = None

    async def acquire(self):
        """Take one token, sleeping until it becomes available if necessary."""
        now = asyncio.get_running_loop().time()
        if self.last_time is not None:
            elapsed = now - self.last_time
            refill = elapsed * self.rate / self.period
            self.tokens = min(self.rate, self.tokens + refill)
        self.last_time = now

        # Reserve the token immediately, letting the balance go negative.
        # Concurrent callers therefore queue up behind one another instead of
        # all computing the same wait and waking together. There is no await
        # between reading and writing the balance, so no lock is needed.
        self.tokens -= 1
        if self.tokens < 0:
            # Time for the refill to bring the balance back up to zero.
            await asyncio.sleep(-self.tokens * self.period / self.rate)

async def main():
    """Fire twenty requests through a limiter allowing five per second."""
    # Local import: the snippet prints wall-clock timestamps but never
    # imported ``time``, which raised NameError on the first request.
    import time

    limiter = RateLimiter(rate=5)  # 5 requests per second

    async def request(i):
        """Wait for a token, then report when request ``i`` ran."""
        await limiter.acquire()
        print(f"请求 {i} 执行于 {time.time()}")

    await asyncio.gather(*[request(i) for i in range(20)])

超时和取消

async def with_timeout(coro, timeout):
    """Await ``coro`` for at most ``timeout`` seconds.

    Args:
        coro: Awaitable to run.
        timeout: Limit passed to ``asyncio.wait_for``.

    Returns:
        The awaitable's result, or ``None`` if the timeout expired (a notice
        is printed in that case).
    """
    try:
        outcome = await asyncio.wait_for(coro, timeout)
    except asyncio.TimeoutError:
        print("操作超时")
        outcome = None
    return outcome

async def cancellable_task():
    """Sleep for ten seconds, announcing cancellation if it is interrupted.

    Raises:
        asyncio.CancelledError: Re-raised after the notice is printed so the
            task still ends up in the cancelled state.
    """
    long_nap = asyncio.sleep(10)
    try:
        await long_nap
    except asyncio.CancelledError:
        print("任务被取消")
        # Propagate the cancellation instead of swallowing it.
        raise

async def main():
    """Start ``cancellable_task``, cancel it after one second, and confirm."""
    running = asyncio.create_task(cancellable_task())

    # Give the task a second to get going, then request cancellation.
    await asyncio.sleep(1)
    running.cancel()

    # Awaiting a cancelled task re-raises CancelledError in the awaiter.
    try:
        await running
    except asyncio.CancelledError:
        print("任务已取消")

错误处理

async def safe_gather(*tasks):
    """Run awaitables concurrently and split their outcomes.

    Args:
        *tasks: Awaitables to run with ``asyncio.gather``.

    Returns:
        A ``(successes, failures)`` pair. Each is a list of
        ``(index, value)`` tuples, where ``index`` is the position of the
        awaitable in ``tasks`` and ``value`` is its result or the exception
        it raised.
    """
    results = await asyncio.gather(*tasks, return_exceptions=True)

    successes = []
    failures = []

    for i, result in enumerate(results):
        # Check BaseException rather than Exception: since Python 3.8
        # asyncio.CancelledError derives from BaseException, so a cancelled
        # child would otherwise be reported as a success.
        bucket = failures if isinstance(result, BaseException) else successes
        bucket.append((i, result))

    return successes, failures

async def main():
    """Run three tasks through ``safe_gather``.

    NOTE(review): ``task1``, ``task2`` and ``task3`` are not defined in this
    snippet; they are assumed to be coroutine functions supplied elsewhere.
    """
    pending = (task1(), task2(), task3())
    results = await safe_gather(*pending)

异步 HTTP 客户端

import aiohttp

async def fetch(url):
    """Download ``url`` with a fresh aiohttp session and return the body text.

    Args:
        url: Address to request with HTTP GET.

    Returns:
        The response body as returned by ``response.text()``.
    """
    # One combined ``async with``: the session is entered first and closed
    # last, exactly as with two nested blocks.
    async with aiohttp.ClientSession() as session, session.get(url) as response:
        body = await response.text()
        return body

async def fetch_all(urls):
    """Fetch every URL concurrently.

    Args:
        urls: Iterable of addresses, each passed to ``fetch``.

    Returns:
        List of ``fetch`` results in the same order as ``urls``.
    """
    pages = await asyncio.gather(*map(fetch, urls))
    return pages

异步数据库

import asyncpg

async def query_db():
    """Fetch every row of the ``users`` table.

    Returns:
        The rows returned by ``conn.fetch``.
    """
    # NOTE(review): the DSN looks like a tutorial placeholder; do not
    # hard-code real credentials like this.
    conn = await asyncpg.connect("postgresql://user:pass@localhost/db")
    try:
        return await conn.fetch("SELECT * FROM users")
    finally:
        # Close the connection even when the query raises, so a failed
        # query does not leak it.
        await conn.close()

小结

本章学习了:

  • ✅ 生产者-消费者模式
  • ✅ 工作池模式
  • ✅ 限流模式
  • ✅ 超时和取消
  • ✅ 错误处理
  • ✅ 异步 HTTP 客户端
  • ✅ 异步数据库

恭喜完成异步编程教程! 🎉