第六章:路由模式¶
6.1 简介¶
路由模式(Routing)使用 Direct Exchange 根据 Routing Key 将消息路由到特定的队列。
             ┌──────────┐
Producer ──▶ │  direct  │─── error ───▶ Queue Error ───▶ Consumer Error
             │ exchange │─── warn ────▶ Queue All
             │          │─── info ────▶ Queue All
             └──────────┘
特点:
- 消息根据 Routing Key 精确匹配
- 一个队列可以绑定多个 Routing Key
- 比 Fanout 更灵活,比 Topic 更精确
6.2 Direct Exchange¶
Direct 交换机将消息路由到 Routing Key 完全匹配的队列。
生产者¶
"""Direct-exchange producer: publish one log line whose routing key is
the severity given on the command line (defaults to 'info')."""
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare the direct exchange (idempotent if it already exists).
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# argv[1] -> severity / routing key; argv[2:] -> message text.
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'

# The severity doubles as the routing key.
channel.basic_publish(
    exchange='direct_logs',
    routing_key=severity,
    body=message,
)
print(f" [x] Sent [{severity}]: {message}")
connection.close()
消费者¶
"""Direct-exchange consumer: bind a throwaway queue to the severities
listed on the command line and print every matching log message."""
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# Server-named exclusive queue: removed when this consumer disconnects.
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

# One binding per requested severity; the queue receives the union.
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(f' [*] Waiting for logs. Bound to: {severities}')


def callback(ch, method, properties, body):
    # method.routing_key tells us which severity matched; body is bytes.
    print(f" [x] {method.routing_key}:{body}")


channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
运行示例¶
# Subscribe to every log level
python consumer.py info warning error
# Subscribe to error logs only
python consumer.py error
# Publish logs at different severity levels
python producer.py error "Run. Run. Or it will explode."
# [x] Sent [error]: Run. Run. Or it will explode.
python producer.py info "Info message"
# [x] Sent [info]: Info message
6.3 多重绑定¶
一个队列可以绑定多个 Routing Key:
# Bind several routing keys to one queue: it then receives a message
# whenever ANY of its bound keys matches the published routing key.
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key='error')
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key='warning')
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key='info')
多个队列可以绑定相同的 Routing Key:
# Queue A binds 'error'
channel.queue_bind(exchange='direct_logs', queue='queue_a', routing_key='error')
# Queue B binds 'error' as well
channel.queue_bind(exchange='direct_logs', queue='queue_b', routing_key='error')
# Publishing with routing key 'error' delivers a copy to BOTH queues.
channel.basic_publish(exchange='direct_logs', routing_key='error', body='Error!')
6.4 日志系统示例¶
完整的日志系统¶
import pika
import json
from datetime import datetime
from enum import Enum
from enum import unique


@unique  # reject accidental duplicate values, which would silently alias members
class LogLevel(Enum):
    """Log severities; each value doubles as the routing key on the
    'logs' direct exchange."""

    DEBUG = 'debug'
    INFO = 'info'
    WARNING = 'warning'
    ERROR = 'error'
    CRITICAL = 'critical'
class LogPublisher:
    """Publishes structured log entries to the 'logs' direct exchange,
    using the severity's value as the routing key.

    Also usable as a context manager so the connection is always closed:
    ``with LogPublisher() as pub: pub.log(...)``.
    """

    def __init__(self, host='localhost'):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host)
        )
        self.channel = self.connection.channel()
        # Idempotent: creates the exchange only if it does not exist yet.
        self.channel.exchange_declare(
            exchange='logs',
            exchange_type='direct'
        )

    def log(self, level: LogLevel, message: str, **kwargs):
        """Send one log entry.

        Args:
            level: severity; ``level.value`` becomes the routing key.
            message: human-readable log text.
            **kwargs: extra structured fields, stored under 'extra'.
        """
        log_entry = {
            'level': level.value,
            'message': message,
            # NOTE(review): naive local time; consider datetime.now(tz=...) — confirm.
            'timestamp': datetime.now().isoformat(),
            'extra': kwargs
        }
        self.channel.basic_publish(
            exchange='logs',
            routing_key=level.value,
            body=json.dumps(log_entry)
        )
        print(f" [x] Logged [{level.value}]: {message}")

    def close(self):
        """Close the underlying AMQP connection."""
        self.connection.close()

    # Backward-compatible addition: context-manager protocol.
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False
# Example usage: emit one message at each severity level.
publisher = LogPublisher()
for lvl, msg, extra in (
    (LogLevel.DEBUG, "Debug message", {'user_id': 1}),
    (LogLevel.INFO, "User logged in", {'user_id': 1}),
    (LogLevel.WARNING, "High memory usage", {'memory_percent': 85}),
    (LogLevel.ERROR, "Database connection failed", {'error_code': 500}),
    (LogLevel.CRITICAL, "System shutdown", {'reason': 'maintenance'}),
):
    publisher.log(lvl, msg, **extra)
