跳转至

第六章:路由模式

6.1 简介

路由模式(Routing)使用 Direct Exchange 根据 Routing Key 将消息路由到特定的队列。

                    ┌──────────┐
Producer ──────────▶  direct  │─── error ───▶ Queue Error ───▶ Consumer Error
                    │ exchange │─── warn  ───▶ Queue All
                    │          │─── info  ───▶ Queue All
                    └──────────┘

特点: - 消息根据 Routing Key 精确匹配 - 一个队列可以绑定多个 Routing Key - 比 Fanout 更灵活,比 Topic 更精确

6.2 Direct Exchange

Direct 交换机将消息路由到 Routing Key 完全匹配的队列。

生产者

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明 direct 交换机
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'

# 使用 severity 作为 routing key
channel.basic_publish(
    exchange='direct_logs',
    routing_key=severity,
    body=message
)

print(f" [x] Sent [{severity}]: {message}")
connection.close()

消费者

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# 绑定感兴趣的 severity
severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(
        exchange='direct_logs',
        queue=queue_name,
        routing_key=severity
    )

print(f' [*] Waiting for logs. Bound to: {severities}')

def callback(ch, method, properties, body):
    print(f" [x] {method.routing_key}:{body}")

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

运行示例

# 订阅所有日志
python consumer.py info warning error

# 只订阅错误日志
python consumer.py error

# 发送不同级别的日志
python producer.py error "Run. Run. Or it will explode."
# [x] Sent [error]: Run. Run. Or it will explode.

python producer.py info "Info message"
# [x] Sent [info]: Info message

6.3 多重绑定

一个队列可以绑定多个 Routing Key:

# 绑定多个 routing key 到同一个队列
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key='error')
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key='warning')
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key='info')

多个队列可以绑定相同的 Routing Key:

# 队列 A 绑定 error
channel.queue_bind(exchange='direct_logs', queue='queue_a', routing_key='error')

# 队列 B 也绑定 error
channel.queue_bind(exchange='direct_logs', queue='queue_b', routing_key='error')

# 发送 error 消息,两个队列都会收到
channel.basic_publish(exchange='direct_logs', routing_key='error', body='Error!')

6.4 日志系统示例

完整的日志系统

import pika
import json
from datetime import datetime
from enum import Enum

class LogLevel(Enum):
    DEBUG = 'debug'
    INFO = 'info'
    WARNING = 'warning'
    ERROR = 'error'
    CRITICAL = 'critical'

class LogPublisher:
    def __init__(self, host='localhost'):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host)
        )
        self.channel = self.connection.channel()

        self.channel.exchange_declare(
            exchange='logs',
            exchange_type='direct'
        )

    def log(self, level: LogLevel, message: str, **kwargs):
        """发送日志"""
        log_entry = {
            'level': level.value,
            'message': message,
            'timestamp': datetime.now().isoformat(),
            'extra': kwargs
        }

        self.channel.basic_publish(
            exchange='logs',
            routing_key=level.value,
            body=json.dumps(log_entry)
        )

        print(f" [x] Logged [{level.value}]: {message}")

    def close(self):
        self.connection.close()

# 使用
publisher = LogPublisher()
publisher.log(LogLevel.DEBUG, "Debug message", user_id=1)
publisher.log(LogLevel.INFO, "User logged in", user_id=1)
publisher.log(LogLevel.WARNING, "High memory usage", memory_percent=85)
publisher.log(LogLevel.ERROR, "Database connection failed", error_code=500)
publisher.log(LogLevel.CRITICAL, "System shutdown", reason="maintenance")
publisher.close()

日志收集器(收集所有日志)

import pika
import json
from datetime import datetime

