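Chapter 7: Topics

7.1 Introduction

The Topics pattern uses a topic exchange, which matches each message's routing key against wildcard binding patterns and so routes messages more flexibly than an exact-match exchange.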

                    ┌──────────┐
Producer ──────────▶│  topic   │─── *.orange.* ───▶ Queue A
                    │ exchange │─── *.*.rabbit ───▶ Queue B
                    │          │─── lazy.# ───────▶ Queue C
                    └──────────┘

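Key characteristics:
- The routing key is a list of words separated by dots.
- Binding keys support two wildcards: * matches exactly one word, and # matches zero or more words.
- It is the most flexible of the routing-key-based exchange types.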

7.2 Topic Exchange

Routing Key Rules

Routing key format: <word>.<word>.<word>...

Examples:
- order.created.electronics
- order.paid.books
- user.profile.updated
- system.error.database

Wildcard Rules

Wildcard  Meaning                     Example
*         Matches exactly one word    *.orange.* matches quick.orange.rabbit
#         Matches zero or more words  lazy.# matches lazy, lazy.orange, and lazy.orange.rabbit

Producer

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare the topic exchange
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'

channel.basic_publish(
    exchange='topic_logs',
    routing_key=routing_key,
    body=message
)

print(f" [x] Sent [{routing_key}]: {message}")
connection.close()

Consumer

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s <binding_key>...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(
        exchange='topic_logs',
        queue=queue_name,
        routing_key=binding_key
    )

print(f' [*] Waiting for logs. Bound to: {binding_keys}')

def callback(ch, method, properties, body):
    # body arrives as bytes; decode it for display
    print(f" [x] {method.routing_key}:{body.decode()}")

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

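Running the Examples

# Subscribe to all logs
python consumer.py "#"

# Subscribe to the error level of every source (two-word keys such as payment.error)
python consumer.py "*.error"

# Subscribe to every order-related message
python consumer.py "order.#"

# Subscribe to the errors of one specific module
python consumer.py "payment.error.*"

# Publish messages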
python producer.py "order.created" "New order created"
python producer.py "payment.error.timeout" "Payment timeout"
python producer.py "user.profile.updated" "Profile updated"
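To see which of these consumers would receive each published message without starting a broker, the bindings can be checked offline. The sketch below is only an approximation of the matching rules described in this chapter: it translates a binding key into a regular expression. The helper names `binding_to_regex` and `receives` are made up for illustration, and the code assumes non-empty routing keys whose words are never empty. It lists both `order.*` and `order.#` to show how they differ.

```python
import re

def binding_to_regex(binding_key: str) -> re.Pattern:
    """Translate a topic binding key into a regex over '.'-prefixed routing keys."""
    parts = []
    for word in binding_key.split("."):
        if word == "*":
            parts.append(r"\.[^.]+")        # exactly one word
        elif word == "#":
            parts.append(r"(?:\.[^.]+)*")   # zero or more words
        else:
            parts.append(r"\." + re.escape(word))  # literal word
    return re.compile("".join(parts))

def receives(binding_key: str, routing_key: str) -> bool:
    # Prefix a dot so that every word, including the first, follows a dot.
    return binding_to_regex(binding_key).fullmatch("." + routing_key) is not None

bindings = ["#", "*.error", "order.*", "order.#", "payment.error.*"]
published = ["order.created", "payment.error.timeout", "user.profile.updated"]

for routing_key in published:
    matched = [b for b in bindings if receives(b, routing_key)]
    print(f"{routing_key:24} -> {matched}")
```

In this run, `*.error` receives none of the three sample messages, because none of them is a two-word key ending in `error`.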

7.3 Wildcards in Detail

The * Wildcard

# Binding pattern: *.orange.*
# Matches:
#   quick.orange.rabbit ✓
#   lazy.orange.elephant ✓
#   orange.orange.orange ✓
# Does not match:
#   orange ✗ (only 1 word)
#   quick.orange.rabbit.elephant ✗ (4 words)
#   quick.brown.rabbit ✗ (the middle word is not orange)

channel.queue_bind(
    exchange='topic_logs',
    queue=queue_name,
    routing_key='*.orange.*'
)

The # Wildcard

# Binding pattern: lazy.#
# Matches:
#   lazy ✓
#   lazy.orange ✓
#   lazy.orange.rabbit ✓
#   lazy.orange.rabbit.elephant ✓
# Does not match:
#   quick.lazy.rabbit ✗ (lazy is not the first word)
#   Lazy.orange ✗ (matching is case-sensitive)

channel.queue_bind(
    exchange='topic_logs',
    queue=queue_name,
    routing_key='lazy.#'
)

Combining Wildcards

# Match every message that starts with user and ends with updated
# user.profile.updated ✓
# user.settings.privacy.updated ✓
# (# is required here: user.*.updated allows exactly one word in the middle)
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key='user.#.updated')

# Match every message that ends with error
# system.error ✓
# payment.gateway.error ✓
# order.processing.error ✓
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key='#.error')

# Match every message
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key='#')
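The rules in this section can be checked without a broker using a small reference matcher. What follows is a minimal sketch, not RabbitMQ's actual implementation: `topic_matches` is a hypothetical helper that compares the pattern and the key word by word, letting `*` consume exactly one word and `#` consume any number of words. It uses simple recursion, which is fine for unit tests but could be slow for patterns with many `#` wildcards.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if a topic binding pattern matches a routing key."""
    def match(p, k):
        if not p:
            return not k                 # pattern exhausted: key must be too
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' absorbs zero words, or absorbs one word and stays active.
            return match(rest, k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False                 # pattern still expects a word
        if head == "*" or head == k[0]:  # literal comparison is case-sensitive
            return match(rest, k[1:])
        return False

    key_words = routing_key.split(".") if routing_key else []
    return match(pattern.split("."), key_words)

cases = [
    ("*.orange.*", "quick.orange.rabbit"),
    ("*.orange.*", "orange"),
    ("lazy.#", "lazy"),
    ("lazy.#", "Lazy.orange"),
    ("user.*.updated", "user.settings.privacy.updated"),
    ("user.#.updated", "user.settings.privacy.updated"),
    ("#.error", "payment.gateway.error"),
]
for pattern, key in cases:
    mark = "match" if topic_matches(pattern, key) else "no match"
    print(f"{pattern:16} {key:32} {mark}")
```

The two `user` cases show the practical difference: `user.*.updated` allows exactly one word between `user` and `updated`, while `user.#.updated` allows any number.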

7.4 In Practice: A Microservice Event Bus

Event Definitions

# Event naming convention: <service>.<entity>.<action>
# Examples:
#   user.account.created
#   user.account.deleted
#   order.payment.completed
#   order.shipment.started
#   inventory.stock.low
#   notification.email.sent

class ServiceEvent:
    # User service events
    USER_CREATED = 'user.account.created'
    USER_UPDATED = 'user.account.updated'
    USER_DELETED = 'user.account.deleted'

    # Order service events
    ORDER_CREATED = 'order.entity.created'
    ORDER_PAID = 'order.payment.completed'
    ORDER_SHIPPED = 'order.shipment.started'
    ORDER_DELIVERED = 'order.shipment.completed'
    ORDER_CANCELLED = 'order.entity.cancelled'

    # Inventory service events
    STOCK_RESERVED = 'inventory.stock.reserved'
    STOCK_RELEASED = 'inventory.stock.released'
    STOCK_LOW = 'inventory.stock.low'

    # Notification service events
    EMAIL_SENT = 'notification.email.sent'
    SMS_SENT = 'notification.sms.sent'

Event Publisher

import pika
import json
from datetime import datetime

class EventBus:
    def __init__(self, host='localhost', exchange='events'):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host)
        )
        self.channel = self.connection.channel()
        self.exchange = exchange

        self.channel.exchange_declare(
            exchange=exchange,
            exchange_type='topic',
            durable=True
        )

    def publish(self, routing_key: str, data: dict, correlation_id=None):
        """Publish an event."""
        event = {
            'data': data,
            'metadata': {
                'timestamp': datetime.now().isoformat(),
                'correlation_id': correlation_id
            }
        }

        self.channel.basic_publish(
            exchange=self.exchange,
            routing_key=routing_key,
            body=json.dumps(event),
            properties=pika.BasicProperties(
                delivery_mode=2,
                content_type='application/json'
            )
        )

        print(f" [x] Event published: {routing_key}")

    def close(self):
        self.connection.close()

# Usage
bus = EventBus()

# User registration
bus.publish('user.account.created', {
    'user_id': 1,
    'username': 'alice',
    'email': 'alice@example.com'
})

# Order payment
bus.publish('order.payment.completed', {
    'order_id': 1001,
    'amount': 199.99,
    'payment_method': 'alipay'
})

bus.close()
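The envelope built in `publish` uses a local timestamp without a time zone and leaves `correlation_id` as `None` unless the caller supplies one. If events cross hosts or time zones, it may help to build the envelope in a separate function that can be tested without a broker. The sketch below is one possible variant, not part of the original `EventBus`. The names `build_event` and `parse_event` are hypothetical, and a UTC timestamp plus a generated UUID are assumptions about what a consumer would want.

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Optional

def build_event(data: dict, correlation_id: Optional[str] = None) -> bytes:
    """Serialize an event envelope with a UTC timestamp and a correlation id."""
    event = {
        "data": data,
        "metadata": {
            # Timezone-aware ISO 8601 timestamp, e.g. ending in +00:00
            "timestamp": datetime.now(timezone.utc).isoformat(),
            # Generate an id when the caller does not supply one
            "correlation_id": correlation_id or str(uuid.uuid4()),
        },
    }
    return json.dumps(event).encode("utf-8")

def parse_event(body: bytes) -> dict:
    """Decode a message body produced by build_event."""
    return json.loads(body)

body = build_event({"user_id": 1, "username": "alice"})
event = parse_event(body)
print(event["metadata"])
```

The resulting bytes could be passed as `body=` to `basic_publish` in place of `json.dumps(event)`.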

Event Subscriber Base Class

import pika
import json
import logging
from abc import ABC, abstractmethod

class EventSubscriber(ABC):
    def __init__(self, host='localhost', exchange='events', service_name='subscriber'):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host)
        )
        self.channel = self.connection.channel()
        self.exchange = exchange
        self.service_name = service_name

        self.channel.exchange_declare(
            exchange=exchange,
            exchange_type='topic',
            durable=True
        )

        # Create a queue dedicated to this service
        self.queue_name = f"{service_name}_queue"
        self.channel.queue_declare(
            queue=self.queue_name,
            durable=True
        )

    def subscribe(self, pattern: str):
        """Subscribe to events matching a binding pattern."""
        self.channel.queue_bind(
            exchange=self.exchange,
            queue=self.queue_name,
            routing_key=pattern
        )
        print(f" [*] Subscribed to: {pattern}")

    def start(self):
        """Start consuming."""
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(
            queue=self.queue_name,
            on_message_callback=self._handle_event,
            auto_ack=False
        )

        print(f" [*] {self.service_name} started")
        self.channel.start_consuming()

    def _handle_event(self, ch, method, properties, body):
        try:
            event = json.loads(body)
            self.handle_event(method.routing_key, event)
            ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception as e:
            logging.error(f"Error handling event: {e}")
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

    @abstractmethod
    def handle_event(self, routing_key: str, event: dict):
        """Handle one event; implemented by subclasses."""
        pass
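In `_handle_event`, a failed message is rejected with `requeue=False`. Unless the queue has a dead-letter exchange configured, the broker then discards that message. The sketch below shows one way the queue declaration might be extended, using RabbitMQ's `x-dead-letter-exchange` queue argument. The function name, the `.dlx` and `.dead` suffixes, and the per-queue fanout exchange are assumptions made for illustration. `RecordingChannel` is a hypothetical stand-in that only records calls so the sketch runs without a broker; in real use a pika channel would be passed instead.

```python
class RecordingChannel:
    """Hypothetical stand-in for a pika channel; records calls instead of sending them."""
    def __init__(self):
        self.calls = []

    def __getattr__(self, name):
        def record(**kwargs):
            self.calls.append((name, kwargs))
        return record

def declare_queue_with_dlx(channel, queue: str) -> str:
    """Declare `queue` so rejected messages are routed to a dead-letter queue."""
    dlx = f"{queue}.dlx"                # assumed naming scheme
    dead_letter_queue = f"{queue}.dead"

    # A fanout exchange forwards every dead-lettered message to the bound queue.
    channel.exchange_declare(exchange=dlx, exchange_type="fanout", durable=True)
    channel.queue_declare(queue=dead_letter_queue, durable=True)
    channel.queue_bind(exchange=dlx, queue=dead_letter_queue, routing_key="")

    # Messages nacked with requeue=False on this queue are republished to the DLX.
    channel.queue_declare(
        queue=queue,
        durable=True,
        arguments={"x-dead-letter-exchange": dlx},
    )
    return dead_letter_queue

channel = RecordingChannel()
dead_queue = declare_queue_with_dlx(channel, "user_service_queue")
for name, kwargs in channel.calls:
    print(name, kwargs)
```

Note that redeclaring an existing queue with different arguments fails with a PRECONDITION_FAILED channel error, so a queue created without this argument would have to be deleted first or configured through a policy instead.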

User Service Subscriber

class UserServiceSubscriber(EventSubscriber):
    def __init__(self):
        super().__init__(service_name='user_service')

        # Subscribe to user-related events
        self.subscribe('user.account.*')

        # Subscribe to order events (to update user statistics)
        self.subscribe('order.payment.completed')

    def handle_event(self, routing_key: str, event: dict):
        if routing_key == 'user.account.created':
            self.on_user_created(event['data'])
        elif routing_key == 'user.account.deleted':
            self.on_user_deleted(event['data'])
        elif routing_key == 'order.payment.completed':
            self.on_order_paid(event['data'])

    def on_user_created(self, data):
        print(f" [User] New user created: {data['username']}")
        # Send a welcome email, etc.

    def on_user_deleted(self, data):
        print(f" [User] User deleted: {data['user_id']}")
        # Clean up user data, etc.

    def on_order_paid(self, data):
        print(f" [User] Order paid by user: {data.get('user_id')}")

subscriber = UserServiceSubscriber()
subscriber.start()
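The `user.account.*` binding also delivers `user.account.updated`, which the `if`/`elif` chain in `handle_event` acknowledges without doing anything. One possible alternative is a dispatch table that maps routing keys to handlers and reports when no handler is registered. The sketch below is an illustration only: `EventRouter` and its `on` decorator are hypothetical names, not part of the classes above.

```python
class EventRouter:
    """Maps exact routing keys to handler functions."""
    def __init__(self):
        self._handlers = {}

    def on(self, routing_key: str):
        """Decorator that registers a handler for one routing key."""
        def register(func):
            self._handlers[routing_key] = func
            return func
        return register

    def dispatch(self, routing_key: str, event: dict) -> bool:
        """Call the registered handler; return False if none exists."""
        handler = self._handlers.get(routing_key)
        if handler is None:
            return False
        handler(event["data"])
        return True

router = EventRouter()
handled = []

@router.on("user.account.created")
def on_user_created(data):
    handled.append(f"created:{data['username']}")

@router.on("user.account.deleted")
def on_user_deleted(data):
    handled.append(f"deleted:{data['user_id']}")

ok = router.dispatch("user.account.created", {"data": {"username": "alice"}})
ignored = router.dispatch("user.account.updated", {"data": {"user_id": 1}})
print(handled, ok, ignored)
```

A subscriber's `handle_event` could call `router.dispatch(...)` and log a warning when it returns `False`, which makes unhandled keys visible.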

Inventory Service Subscriber

class InventoryServiceSubscriber(EventSubscriber):
    def __init__(self):
        super().__init__(service_name='inventory_service')

        # Subscribe to order and inventory events
        self.subscribe('order.entity.created')
        self.subscribe('order.entity.cancelled')
        self.subscribe('inventory.stock.*')

    def handle_event(self, routing_key: str, event: dict):
        if routing_key == 'order.entity.created':
            self.reserve_stock(event['data'])
        elif routing_key == 'order.entity.cancelled':
            self.release_stock(event['data'])
        elif routing_key == 'inventory.stock.low':
            self.alert_low_stock(event['data'])

    def reserve_stock(self, order):
        print(f" [Inventory] Reserving stock for order {order['order_id']}")

    def release_stock(self, order):
        print(f" [Inventory] Releasing stock for order {order['order_id']}")

    def alert_low_stock(self, data):
        print(f" [Inventory] Low stock alert: {data['product_id']}")

subscriber = InventoryServiceSubscriber()
subscriber.start()

Notification Service Subscriber

class NotificationServiceSubscriber(EventSubscriber):
    def __init__(self):
        super().__init__(service_name='notification_service')

        # Subscribe to every event that should trigger a notification
        self.subscribe('user.account.created')
        self.subscribe('order.payment.completed')
        self.subscribe('order.shipment.started')
        self.subscribe('order.shipment.completed')

    def handle_event(self, routing_key: str, event: dict):
        notification = self.get_notification(routing_key, event['data'])
        self.send_notification(notification)

    def get_notification(self, routing_key: str, data: dict):
        templates = {
            'user.account.created': {
                'type': 'email',
                'subject': 'Welcome',
                'template': 'welcome'
            },
            'order.payment.completed': {
                'type': 'sms',
                'template': 'payment_success'
            },
            'order.shipment.started': {
                'type': 'sms',
                'template': 'order_shipped'
            },
            'order.shipment.completed': {
                'type': 'email',
                'subject': 'Order delivered',
                'template': 'order_delivered'
            }
        }

        config = templates.get(routing_key, {})
        return {
            **config,
            'data': data
        }

    def send_notification(self, notification):
        print(f" [Notification] Sending: {notification}")

subscriber = NotificationServiceSubscriber()
subscriber.start()

7.5 Monitoring Service (Subscribing to All Events)

class MonitoringService(EventSubscriber):
    def __init__(self):
        super().__init__(service_name='monitoring')

        # Subscribe to all events
        self.subscribe('#')

    def handle_event(self, routing_key: str, event: dict):
        # Record every event to a log or database
        print(f" [Monitor] {routing_key}: {event['data']}")

        # Alerting logic can be added here
        if 'error' in routing_key.lower():
            self.send_alert(routing_key, event)

    def send_alert(self, routing_key, event):
        print(f" [Alert] Error detected: {routing_key}")

monitor = MonitoringService()
monitor.start()
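The substring test `'error' in routing_key.lower()` also fires for keys where `error` is only part of a longer word. A word-level check is a little stricter. The helper below, `is_error_event`, is a hypothetical sketch that treats the routing key as dot-separated words, as the rest of this chapter does.

```python
def is_error_event(routing_key: str) -> bool:
    """Return True if one of the dot-separated words is exactly 'error' (any case)."""
    return "error" in routing_key.lower().split(".")

for key in ["payment.error.timeout", "system.Error", "report.errorlog.rotated"]:
    print(key, is_error_event(key))
```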

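7.6 Comparison with Direct

Feature       Topic                                 Direct
Routing       Wildcard pattern matching             Exact match
Flexibility   Highest                               Moderate
Complexity    Higher                                Lower
Performance   Slightly lower (pattern matching)     Higher
Typical uses  Complex event routing, microservices  Simple categories, fixed routes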

7.7 Best Practices

Routing Key Naming Conventions

# Recommended: <service>.<entity>.<action>
user.account.created
order.payment.completed
inventory.stock.low

# Avoid:
created.user.account  # inconsistent word order
userAccountCreated    # no dot separators
user.account.is.created.now  # too long
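Part of this convention can be enforced mechanically before publishing. The sketch below is a hypothetical validator (`validate_event_key` is not part of any library). It assumes lowercase words made of letters, digits, and underscores, exactly three words per key, and the 255-byte routing key limit of AMQP 0-9-1. It cannot detect a wrong word order such as `created.user.account`, since that key is still three well-formed words.

```python
import re

_WORD = re.compile(r"[a-z][a-z0-9_]*")   # assumed word style: lowercase, no dots
MAX_ROUTING_KEY_BYTES = 255              # AMQP 0-9-1 short-string limit

def validate_event_key(routing_key: str) -> list:
    """Return a list of problems; an empty list means the key follows the convention."""
    problems = []
    words = routing_key.split(".")
    if len(words) != 3:
        problems.append(f"expected 3 dot-separated words, got {len(words)}")
    for word in words:
        if not _WORD.fullmatch(word):
            problems.append(f"invalid word {word!r}")
    if len(routing_key.encode("utf-8")) > MAX_ROUTING_KEY_BYTES:
        problems.append("longer than 255 bytes")
    return problems

for key in ["user.account.created", "userAccountCreated", "user.account.is.created.now"]:
    print(key, validate_event_key(key))
```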

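Binding Pattern Design

# Good: explicit and predictable
self.subscribe('user.account.*')      # all user account events
self.subscribe('order.payment.*')     # all order payment events
self.subscribe('#.error')             # all events ending in error

# Avoid: overly broad
self.subscribe('*.*.*')               # matches every key with exactly 3 words
self.subscribe('#')                   # matches everything; may receive far too many messages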

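Performance Considerations

# Avoid overly complex patterns
# Slower: every word position has to be pattern-matched
self.subscribe('*.*.*.*.*.error')

# Faster: more specific patterns
self.subscribe('payment.error.#')
self.subscribe('order.error.#')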

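7.8 Summary

This chapter covered the Topics pattern:

  1. Using a topic exchange
  2. The difference between the * and # wildcards
  3. A hands-on microservice event bus
  4. Routing key naming conventions
  5. A comparison with the direct exchange

The next chapter covers the RPC pattern.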