publisher.close()
日志收集器(收集所有日志)¶
import pika
import json
from datetime import datetime
class LogCollector:
    """Collects every log level from the 'logs' direct exchange into a
    durable 'all_logs' queue and appends each entry to logs.jsonl."""

    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()
        self.channel.exchange_declare(
            exchange='logs',
            exchange_type='direct'
        )
        # Durable queue: survives broker restarts.
        self.channel.queue_declare(queue='all_logs', durable=True)
        # One binding per level -> this queue receives every log entry.
        for level in ['debug', 'info', 'warning', 'error', 'critical']:
            self.channel.queue_bind(
                exchange='logs',
                queue='all_logs',
                routing_key=level
            )

    def start(self):
        """Block and consume; each message is acked only after it is saved."""
        self.channel.basic_consume(
            queue='all_logs',
            on_message_callback=self.process_log,
            auto_ack=False
        )
        print(' [*] Log collector started')
        self.channel.start_consuming()

    def process_log(self, ch, method, properties, body):
        """Persist one log entry, then ack it (at-least-once delivery)."""
        log = json.loads(body)
        self.save_log(log)
        print(f" [x] Collected [{log['level']}]: {log['message']}")
        ch.basic_ack(delivery_tag=method.delivery_tag)

    def save_log(self, log):
        """Append the entry as one JSON line.

        Fix: explicit UTF-8 so the file's encoding does not depend on the
        platform default (the original relied on locale encoding).
        """
        with open('logs.jsonl', 'a', encoding='utf-8') as f:
            f.write(json.dumps(log) + '\n')
# Run the collector; start() blocks until the process is interrupted.
collector = LogCollector()
collector.start()
错误处理器(只处理错误日志)¶
class ErrorHandler:
    """Consumes only 'error' and 'critical' entries from the 'logs'
    exchange and raises an alert for each one."""

    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost'))
        self.channel = self.connection.channel()
        self.channel.exchange_declare(exchange='logs', exchange_type='direct')
        # Durable queue dedicated to high-severity entries.
        self.channel.queue_declare(queue='error_queue', durable=True)
        for severity in ('error', 'critical'):
            self.channel.queue_bind(exchange='logs',
                                    queue='error_queue',
                                    routing_key=severity)

    def start(self):
        """Consume one message at a time (prefetch=1) until interrupted."""
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(queue='error_queue',
                                   on_message_callback=self.handle_error,
                                   auto_ack=False)
        print(' [*] Error handler started')
        self.channel.start_consuming()

    def handle_error(self, ch, method, properties, body):
        """Alert on one error entry, then ack the message."""
        log = json.loads(body)
        self.send_alert(log)
        print(f" [!] Error: {log['message']}")
        ch.basic_ack(delivery_tag=method.delivery_tag)

    def send_alert(self, log):
        """Stand-in for an email/SMS alerting channel."""
        print(f" [Alert] {log['level'].upper()}: {log['message']}")
# Run the error handler; start() blocks until the process is interrupted.
handler = ErrorHandler()
handler.start()
6.5 实战:订单事件处理¶
订单事件发布者¶
import pika
import json
from datetime import datetime
from enum import Enum
from enum import unique


