第七章:主题模式¶
7.1 简介¶
主题模式(Topics)使用 Topic Exchange,通过通配符匹配 Routing Key,实现更灵活的消息路由。
┌──────────┐
Producer ──────────▶ topic │─── *.orange.* ───▶ Queue A
│ exchange │─── *.*.rabbit ───▶ Queue B
│ │─── lazy.# ───────▶ Queue C
└──────────┘
特点:
- Routing Key 使用点号分隔的单词
- 支持通配符:* 匹配一个单词,# 匹配零个或多个单词
- 最灵活的路由方式
7.2 Topic Exchange¶
Routing Key 规则¶
Routing Key 格式:<单词>.<单词>.<单词>...
示例:
- order.created.electronics
- order.paid.books
- user.profile.updated
- system.error.database
通配符规则¶
| 通配符 | 含义 | 示例 |
|---|---|---|
* |
匹配一个单词 | *.orange.* 匹配 quick.orange.rabbit |
# |
匹配零个或多个单词 | lazy.# 匹配 lazy、lazy.orange、lazy.orange.rabbit |
生产者¶
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明 topic 交换机
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
exchange='topic_logs',
routing_key=routing_key,
body=message
)
print(f" [x] Sent [{routing_key}]: {message}")
connection.close()
消费者¶
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s <binding_key>...\n" % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(
exchange='topic_logs',
queue=queue_name,
routing_key=binding_key
)
print(f' [*] Waiting for logs. Bound to: {binding_keys}')
def callback(ch, method, properties, body):
print(f" [x] {method.routing_key}:{body}")
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
运行示例¶
# 订阅所有日志
python consumer.py "#"
# 订阅所有 error 级别
python consumer.py "*.error"
# 订阅 order 相关的所有消息
python consumer.py "order.*"
# 订阅特定模块的错误
python consumer.py "payment.error.*"
# 发送消息
python producer.py "order.created" "New order created"
python producer.py "payment.error.timeout" "Payment timeout"
python producer.py "user.profile.updated" "Profile updated"
7.3 通配符详解¶
* 通配符¶
# 绑定模式:*.orange.*
# 匹配:
# quick.orange.rabbit ✓
# lazy.orange.elephant ✓
# orange.orange.orange ✓
# 不匹配:
# orange ✗ (只有1个单词)
# quick.orange.rabbit.elephant ✗ (4个单词)
# quick.brown.rabbit ✗ (中间不是 orange)
channel.queue_bind(
exchange='topic_logs',
queue=queue_name,
routing_key='*.orange.*'
)
# 通配符¶
# 绑定模式:lazy.#
# 匹配:
# lazy ✓
# lazy.orange ✓
# lazy.orange.rabbit ✓
# lazy.orange.rabbit.elephant ✓
# 不匹配:
# quick.lazy.rabbit ✗ (lazy 不在开头)
# Lazy.orange ✗ (大小写敏感)
channel.queue_bind(
exchange='topic_logs',
queue=queue_name,
routing_key='lazy.#'
)
组合使用¶
# 匹配所有以 user 开头,以 updated 结尾的消息
# user.profile.updated ✓
# user.settings.privacy.updated ✓
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key='user.*.updated')
# 匹配所有以 error 结尾的消息
# system.error ✓
# payment.gateway.error ✓
# order.processing.error ✓
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key='#.error')
# 匹配所有消息
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key='#')
7.4 实战:微服务事件总线¶
事件定义¶
# 事件命名规范:<服务>.<实体>.<动作>
# 示例:
# user.account.created
# user.account.deleted
# order.payment.completed
# order.shipment.started
# inventory.stock.low
# notification.email.sent
from enum import Enum
class ServiceEvent:
# 用户服务事件
USER_CREATED = 'user.account.created'
USER_UPDATED = 'user.account.updated'
USER_DELETED = 'user.account.deleted'
# 订单服务事件
ORDER_CREATED = 'order.entity.created'
ORDER_PAID = 'order.payment.completed'
ORDER_SHIPPED = 'order.shipment.started'
ORDER_DELIVERED = 'order.shipment.completed'
ORDER_CANCELLED = 'order.entity.cancelled'
# 库存服务事件
STOCK_RESERVED = 'inventory.stock.reserved'
STOCK_RELEASED = 'inventory.stock.released'
STOCK_LOW = 'inventory.stock.low'
# 通知服务事件
EMAIL_SENT = 'notification.email.sent'
SMS_SENT = 'notification.sms.sent'
事件发布者¶
import pika
import json
from datetime import datetime
class EventBus:
def __init__(self, host='localhost', exchange='events'):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host)
)
self.channel = self.connection.channel()
self.exchange = exchange
self.channel.exchange_declare(
exchange=exchange,
exchange_type='topic',
durable=True
)
def publish(self, routing_key: str, data: dict, correlation_id=None):
"""发布事件"""
event = {
'data': data,
'metadata': {
'timestamp': datetime.now().isoformat(),
'correlation_id': correlation_id
}
}
self.channel.basic_publish(
exchange=self.exchange,
routing_key=routing_key,
body=json.dumps(event),
properties=pika.BasicProperties(
delivery_mode=2,
content_type='application/json'
)
)
print(f" [x] Event published: {routing_key}")
def close(self):
self.connection.close()
# 使用
bus = EventBus()
# 用户注册
bus.publish('user.account.created', {
'user_id': 1,
'username': 'alice',
'email': 'alice@example.com'
})
# 订单支付
bus.publish('order.payment.completed', {
'order_id': 1001,
'amount': 199.99,
'payment_method': 'alipay'
})
bus.close()
事件订阅者基类¶
import pika
import json
import logging
from abc import ABC, abstractmethod
class EventSubscriber(ABC):
def __init__(self, host='localhost', exchange='events', service_name='subscriber'):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host)
)
self.channel = self.connection.channel()
self.exchange = exchange
self.service_name = service_name
self.channel.exchange_declare(
exchange=exchange,
exchange_type='topic',
durable=True
)
# 创建服务专属队列
self.queue_name = f"{service_name}_queue"
self.channel.queue_declare(
queue=self.queue_name,
durable=True
)
def subscribe(self, pattern: str):
"""订阅事件"""
self.channel.queue_bind(
exchange=self.exchange,
queue=self.queue_name,
routing_key=pattern
)
print(f" [*] Subscribed to: {pattern}")
def start(self):
"""开始消费"""
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(
queue=self.queue_name,
on_message_callback=self._handle_event,
auto_ack=False
)
print(f" [*] {self.service_name} started")
self.channel.start_consuming()
def _handle_event(self, ch, method, properties, body):
try:
event = json.loads(body)
self.handle_event(method.routing_key, event)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
logging.error(f"Error handling event: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
@abstractmethod
def handle_event(self, routing_key: str, event: dict):
"""子类实现事件处理"""
pass
用户服务订阅者¶
class UserServiceSubscriber(EventSubscriber):
def __init__(self):
super().__init__(service_name='user_service')
# 订阅用户相关事件
self.subscribe('user.account.*')
# 订阅订单事件(用于更新用户统计)
self.subscribe('order.payment.completed')
def handle_event(self, routing_key: str, event: dict):
if routing_key == 'user.account.created':
self.on_user_created(event['data'])
elif routing_key == 'user.account.deleted':
self.on_user_deleted(event['data'])
elif routing_key == 'order.payment.completed':
self.on_order_paid(event['data'])
def on_user_created(self, data):
print(f" [User] New user created: {data['username']}")
# 发送欢迎邮件等
def on_user_deleted(self, data):
print(f" [User] User deleted: {data['user_id']}")
# 清理用户数据等
def on_order_paid(self, data):
print(f" [User] Order paid by user: {data.get('user_id')}")
subscriber = UserServiceSubscriber()
subscriber.start()
库存服务订阅者¶
class InventoryServiceSubscriber(EventSubscriber):
def __init__(self):
super().__init__(service_name='inventory_service')
# 订阅订单和库存事件
self.subscribe('order.entity.created')
self.subscribe('order.entity.cancelled')
self.subscribe('inventory.stock.*')
def handle_event(self, routing_key: str, event: dict):
if routing_key == 'order.entity.created':
self.reserve_stock(event['data'])
elif routing_key == 'order.entity.cancelled':
self.release_stock(event['data'])
elif routing_key == 'inventory.stock.low':
self.alert_low_stock(event['data'])
def reserve_stock(self, order):
print(f" [Inventory] Reserving stock for order {order['order_id']}")
def release_stock(self, order):
print(f" [Inventory] Releasing stock for order {order['order_id']}")
def alert_low_stock(self, data):
print(f" [Inventory] Low stock alert: {data['product_id']}")
subscriber = InventoryServiceSubscriber()
subscriber.start()
通知服务订阅者¶
class NotificationServiceSubscriber(EventSubscriber):
def __init__(self):
super().__init__(service_name='notification_service')
# 订阅所有需要发送通知的事件
self.subscribe('user.account.created')
self.subscribe('order.payment.completed')
self.subscribe('order.shipment.started')
self.subscribe('order.shipment.completed')
def handle_event(self, routing_key: str, event: dict):
notification = self.get_notification(routing_key, event['data'])
self.send_notification(notification)
def get_notification(self, routing_key: str, data: dict):
templates = {
'user.account.created': {
'type': 'email',
'subject': '欢迎注册',
'template': 'welcome'
},
'order.payment.completed': {
'type': 'sms',
'template': 'payment_success'
},
'order.shipment.started': {
'type': 'sms',
'template': 'order_shipped'
},
'order.shipment.completed': {
'type': 'email',
'subject': '订单已签收',
'template': 'order_delivered'
}
}
config = templates.get(routing_key, {})
return {
**config,
'data': data
}
def send_notification(self, notification):
print(f" [Notification] Sending: {notification}")
subscriber = NotificationServiceSubscriber()
subscriber.start()
7.5 监控服务(订阅所有事件)¶
class MonitoringService(EventSubscriber):
def __init__(self):
super().__init__(service_name='monitoring')
# 订阅所有事件
self.subscribe('#')
def handle_event(self, routing_key: str, event: dict):
# 记录所有事件到日志或数据库
print(f" [Monitor] {routing_key}: {event['data']}")
# 可以添加告警逻辑
if 'error' in routing_key.lower():
self.send_alert(routing_key, event)
def send_alert(self, routing_key, event):
print(f" [Alert] Error detected: {routing_key}")
monitor = MonitoringService()
monitor.start()
7.6 与 Direct 的对比¶
| 特性 | Topic | Direct |
|---|---|---|
| 路由方式 | 通配符匹配 | 精确匹配 |
| 灵活性 | 最高 | 中等 |
| 复杂度 | 较高 | 较低 |
| 性能 | 略低(需要模式匹配) | 较高 |
| 适用场景 | 复杂事件路由、微服务 | 简单分类、固定路由 |
7.7 最佳实践¶
Routing Key 命名规范¶
# 推荐:<服务>.<实体>.<动作>
user.account.created
order.payment.completed
inventory.stock.low
# 避免:
created.user.account # 顺序混乱
userAccountCreated # 没有点分隔
user.account.is.created.now # 过长
绑定模式设计¶
# 好的设计:明确、可预测
self.subscribe('user.account.*') # 所有用户账户事件
self.subscribe('order.payment.*') # 所有订单支付事件
self.subscribe('#.error') # 所有错误事件
# 避免:过于宽泛
self.subscribe('*.*.*') # 只匹配3个单词
self.subscribe('#') # 匹配所有,可能接收过多消息
性能考虑¶
# 避免过于复杂的模式
# 慢:需要大量匹配
self.subscribe('*.*.*.*.*.error')
# 快:更精确的模式
self.subscribe('payment.error.#')
self.subscribe('order.error.#')
7.8 小结¶
本章介绍了主题模式:
- Topic Exchange 的使用
- 通配符
*和#的区别 - 微服务事件总线实战
- Routing Key 命名规范
- 与 Direct 的对比
下一章将介绍 RPC 模式。