跳转至

第三章:核心概念

3.1 消息模型

RabbitMQ 的消息模型基于 AMQP 协议,核心概念包括:

Producer → Exchange → Queue → Consumer
              └── Binding(绑定规则)

消息流转过程

  1. 生产者**创建消息并发送到 **交换机
  2. 交换机**根据路由规则将消息路由到一个或多个 **队列
  3. **队列**存储消息,等待消费者获取
  4. **消费者**从队列获取消息并处理

3.2 Exchange(交换机)

交换机是消息的路由中心,接收生产者发送的消息,根据路由规则分发到队列。

交换机类型

类型 说明
direct 直接匹配,消息路由到 Routing Key 完全匹配的队列
fanout 广播,消息路由到所有绑定的队列
topic 主题匹配,支持通配符匹配 Routing Key
headers 头部匹配,根据消息头属性路由(较少使用)

Direct Exchange

直接交换机根据 Routing Key 精确匹配:

                    ┌─────────┐
Producer ──RK:info──▶  direct │───info──▶ Queue A
                    │         │
                    │         │───error──▶ Queue B
                    └─────────┘
# 声明 direct 交换机
channel.exchange_declare(exchange='logs_direct', exchange_type='direct')

# 绑定队列
channel.queue_bind(queue='info_queue', exchange='logs_direct', routing_key='info')
channel.queue_bind(queue='error_queue', exchange='logs_direct', routing_key='error')

# 发送消息
channel.basic_publish(exchange='logs_direct', routing_key='info', body='Info message')
channel.basic_publish(exchange='logs_direct', routing_key='error', body='Error message')

Fanout Exchange

扇出交换机忽略 Routing Key,将消息广播到所有绑定的队列:

                    ┌─────────┐
Producer ──────────▶  fanout │───▶ Queue A
                    │         │───▶ Queue B
                    │         │───▶ Queue C
                    └─────────┘
# 声明 fanout 交换机
channel.exchange_declare(exchange='logs_fanout', exchange_type='fanout')

# 绑定队列(不需要 routing_key)
channel.queue_bind(queue='queue_a', exchange='logs_fanout')
channel.queue_bind(queue='queue_b', exchange='logs_fanout')

# 发送消息(routing_key 被忽略)
channel.basic_publish(exchange='logs_fanout', routing_key='', body='Broadcast message')

Topic Exchange

主题交换机支持通配符匹配:

  • * 匹配一个单词
  • # 匹配零个或多个单词
                    ┌─────────┐
Producer ──RK:user.*──▶  topic │───user.*──▶ Queue A
                    │         │───user.#───▶ Queue B
                    │         │───#.error──▶ Queue C
                    └─────────┘
# 声明 topic 交换机
channel.exchange_declare(exchange='logs_topic', exchange_type='topic')

# 绑定队列
channel.queue_bind(queue='user_queue', exchange='logs_topic', routing_key='user.*')
channel.queue_bind(queue='all_user_queue', exchange='logs_topic', routing_key='user.#')
channel.queue_bind(queue='error_queue', exchange='logs_topic', routing_key='#.error')

# 发送消息
channel.basic_publish(exchange='logs_topic', routing_key='user.info', body='User info')
channel.basic_publish(exchange='logs_topic', routing_key='user.profile.error', body='Profile error')

Headers Exchange

头部交换机根据消息头属性匹配:

# 声明 headers 交换机
channel.exchange_declare(exchange='logs_headers', exchange_type='headers')

# 绑定队列
channel.queue_bind(
    queue='queue_a',
    exchange='logs_headers',
    arguments={
        'x-match': 'all',  # all: 所有头部匹配; any: 任一头部匹配
        'format': 'pdf',
        'type': 'report'
    }
)

# 发送消息
channel.basic_publish(
    exchange='logs_headers',
    routing_key='',  # 被忽略
    body='Report content',
    properties=pika.BasicProperties(
        headers={'format': 'pdf', 'type': 'report'}
    )
)

默认交换机

RabbitMQ 提供一个默认的 direct 交换机(名称为空字符串 ''),消息直接路由到 Routing Key 同名的队列:

# 使用默认交换机
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')

3.3 Queue(队列)

队列是消息的存储容器,遵循 FIFO(先进先出)原则。

队列属性

channel.queue_declare(
    queue='task_queue',
    durable=True,        # 持久化
    exclusive=False,     # 是否独占(连接关闭时自动删除)
    auto_delete=False,   # 无消费者时自动删除
    arguments={
        'x-message-ttl': 60000,        # 消息 TTL(毫秒)
        'x-expires': 86400000,         # 队列过期时间(毫秒)
        'x-max-length': 1000,          # 最大消息数
        'x-max-length-bytes': 10485760, # 最大字节数
        'x-dead-letter-exchange': 'dlx', # 死信交换机
        'x-dead-letter-routing-key': 'dead' # 死信路由键
    }
)

持久化

持久化队列在 RabbitMQ 重启后依然存在:

# 声明持久化队列
channel.queue_declare(queue='durable_queue', durable=True)

# 发送持久化消息
channel.basic_publish(
    exchange='',
    routing_key='durable_queue',
    body='Persistent message',
    properties=pika.BasicProperties(delivery_mode=2)  # 2 表示持久化
)

临时队列

临时队列在消费者断开连接后自动删除:

# 声明临时队列(自动生成队列名)
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

队列长度限制

# 最大消息数限制
channel.queue_declare(
    queue='limited_queue',
    arguments={
        'x-max-length': 1000,
        'x-overflow': 'reject-publish'  # 或 'drop-head'(默认)
    }
)