@unique  # reject accidental duplicate values, which would silently alias members
class OrderEvent(Enum):
    """Order lifecycle events; each value doubles as the routing key on
    the 'order_events' direct exchange."""

    CREATED = 'order.created'
    PAID = 'order.paid'
    SHIPPED = 'order.shipped'
    DELIVERED = 'order.delivered'
    CANCELLED = 'order.cancelled'
    REFUNDED = 'order.refunded'
class OrderEventPublisher:
    """Publishes persistent order lifecycle events to the 'order_events'
    direct exchange (routing key = the event's value).

    Also usable as a context manager so the connection is always closed.
    """

    def __init__(self, host='localhost'):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host)
        )
        self.channel = self.connection.channel()
        self.channel.exchange_declare(
            exchange='order_events',
            exchange_type='direct'
        )

    def publish(self, event: OrderEvent, order_data: dict):
        """Publish one order event.

        Args:
            event: lifecycle stage; ``event.value`` is the routing key.
            order_data: event payload (order id, items, ...).
        """
        message = {
            'event': event.value,
            'data': order_data,
            # NOTE(review): naive local time; consider an explicit timezone — confirm.
            'timestamp': datetime.now().isoformat()
        }
        self.channel.basic_publish(
            exchange='order_events',
            routing_key=event.value,
            body=json.dumps(message),
            properties=pika.BasicProperties(
                delivery_mode=2,  # persist the message to disk
            )
        )
        print(f" [x] Event published: {event.value}")

    def close(self):
        """Close the underlying AMQP connection."""
        self.connection.close()

    # Backward-compatible addition: context-manager protocol.
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False
# Example usage: walk one order through its lifecycle.
publisher = OrderEventPublisher()

# Order created
publisher.publish(OrderEvent.CREATED, {
    'order_id': 1001,
    'user_id': 1,
    'amount': 199.99,
    'items': [{'product_id': 1, 'quantity': 2}]
})

# Payment confirmed
publisher.publish(OrderEvent.PAID, {
    'order_id': 1001,
    'payment_method': 'alipay',
    'transaction_id': 'TXN123456'
})

# Shipped
publisher.publish(OrderEvent.SHIPPED, {
    'order_id': 1001,
    'tracking_number': 'SF1234567890',
    'carrier': '顺丰'
})

publisher.close()
库存服务订阅者¶
class InventoryService:
    """Reserves stock when an order is created and releases it again
    when the order is cancelled."""

    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost'))
        self.channel = self.connection.channel()
        self.channel.exchange_declare(exchange='order_events',
                                      exchange_type='direct')
        self.channel.queue_declare(queue='inventory_queue', durable=True)
        # Only creation/cancellation affect stock levels.
        for routing_key in ('order.created', 'order.cancelled'):
            self.channel.queue_bind(exchange='order_events',
                                    queue='inventory_queue',
                                    routing_key=routing_key)

    def start(self):
        """Consume one event at a time (prefetch=1) until interrupted."""
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(queue='inventory_queue',
                                   on_message_callback=self.handle_event,
                                   auto_ack=False)
        print(' [*] Inventory service started')
        self.channel.start_consuming()

    def handle_event(self, ch, method, properties, body):
        """Dispatch one order event to the matching handler, then ack it."""
        payload = json.loads(body)
        event, data = payload['event'], payload['data']
        if event == 'order.created':
            self.reserve_inventory(data)
        elif event == 'order.cancelled':
            self.release_inventory(data)
        ch.basic_ack(delivery_tag=method.delivery_tag)

    def reserve_inventory(self, order):
        """Hold stock for every line item of the new order."""
        print(f" [Inventory] Reserving items for order {order['order_id']}")
        for item in order['items']:
            print(f" - Product {item['product_id']}: {item['quantity']}")

    def release_inventory(self, order):
        """Return previously reserved stock for a cancelled order."""
        print(f" [Inventory] Releasing items for order {order['order_id']}")