class LogCollector:
    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()

        self.channel.exchange_declare(
            exchange='logs',
            exchange_type='direct'
        )

        # 创建持久化队列
        self.channel.queue_declare(queue='all_logs', durable=True)

        # 绑定所有日志级别
        for level in ['debug', 'info', 'warning', 'error', 'critical']:
            self.channel.queue_bind(
                exchange='logs',
                queue='all_logs',
                routing_key=level
            )

    def start(self):
        self.channel.basic_consume(
            queue='all_logs',
            on_message_callback=self.process_log,
            auto_ack=False
        )

        print(' [*] Log collector started')
        self.channel.start_consuming()

    def process_log(self, ch, method, properties, body):
        log = json.loads(body)

        # 存储到数据库或文件
        self.save_log(log)

        print(f" [x] Collected [{log['level']}]: {log['message']}")
        ch.basic_ack(delivery_tag=method.delivery_tag)

    def save_log(self, log):
        with open('logs.jsonl', 'a') as f:
            f.write(json.dumps(log) + '\n')

collector = LogCollector()
collector.start()

错误处理器(只处理错误日志)

class ErrorHandler:
    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()

        self.channel.exchange_declare(
            exchange='logs',
            exchange_type='direct'
        )

        # 创建错误专用队列
        self.channel.queue_declare(queue='error_queue', durable=True)

        # 只绑定 error 和 critical
        for level in ['error', 'critical']:
            self.channel.queue_bind(
                exchange='logs',
                queue='error_queue',
                routing_key=level
            )

    def start(self):
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(
            queue='error_queue',
            on_message_callback=self.handle_error,
            auto_ack=False
        )

        print(' [*] Error handler started')
        self.channel.start_consuming()

    def handle_error(self, ch, method, properties, body):
        log = json.loads(body)

        # 发送告警
        self.send_alert(log)

        print(f" [!] Error: {log['message']}")
        ch.basic_ack(delivery_tag=method.delivery_tag)

    def send_alert(self, log):
        # 发送邮件或短信告警
        print(f" [Alert] {log['level'].upper()}: {log['message']}")

handler = ErrorHandler()
handler.start()

6.5 实战:订单事件处理

订单事件发布者

import pika
import json
from datetime import datetime
from enum import Enum

class OrderEvent(Enum):
    CREATED = 'order.created'
    PAID = 'order.paid'
    SHIPPED = 'order.shipped'
    DELIVERED = 'order.delivered'
    CANCELLED = 'order.cancelled'
    REFUNDED = 'order.refunded'

class OrderEventPublisher:
    def __init__(self, host='localhost'):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host)
        )
        self.channel = self.connection.channel()

        self.channel.exchange_declare(
            exchange='order_events',
            exchange_type='direct'
        )

    def publish(self, event: OrderEvent, order_data: dict):
        """发布订单事件"""
        message = {
            'event': event.value,
            'data': order_data,
            'timestamp': datetime.now().isoformat()
        }

        self.channel.basic_publish(
            exchange='order_events',
            routing_key=event.value,
            body=json.dumps(message),
            properties=pika.BasicProperties(
                delivery_mode=2,  # 持久化
            )
        )

        print(f" [x] Event published: {event.value}")

    def close(self):
        self.connection.close()

# 使用
publisher = OrderEventPublisher()

# 创建订单
publisher.publish(OrderEvent.CREATED, {
    'order_id': 1001,
    'user_id': 1,
    'amount': 199.99,
    'items': [{'product_id': 1, 'quantity': 2}]
})

# 支付成功
publisher.publish(OrderEvent.PAID, {
    'order_id': 1001,
    'payment_method': 'alipay',
    'transaction_id': 'TXN123456'
})

# 发货
publisher.publish(OrderEvent.SHIPPED, {
    'order_id': 1001,
    'tracking_number': 'SF1234567890',
    'carrier': '顺丰'
})

publisher.close()

库存服务订阅者