# 最大字节数限制
channel.queue_declare(
    queue='byte_limited_queue',
    arguments={
        'x-max-length-bytes': 10485760  # 10MB
    }
)

消息 TTL

# 队列级别 TTL
channel.queue_declare(
    queue='ttl_queue',
    arguments={'x-message-ttl': 60000}  # 60秒
)

# 消息级别 TTL
channel.basic_publish(
    exchange='',
    routing_key='queue',
    body='Message with TTL',
    properties=pika.BasicProperties(expiration='30000')  # 30秒
)

3.4 Binding(绑定)

绑定定义了交换机和队列之间的关系。

# 绑定队列到交换机
channel.queue_bind(
    queue='queue_name',
    exchange='exchange_name',
    routing_key='routing_key'
)

# 查看绑定
channel.queue_bind(queue='q1', exchange='e1', routing_key='key1')
channel.queue_bind(queue='q1', exchange='e1', routing_key='key2')

# 一个队列可以绑定多个 routing_key
# 一个队列可以绑定多个交换机

3.5 Message(消息)

消息由两部分组成:消息体和消息属性。

消息属性

channel.basic_publish(
    exchange='',
    routing_key='queue',
    body='Message body',
    properties=pika.BasicProperties(
        content_type='application/json',  # 内容类型
        content_encoding='utf-8',         # 编码
        headers={'key': 'value'},         # 自定义头部
        delivery_mode=2,                  # 持久化
        priority=0,                       # 优先级(0-9)
        correlation_id='corr-123',        # 关联 ID(RPC 用)
        reply_to='reply_queue',           # 回复队列(RPC 用)
        expiration='60000',               # TTL(毫秒)
        message_id='msg-123',             # 消息 ID
        timestamp=int(time.time()),       # 时间戳
        type='notification',              # 消息类型
        user_id='user-1',                 # 用户 ID
        app_id='app-1'                    # 应用 ID
    )
)

消息确认

消费者处理完消息后需要发送确认:

def callback(ch, method, properties, body):
    try:
        # 处理消息
        process_message(body)
        # 确认消息
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        # 拒绝消息,重新入队
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

确认类型: - basic_ack:确认成功 - basic_nack:批量拒绝 - basic_reject:单条拒绝

3.6 Connection 和 Channel

Connection

Connection 是客户端与 RabbitMQ 之间的 TCP 连接:

import pika

# 创建连接
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='localhost',
        port=5672,
        virtual_host='/',
        credentials=pika.PlainCredentials('user', 'password'),
        heartbeat=60,
        blocked_connection_timeout=300
    )
)

Channel

Channel 是 Connection 内的轻量级连接,用于复用:

# 创建通道
channel = connection.channel()

# 一个 Connection 可以创建多个 Channel
channel1 = connection.channel()
channel2 = connection.channel()

连接池

高并发场景下使用连接池:

from pika import pool

parameters = pika.ConnectionParameters(host='localhost')

connection_pool = pool.QueuedPool(
    create=lambda: pika.BlockingConnection(parameters),
    max_size=10,
    max_overflow=10,
    timeout=10,
    recycle=3600
)

def get_channel():
    with connection_pool.item() as connection:
        return connection.channel()

3.7 Virtual Host(虚拟主机)

虚拟主机提供逻辑隔离:

# 连接到指定虚拟主机
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='localhost',
        virtual_host='/project1',
        credentials=pika.PlainCredentials('user', 'password')
    )
)

不同虚拟主机之间的资源完全隔离,适合多租户场景。

3.8 死信队列

死信队列(Dead Letter Queue)用于存储无法被正常消费的消息。

死信产生条件

  1. 消息被拒绝且 requeue=false
  2. 消息 TTL 过期
  3. 队列达到最大长度

配置死信队列

# 声明死信交换机
channel.exchange_declare(exchange='dlx', exchange_type='direct')

# 声明死信队列
channel.queue_declare(queue='dead_letter_queue')
channel.queue_bind(queue='dead_letter_queue', exchange='dlx', routing_key='dead')

# 声明业务队列,配置死信
channel.queue_declare(
    queue='business_queue',
    arguments={
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'dead'
    }
)

3.9 延迟队列

RabbitMQ 本身不支持延迟队列,但可以通过 TTL + 死信队列实现:

# 声明延迟交换机和队列
channel.exchange_declare(exchange='delay_exchange', exchange_type='direct')

# 声明延迟队列(消息过期后进入死信队列)
channel.queue_declare(
    queue='delay_queue_30s',
    arguments={
        'x-message-ttl': 30000,  # 30秒延迟
        'x-dead-letter-exchange': 'process_exchange',
        'x-dead-letter-routing-key': 'process'
    }
)

# 发送延迟消息
channel.basic_publish(
    exchange='delay_exchange',
    routing_key='delay_queue_30s',
    body='Delayed message'
)

也可以使用 rabbitmq_delayed_message_exchange 插件:

# 安装插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 声明延迟交换机
channel.exchange_declare(
    exchange='delayed',
    exchange_type='x-delayed-message',
    arguments={'x-delayed-type': 'direct'}
)

# 发送延迟消息
channel.basic_publish(
    exchange='delayed',
    routing_key='key',
    body='Delayed message',
    properties=pika.BasicProperties(headers={'x-delay': 30000})  # 30秒
)

3.10 小结

本章介绍了 RabbitMQ 的核心概念:

  1. 消息模型和流转过程
  2. 四种交换机类型及其使用场景
  3. 队列的属性和配置
  4. 绑定的概念
  5. 消息的属性和确认机制
  6. Connection 和 Channel
  7. 虚拟主机
  8. 死信队列和延迟队列

下一章将介绍工作队列模式。