第十一章:异步编程
asyncio 基础
import asyncio
# 定义协程
async def hello():
print("Hello")
await asyncio.sleep(1)
print("World")
# 运行协程
asyncio.run(hello())
并发执行
import asyncio
async def task(name, delay):
print(f"{name} 开始")
await asyncio.sleep(delay)
print(f"{name} 完成")
return name
async def main():
# 并发执行多个任务
results = await asyncio.gather(
task("A", 2),
task("B", 1),
task("C", 3)
)
print(results) # ['A', 'B', 'C']
asyncio.run(main())
创建任务
import asyncio
async def main():
# 创建任务
task1 = asyncio.create_task(some_coroutine())
task2 = asyncio.create_task(another_coroutine())
# 等待完成
await task1
await task2
# 超时控制
async def with_timeout():
try:
result = await asyncio.wait_for(
slow_operation(),
timeout=5.0
)
except asyncio.TimeoutError:
print("操作超时")
异步生成器
async def async_generator():
for i in range(5):
await asyncio.sleep(0.1)
yield i
async def main():
async for value in async_generator():
print(value)
# 异步推导式
async def main():
result = [i async for i in async_generator()]
异步上下文管理器
class AsyncContext:
async def __aenter__(self):
print("进入")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("退出")
async def main():
async with AsyncContext():
print("执行中")
异步 HTTP 请求
import aiohttp
import asyncio
async def fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
async def main():
urls = ['http://example.com'] * 5
tasks = [fetch(url) for url in urls]
results = await asyncio.gather(*tasks)
同步与异步混用
# 在同步代码中运行异步
import asyncio
def sync_function():
result = asyncio.run(async_function())
# 在异步代码中运行同步(会阻塞)
import asyncio
from concurrent.futures import ThreadPoolExecutor
async def run_sync():
loop = asyncio.get_event_loop()
with ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(pool, sync_function)