第三章:核心概念¶
3.1 消息模型¶
RabbitMQ 的消息模型基于 AMQP 协议,核心概念包括:
消息流转过程¶
- 生产者**创建消息并发送到 **交换机
- 交换机**根据路由规则将消息路由到一个或多个 **队列
- **队列**存储消息,等待消费者获取
- **消费者**从队列获取消息并处理
3.2 Exchange(交换机)¶
交换机是消息的路由中心,接收生产者发送的消息,根据路由规则分发到队列。
交换机类型¶
| 类型 | 说明 |
|---|---|
| direct | 直接匹配,消息路由到 Routing Key 完全匹配的队列 |
| fanout | 广播,消息路由到所有绑定的队列 |
| topic | 主题匹配,支持通配符匹配 Routing Key |
| headers | 头部匹配,根据消息头属性路由(较少使用) |
Direct Exchange¶
直接交换机根据 Routing Key 精确匹配:
# 声明 direct 交换机
channel.exchange_declare(exchange='logs_direct', exchange_type='direct')
# 绑定队列
channel.queue_bind(queue='info_queue', exchange='logs_direct', routing_key='info')
channel.queue_bind(queue='error_queue', exchange='logs_direct', routing_key='error')
# 发送消息
channel.basic_publish(exchange='logs_direct', routing_key='info', body='Info message')
channel.basic_publish(exchange='logs_direct', routing_key='error', body='Error message')
Fanout Exchange¶
扇出交换机忽略 Routing Key,将消息广播到所有绑定的队列:
# 声明 fanout 交换机
channel.exchange_declare(exchange='logs_fanout', exchange_type='fanout')
# 绑定队列(不需要 routing_key)
channel.queue_bind(queue='queue_a', exchange='logs_fanout')
channel.queue_bind(queue='queue_b', exchange='logs_fanout')
# 发送消息(routing_key 被忽略)
channel.basic_publish(exchange='logs_fanout', routing_key='', body='Broadcast message')
Topic Exchange¶
主题交换机支持通配符匹配:
*匹配一个单词#匹配零个或多个单词
┌─────────┐
Producer ──RK:user.*──▶ topic │───user.*──▶ Queue A
│ │───user.#───▶ Queue B
│ │───#.error──▶ Queue C
└─────────┘
# 声明 topic 交换机
channel.exchange_declare(exchange='logs_topic', exchange_type='topic')
# 绑定队列
channel.queue_bind(queue='user_queue', exchange='logs_topic', routing_key='user.*')
channel.queue_bind(queue='all_user_queue', exchange='logs_topic', routing_key='user.#')
channel.queue_bind(queue='error_queue', exchange='logs_topic', routing_key='#.error')
# 发送消息
channel.basic_publish(exchange='logs_topic', routing_key='user.info', body='User info')
channel.basic_publish(exchange='logs_topic', routing_key='user.profile.error', body='Profile error')
Headers Exchange¶
头部交换机根据消息头属性匹配:
# 声明 headers 交换机
channel.exchange_declare(exchange='logs_headers', exchange_type='headers')
# 绑定队列
channel.queue_bind(
queue='queue_a',
exchange='logs_headers',
arguments={
'x-match': 'all', # all: 所有头部匹配; any: 任一头部匹配
'format': 'pdf',
'type': 'report'
}
)
# 发送消息
channel.basic_publish(
exchange='logs_headers',
routing_key='', # 被忽略
body='Report content',
properties=pika.BasicProperties(
headers={'format': 'pdf', 'type': 'report'}
)
)
默认交换机¶
RabbitMQ 提供一个默认的 direct 交换机(名称为空字符串 ''),消息直接路由到 Routing Key 同名的队列:
# 使用默认交换机
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
3.3 Queue(队列)¶
队列是消息的存储容器,遵循 FIFO(先进先出)原则。
队列属性¶
channel.queue_declare(
queue='task_queue',
durable=True, # 持久化
exclusive=False, # 是否独占(连接关闭时自动删除)
auto_delete=False, # 无消费者时自动删除
arguments={
'x-message-ttl': 60000, # 消息 TTL(毫秒)
'x-expires': 86400000, # 队列过期时间(毫秒)
'x-max-length': 1000, # 最大消息数
'x-max-length-bytes': 10485760, # 最大字节数
'x-dead-letter-exchange': 'dlx', # 死信交换机
'x-dead-letter-routing-key': 'dead' # 死信路由键
}
)
持久化¶
持久化队列在 RabbitMQ 重启后依然存在:
# 声明持久化队列
channel.queue_declare(queue='durable_queue', durable=True)
# 发送持久化消息
channel.basic_publish(
exchange='',
routing_key='durable_queue',
body='Persistent message',
properties=pika.BasicProperties(delivery_mode=2) # 2 表示持久化
)
临时队列¶
临时队列在消费者断开连接后自动删除:
# 声明临时队列(自动生成队列名)
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
队列长度限制¶
# 最大消息数限制
channel.queue_declare(
queue='limited_queue',
arguments={
'x-max-length': 1000,
'x-overflow': 'reject-publish' # 或 'drop-head'(默认)
}
)
# 最大字节数限制
channel.queue_declare(
queue='byte_limited_queue',
arguments={
'x-max-length-bytes': 10485760 # 10MB
}
)
消息 TTL¶
# 队列级别 TTL
channel.queue_declare(
queue='ttl_queue',
arguments={'x-message-ttl': 60000} # 60秒
)
# 消息级别 TTL
channel.basic_publish(
exchange='',
routing_key='queue',
body='Message with TTL',
properties=pika.BasicProperties(expiration='30000') # 30秒
)
3.4 Binding(绑定)¶
绑定定义了交换机和队列之间的关系。
# 绑定队列到交换机
channel.queue_bind(
queue='queue_name',
exchange='exchange_name',
routing_key='routing_key'
)
# 查看绑定
channel.queue_bind(queue='q1', exchange='e1', routing_key='key1')
channel.queue_bind(queue='q1', exchange='e1', routing_key='key2')
# 一个队列可以绑定多个 routing_key
# 一个队列可以绑定多个交换机
3.5 Message(消息)¶
消息由两部分组成:消息体和消息属性。
消息属性¶
channel.basic_publish(
exchange='',
routing_key='queue',
body='Message body',
properties=pika.BasicProperties(
content_type='application/json', # 内容类型
content_encoding='utf-8', # 编码
headers={'key': 'value'}, # 自定义头部
delivery_mode=2, # 持久化
priority=0, # 优先级(0-9)
correlation_id='corr-123', # 关联 ID(RPC 用)
reply_to='reply_queue', # 回复队列(RPC 用)
expiration='60000', # TTL(毫秒)
message_id='msg-123', # 消息 ID
timestamp=int(time.time()), # 时间戳
type='notification', # 消息类型
user_id='user-1', # 用户 ID
app_id='app-1' # 应用 ID
)
)
消息确认¶
消费者处理完消息后需要发送确认:
def callback(ch, method, properties, body):
try:
# 处理消息
process_message(body)
# 确认消息
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
# 拒绝消息,重新入队
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
确认类型:
- basic_ack:确认成功
- basic_nack:批量拒绝
- basic_reject:单条拒绝
3.6 Connection 和 Channel¶
Connection¶
Connection 是客户端与 RabbitMQ 之间的 TCP 连接:
import pika
# 创建连接
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='localhost',
port=5672,
virtual_host='/',
credentials=pika.PlainCredentials('user', 'password'),
heartbeat=60,
blocked_connection_timeout=300
)
)
Channel¶
Channel 是 Connection 内的轻量级连接,用于复用:
# 创建通道
channel = connection.channel()
# 一个 Connection 可以创建多个 Channel
channel1 = connection.channel()
channel2 = connection.channel()
连接池¶
高并发场景下使用连接池:
from pika import pool
parameters = pika.ConnectionParameters(host='localhost')
connection_pool = pool.QueuedPool(
create=lambda: pika.BlockingConnection(parameters),
max_size=10,
max_overflow=10,
timeout=10,
recycle=3600
)
def get_channel():
with connection_pool.item() as connection:
return connection.channel()
3.7 Virtual Host(虚拟主机)¶
虚拟主机提供逻辑隔离:
# 连接到指定虚拟主机
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='localhost',
virtual_host='/project1',
credentials=pika.PlainCredentials('user', 'password')
)
)
不同虚拟主机之间的资源完全隔离,适合多租户场景。
3.8 死信队列¶
死信队列(Dead Letter Queue)用于存储无法被正常消费的消息。
死信产生条件¶
- 消息被拒绝且 requeue=false
- 消息 TTL 过期
- 队列达到最大长度
配置死信队列¶
# 声明死信交换机
channel.exchange_declare(exchange='dlx', exchange_type='direct')
# 声明死信队列
channel.queue_declare(queue='dead_letter_queue')
channel.queue_bind(queue='dead_letter_queue', exchange='dlx', routing_key='dead')
# 声明业务队列,配置死信
channel.queue_declare(
queue='business_queue',
arguments={
'x-dead-letter-exchange': 'dlx',
'x-dead-letter-routing-key': 'dead'
}
)
3.9 延迟队列¶
RabbitMQ 本身不支持延迟队列,但可以通过 TTL + 死信队列实现:
# 声明延迟交换机和队列
channel.exchange_declare(exchange='delay_exchange', exchange_type='direct')
# 声明延迟队列(消息过期后进入死信队列)
channel.queue_declare(
queue='delay_queue_30s',
arguments={
'x-message-ttl': 30000, # 30秒延迟
'x-dead-letter-exchange': 'process_exchange',
'x-dead-letter-routing-key': 'process'
}
)
# 发送延迟消息
channel.basic_publish(
exchange='delay_exchange',
routing_key='delay_queue_30s',
body='Delayed message'
)
也可以使用 rabbitmq_delayed_message_exchange 插件:
# 声明延迟交换机
channel.exchange_declare(
exchange='delayed',
exchange_type='x-delayed-message',
arguments={'x-delayed-type': 'direct'}
)
# 发送延迟消息
channel.basic_publish(
exchange='delayed',
routing_key='key',
body='Delayed message',
properties=pika.BasicProperties(headers={'x-delay': 30000}) # 30秒
)
3.10 小结¶
本章介绍了 RabbitMQ 的核心概念:
- 消息模型和流转过程
- 四种交换机类型及其使用场景
- 队列的属性和配置
- 绑定的概念
- 消息的属性和确认机制
- Connection 和 Channel
- 虚拟主机
- 死信队列和延迟队列
下一章将介绍工作队列模式。