# Run the inventory service; start() blocks until interrupted.
service = InventoryService()
service.start()
物流服务订阅者¶
class ShippingService:
    """Creates a shipment as soon as an order is paid."""

    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost'))
        self.channel = self.connection.channel()
        self.channel.exchange_declare(exchange='order_events',
                                      exchange_type='direct')
        self.channel.queue_declare(queue='shipping_queue', durable=True)
        # Shipping only cares about successful payments.
        self.channel.queue_bind(exchange='order_events',
                                queue='shipping_queue',
                                routing_key='order.paid')

    def start(self):
        """Consume one event at a time (prefetch=1) until interrupted."""
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(queue='shipping_queue',
                                   on_message_callback=self.handle_event,
                                   auto_ack=False)
        print(' [*] Shipping service started')
        self.channel.start_consuming()

    def handle_event(self, ch, method, properties, body):
        """Create a shipment for one paid order, then ack the message."""
        payload = json.loads(body)
        self.create_shipment(payload['data'])
        ch.basic_ack(delivery_tag=method.delivery_tag)

    def create_shipment(self, order):
        """Stand-in for a real carrier integration."""
        print(f" [Shipping] Creating shipment for order {order['order_id']}")
# Run the shipping service; start() blocks until interrupted.
service = ShippingService()
service.start()
通知服务订阅者(订阅多个事件)¶
class NotificationService:
    """Sends a user-facing notification for each order lifecycle event
    published on the 'order_events' direct exchange."""

    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()
        self.channel.exchange_declare(
            exchange='order_events',
            exchange_type='direct'
        )
        self.channel.queue_declare(queue='notification_queue', durable=True)
        # Subscribe to every event that should trigger a notification.
        events = [
            'order.created',
            'order.paid',
            'order.shipped',
            'order.delivered',
            'order.cancelled'
        ]
        for event in events:
            self.channel.queue_bind(
                exchange='order_events',
                queue='notification_queue',
                routing_key=event
            )

    def start(self):
        """Consume one message at a time (prefetch=1) until interrupted."""
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(
            queue='notification_queue',
            on_message_callback=self.handle_event,
            auto_ack=False
        )
        print(' [*] Notification service started')
        self.channel.start_consuming()

    def handle_event(self, ch, method, properties, body):
        """Render and send a notification for one event, then ack it."""
        message = json.loads(body)
        self.send_notification(message['event'], message['data'])
        ch.basic_ack(delivery_tag=method.delivery_tag)

    def _render(self, event, data):
        """Return the notification text for *event*.

        Fix: the original used ``template.format(**data)``, which raised
        KeyError when the payload lacked a placeholder key. Unknown keys
        are now left as literal '{key}' markers instead of crashing the
        consumer mid-message.
        """
        templates = {
            'order.created': '您的订单已创建,订单号:{order_id}',
            'order.paid': '您的订单已支付成功,订单号:{order_id}',
            'order.shipped': '您的订单已发货,快递单号:{tracking_number}',
            'order.delivered': '您的订单已签收,感谢购买!',
            'order.cancelled': '您的订单已取消,订单号:{order_id}'
        }
        template = templates.get(event, '订单状态更新')

        class _SafeDict(dict):
            # format_map consults __missing__ for absent keys.
            def __missing__(self, key):
                return '{' + key + '}'

        return template.format_map(_SafeDict(data))

    def send_notification(self, event, data):
        """Print (stand-in for push/SMS/email) the rendered notification."""
        print(f" [Notification] {self._render(event, data)}")
# Run the notification service; start() blocks until interrupted.
service = NotificationService()
service.start()
6.6 与 Fanout 的对比¶
| 特性 | Direct | Fanout |
|---|---|---|
| 路由方式 | Routing Key 精确匹配 | 忽略 Routing Key |
| 灵活性 | 可选择性接收消息 | 接收所有消息 |
| 队列绑定 | 需要指定 Routing Key | 无需指定 |
| 适用场景 | 分类处理、事件路由 | 广播、日志收集 |
6.7 小结¶
本章介绍了路由模式:
- Direct Exchange 的使用
- Routing Key 的精确匹配
- 多重绑定的应用
- 日志系统示例
- 订单事件处理实战
下一章将介绍主题模式。