跳转至

第十一章:异步编程

asyncio 基础

import asyncio

# 定义协程
async def hello():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

# 运行协程
asyncio.run(hello())

并发执行

import asyncio

async def task(name, delay):
    print(f"{name} 开始")
    await asyncio.sleep(delay)
    print(f"{name} 完成")
    return name

async def main():
    # 并发执行多个任务
    results = await asyncio.gather(
        task("A", 2),
        task("B", 1),
        task("C", 3)
    )
    print(results)  # ['A', 'B', 'C']

asyncio.run(main())

创建任务

import asyncio

async def main():
    # 创建任务
    task1 = asyncio.create_task(some_coroutine())
    task2 = asyncio.create_task(another_coroutine())

    # 等待完成
    await task1
    await task2

# 超时控制
async def with_timeout():
    try:
        result = await asyncio.wait_for(
            slow_operation(),
            timeout=5.0
        )
    except asyncio.TimeoutError:
        print("操作超时")

异步生成器

async def async_generator():
    for i in range(5):
        await asyncio.sleep(0.1)
        yield i

async def main():
    async for value in async_generator():
        print(value)

# 异步推导式
async def main():
    result = [i async for i in async_generator()]

异步上下文管理器

class AsyncContext:
    async def __aenter__(self):
        print("进入")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("退出")

async def main():
    async with AsyncContext():
        print("执行中")

异步 HTTP 请求

import aiohttp
import asyncio

async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    urls = ['http://example.com'] * 5
    tasks = [fetch(url) for url in urls]
    results = await asyncio.gather(*tasks)

同步与异步混用

# 在同步代码中运行异步
import asyncio

def sync_function():
    result = asyncio.run(async_function())

# 在异步代码中运行同步(会阻塞)
import asyncio
from concurrent.futures import ThreadPoolExecutor

async def run_sync():
    loop = asyncio.get_event_loop()
    with ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, sync_function)