class InventoryService:
    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()

        self.channel.exchange_declare(
            exchange='order_events',
            exchange_type='direct'
        )

        self.channel.queue_declare(queue='inventory_queue', durable=True)

        # 只关注订单创建和取消事件
        for event in ['order.created', 'order.cancelled']:
            self.channel.queue_bind(
                exchange='order_events',
                queue='inventory_queue',
                routing_key=event
            )

    def start(self):
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(
            queue='inventory_queue',
            on_message_callback=self.handle_event,
            auto_ack=False
        )

        print(' [*] Inventory service started')
        self.channel.start_consuming()

    def handle_event(self, ch, method, properties, body):
        message = json.loads(body)
        event = message['event']
        data = message['data']

        if event == 'order.created':
            self.reserve_inventory(data)
        elif event == 'order.cancelled':
            self.release_inventory(data)

        ch.basic_ack(delivery_tag=method.delivery_tag)

    def reserve_inventory(self, order):
        print(f" [Inventory] Reserving items for order {order['order_id']}")
        for item in order['items']:
            print(f"   - Product {item['product_id']}: {item['quantity']}")

    def release_inventory(self, order):
        print(f" [Inventory] Releasing items for order {order['order_id']}")

service = InventoryService()
service.start()

物流服务订阅者

class ShippingService:
    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()

        self.channel.exchange_declare(
            exchange='order_events',
            exchange_type='direct'
        )

        self.channel.queue_declare(queue='shipping_queue', durable=True)

        # 只关注支付成功事件
        self.channel.queue_bind(
            exchange='order_events',
            queue='shipping_queue',
            routing_key='order.paid'
        )

    def start(self):
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(
            queue='shipping_queue',
            on_message_callback=self.handle_event,
            auto_ack=False
        )

        print(' [*] Shipping service started')
        self.channel.start_consuming()

    def handle_event(self, ch, method, properties, body):
        message = json.loads(body)
        data = message['data']

        self.create_shipment(data)

        ch.basic_ack(delivery_tag=method.delivery_tag)

    def create_shipment(self, order):
        print(f" [Shipping] Creating shipment for order {order['order_id']}")

service = ShippingService()
service.start()

通知服务订阅者(订阅多个事件)

class NotificationService:
    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()

        self.channel.exchange_declare(
            exchange='order_events',
            exchange_type='direct'
        )

        self.channel.queue_declare(queue='notification_queue', durable=True)

        # 订阅多个事件
        events = [
            'order.created',
            'order.paid',
            'order.shipped',
            'order.delivered',
            'order.cancelled'
        ]

        for event in events:
            self.channel.queue_bind(
                exchange='order_events',
                queue='notification_queue',
                routing_key=event
            )

    def start(self):
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(
            queue='notification_queue',
            on_message_callback=self.handle_event,
            auto_ack=False
        )

        print(' [*] Notification service started')
        self.channel.start_consuming()

    def handle_event(self, ch, method, properties, body):
        message = json.loads(body)
        event = message['event']
        data = message['data']

        self.send_notification(event, data)

        ch.basic_ack(delivery_tag=method.delivery_tag)

    def send_notification(self, event, data):
        templates = {
            'order.created': '您的订单已创建,订单号:{order_id}',
            'order.paid': '您的订单已支付成功,订单号:{order_id}',
            'order.shipped': '您的订单已发货,快递单号:{tracking_number}',
            'order.delivered': '您的订单已签收,感谢购买!',
            'order.cancelled': '您的订单已取消,订单号:{order_id}'
        }

        template = templates.get(event, '订单状态更新')
        content = template.format(**data)

        print(f" [Notification] {content}")

service = NotificationService()
service.start()

6.6 与 Fanout 的对比

特性 Direct Fanout
路由方式 Routing Key 精确匹配 忽略 Routing Key
灵活性 可选择性接收消息 接收所有消息
队列绑定 需要指定 Routing Key 无需指定
适用场景 分类处理、事件路由 广播、日志收集

6.7 小结

本章介绍了路由模式:

  1. Direct Exchange 的使用
  2. Routing Key 的精确匹配
  3. 多重绑定的应用
  4. 日志系统示例
  5. 订单事件处理实战

下一章将介绍主题模式。