第十章:生产实践¶
10.1 高可用配置¶
镜像队列¶
在 RabbitMQ 集群中配置镜像队列,确保消息高可用:
# Set an HA policy from the command line: mirror every queue (".*")
# across all cluster nodes and synchronize new mirrors automatically.
rabbitmqctl set_policy ha-all ".*" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
# Apply a policy only to queues whose names start with "orders.":
# keep exactly 2 mirrors of each matching queue.
rabbitmqctl set_policy ha-orders "^orders\." '{"ha-mode":"exactly","ha-params":2}'
# Show the policies currently in effect.
rabbitmqctl list_policies
集群配置¶
# Node 1 (seed node): start the broker detached, then reset it to a clean state.
rabbitmq-server -detached
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
# Node 2 (joining node): the reset must happen before join_cluster.
rabbitmq-server -detached
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
# Verify that both nodes appear in the cluster.
rabbitmqctl cluster_status
Python 连接集群¶
import pika

# Single-node connection with failover-friendly parameters.
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='node1',
        port=5672,
        credentials=pika.PlainCredentials('user', 'password'),
        # Retry the initial connection a few times before giving up.
        connection_attempts=3,
        retry_delay=5,
        # Heartbeat so dead TCP connections are detected promptly.
        heartbeat=60
    )
)

# Failover across multiple addresses: try each node until one accepts.
# (The unused `from pika.connection import SSLOptions` import was removed.)
hosts = ['node1', 'node2', 'node3']
for host in hosts:
    try:
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host)
        )
        break
    except pika.exceptions.AMQPConnectionError:
        continue
else:
    # Every node refused the connection: fail loudly instead of silently
    # continuing with the previously opened connection object.
    raise RuntimeError(f"Could not connect to any of: {hosts}")
10.2 性能优化¶
预取数量调优¶
# Cap unacknowledged deliveries per consumer so work is spread evenly.
channel.basic_qos(prefetch_count=10)
# Prefetch sizing rules of thumb:
# - CPU-bound tasks: prefetch_count = number of CPU cores
# - I/O-bound tasks: prefetch_count = 10-50
# - Mixed workloads: prefetch_count = 5-20
批量发送¶
import pika

# Connect to the local broker over a blocking connection.
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Enable publisher confirms so each publish is acknowledged by the broker.
channel.confirm_delivery()

# Publish a batch of 1000 persistent messages to the same queue.
for i in range(1000):
    channel.basic_publish(
        exchange='',
        routing_key='batch_queue',
        body=f"Message {i}",
        properties=pika.BasicProperties(
            delivery_mode=2  # persistent
        ),
    )

connection.close()
异步发布¶
import pika

# Asynchronous publishing with pika's SelectConnection (callback-driven).
class AsyncPublisher:
    """Publish a burst of messages over a non-blocking SelectConnection.

    The connection and channel are opened via callbacks; publishing starts
    as soon as the channel is ready. connect() blocks inside the I/O loop.
    """
    def __init__(self):
        self.connection = None  # set once connect() opens the connection
        self.channel = None     # set in on_channel_open
        self.messages = []      # NOTE(review): never used — confirm intent
        self.published = 0      # NOTE(review): never incremented — confirm intent
    def connect(self):
        """Open the connection and run the I/O loop (blocks until closed)."""
        self.connection = pika.SelectConnection(
            pika.ConnectionParameters('localhost'),
            on_open_callback=self.on_connection_open
        )
        self.connection.ioloop.start()
    def on_connection_open(self, connection):
        # Connection is ready: open a channel, also asynchronously.
        connection.channel(on_open_callback=self.on_channel_open)
    def on_channel_open(self, channel):
        self.channel = channel
        channel.confirm_delivery()
        self.publish_messages()
    def publish_messages(self):
        """Fire 10000 publishes on the open channel, then close."""
        for i in range(10000):
            self.channel.basic_publish(
                exchange='',
                routing_key='test',
                body=f'Message {i}'
            )
        # NOTE(review): closing immediately after the loop may race the
        # broker's publisher confirms — verify confirms arrive before close.
        self.connection.close()

publisher = AsyncPublisher()
publisher.connect()
消息大小优化¶
import json
import zlib
import base64
def compress_message(data: dict) -> str:
    """Serialize *data* to JSON, deflate with zlib, and return a base64
    text string that is safe to embed in a message body."""
    raw = json.dumps(data).encode()
    return base64.b64encode(zlib.compress(raw)).decode()

def decompress_message(compressed: str) -> dict:
    """Inverse of compress_message: base64-decode, inflate, parse JSON."""
    packed = base64.b64decode(compressed.encode())
    return json.loads(zlib.decompress(packed).decode())

# Demo: a large repetitive payload shrinks considerably once compressed.
large_data = {"items": list(range(10000))}
compressed = compress_message(large_data)
print(f"Original: {len(json.dumps(large_data))} bytes")
print(f"Compressed: {len(compressed)} bytes")
10.3 监控与告警¶
Prometheus 监控¶
# prometheus.yml — scrape RabbitMQ's built-in Prometheus endpoint
scrape_configs:
  - job_name: 'rabbitmq'
    static_configs:
      # 15692 is the default port of the rabbitmq_prometheus plugin
      - targets: ['localhost:15692']
启用 Prometheus 插件¶
# Enable the built-in Prometheus metrics plugin.
rabbitmq-plugins enable rabbitmq_prometheus
# Metrics are then exposed on port 15692.
curl http://localhost:15692/metrics
关键监控指标¶
# 监控脚本
import requests
import json
def get_rabbitmq_stats(host='localhost', port=15672, user='guest', password='guest'):
    """Fetch the cluster-wide overview from the RabbitMQ management API."""
    url = f"http://{host}:{port}/api/overview"
    return requests.get(url, auth=(user, password)).json()
def check_queue_depth(stats, threshold=10000):
    """Return the total message backlog, printing an alert past *threshold*."""
    totals = stats.get('queue_totals', {})
    messages = totals.get('messages', 0)
    if messages > threshold:
        print(f"⚠️ Queue depth alert: {messages} messages")
    return messages
def check_memory_usage(stats, threshold=0.8):
    """Return mem_used/mem_limit, printing an alert above *threshold*.

    Guards against a zero or missing memory limit so the ratio never
    raises ZeroDivisionError (same guard the health-check script uses).

    NOTE(review): /api/overview normally reports only the node *name*
    under 'node'; per-node memory figures come from /api/nodes — confirm
    the shape of *stats* against the caller.
    """
    node = stats.get('node', {})
    mem_used = node.get('mem_used', 0)
    mem_limit = node.get('mem_limit', 1)
    # A broker reporting mem_limit == 0 would otherwise crash this check.
    usage = mem_used / mem_limit if mem_limit > 0 else 0
    if usage > threshold:
        print(f"⚠️ Memory usage alert: {usage:.2%}")
    return usage
def check_consumers(stats):
    """Return the total consumer count, warning when nobody is consuming."""
    totals = stats.get('object_totals', {})
    consumers = totals.get('consumers', 0)
    if not consumers:
        print("⚠️ No consumers connected!")
    return consumers
# Run the checks once against a fresh overview snapshot (schedule this
# periodically, e.g. via cron, for ongoing monitoring).
stats = get_rabbitmq_stats()
check_queue_depth(stats)
check_memory_usage(stats)
check_consumers(stats)
Grafana Dashboard¶
{
"dashboard": {
"title": "RabbitMQ Monitoring",
"panels": [
{
"title": "Queue Depth",
"targets": [
{
"expr": "rabbitmq_queue_messages"
}
]
},
{
"title": "Message Rate",
"targets": [
{
"expr": "rate(rabbitmq_queue_messages_ready[5m])"
}
]
},
{
"title": "Memory Usage",
"targets": [
{
"expr": "rabbitmq_node_mem_used / rabbitmq_node_mem_limit"
}
]
}
]
}
}
10.4 安全配置¶
用户权限管理¶
# Create a new user with a password.
rabbitmqctl add_user admin password123
# Grant the management-UI administrator role.
rabbitmqctl set_user_tags admin administrator
# Grant configure/write/read permissions (one regex each) on vhost /.
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
# List existing users and their tags.
rabbitmqctl list_users
# Remove the default guest account (recommended for production).
rabbitmqctl delete_user guest
vhost 隔离¶
# Create isolated virtual hosts, one per environment.
rabbitmqctl add_vhost production
rabbitmqctl add_vhost development
# Scope each user's configure/write/read permissions to its own vhost.
rabbitmqctl set_permissions -p production app_user ".*" ".*" ".*"
rabbitmqctl set_permissions -p development dev_user ".*" ".*" ".*"
SSL/TLS 配置¶
import pika
import ssl

# TLS-secured connection: verify the broker certificate against our CA
# and present a client certificate (mutual TLS).
ssl_context = ssl.create_default_context(
    cafile="/path/to/ca_certificate.pem"
)
ssl_context.load_cert_chain(
    "/path/to/client_certificate.pem",
    "/path/to/client_key.pem"
)
ssl_options = pika.SSLOptions(
    context=ssl_context,
    # Must match a name in the server certificate (hostname check / SNI).
    server_hostname="rabbitmq.example.com"
)
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='rabbitmq.example.com',
        port=5671,  # AMQPS default port
        ssl_options=ssl_options
    )
)
配置文件安全¶
# /etc/rabbitmq/rabbitmq.conf
# Restrict the default guest user to loopback connections only.
loopback_users = none
# Cap the number of concurrent client connections.
connection_max = 1000
# Block publishers when memory use exceeds 60% of available RAM.
vm_memory_high_watermark.relative = 0.6
# Block publishers when free disk space drops below 10GB.
disk_free_limit.absolute = 10GB
# TLS listener and certificate material; require verified client certs.
listeners.ssl.default = 5671
ssl_options.cacertfile = /path/to/ca.pem
ssl_options.certfile = /path/to/server.pem
ssl_options.keyfile = /path/to/server.key
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true
10.5 消息可靠性¶
发布确认¶
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Enable publisher confirms: basic_publish now blocks until the broker
# acks (or nacks) the message and raises on failure.
channel.confirm_delivery()

try:
    channel.basic_publish(
        exchange='',
        routing_key='test',
        body='Hello',
        properties=pika.BasicProperties(delivery_mode=2),  # persistent
        # mandatory=True is required for UnroutableError to fire: without
        # it the broker silently drops messages that match no queue.
        mandatory=True
    )
    print("Message confirmed")
except pika.exceptions.UnroutableError:
    print("Message was returned (unroutable)")
except pika.exceptions.NackError:
    print("Message was nacked (broker rejected)")

connection.close()
持久化配置¶
# Durable queue: the queue definition survives a broker restart (messages
# must also be published as persistent — see delivery_mode below).
channel.queue_declare(
    queue='durable_queue',
    durable=True
)
# Durable exchange: its definition survives a broker restart.
channel.exchange_declare(
    exchange='durable_exchange',
    exchange_type='direct',
    durable=True
)
# Persistent message: delivery_mode=2 asks the broker to write it to disk.
channel.basic_publish(
    exchange='',
    routing_key='durable_queue',
    body='Persistent message',
    properties=pika.BasicProperties(
        delivery_mode=2  # persistent
    )
)
死信队列¶
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare the dead-letter exchange and bind a queue to collect dead messages.
channel.exchange_declare(exchange='dlx', exchange_type='direct')
channel.queue_declare(queue='dead_letter_queue')
channel.queue_bind(exchange='dlx', queue='dead_letter_queue', routing_key='dead')

# Main queue: expired or rejected messages are rerouted to 'dlx' with
# routing key 'dead'.
args = {
    'x-dead-letter-exchange': 'dlx',
    'x-dead-letter-routing-key': 'dead',
    'x-message-ttl': 60000  # per-message TTL in milliseconds
}
channel.queue_declare(queue='main_queue', arguments=args)

def on_dead_letter(ch, method, properties, body):
    """Log each dead-lettered message together with the broker's reason."""
    print(f"Dead letter received: {body}")
    # Guard against headers being None before .get() — the original code
    # crashed on messages published without any headers. The broker records
    # why the message died in the 'x-death' header.
    headers = properties.headers or {}
    print(f"Reason: {headers.get('x-death', [])}")

channel.basic_consume(
    queue='dead_letter_queue',
    on_message_callback=on_dead_letter,
    auto_ack=True
)
channel.start_consuming()
消息重试¶
import pika
import json
import time
class RetryConsumer:
    """Consume a queue and retry failed messages up to *max_retries* times.

    Retry metadata travels in the 'x-retry-count' message header; messages
    that exhaust their retries are nacked without requeue so they flow to
    a dead-letter exchange if one is configured on the queue.
    """

    def __init__(self, queue, max_retries=3, retry_delay=5):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()
        self.queue = queue
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        # Declare the retry queue.
        # NOTE(review): this queue is declared but retry_message() republishes
        # directly to self.queue — confirm whether '{queue}_retry' is needed.
        self.channel.queue_declare(queue=f'{queue}_retry')

    def consume(self):
        """Start the blocking consume loop on the source queue."""
        self.channel.basic_consume(
            queue=self.queue,
            on_message_callback=self.handle_message
        )
        self.channel.start_consuming()

    def handle_message(self, ch, method, properties, body):
        """Process one delivery, retrying or dead-lettering on failure."""
        headers = properties.headers or {}
        retry_count = headers.get('x-retry-count', 0)
        try:
            self.process_message(body)
            ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception:  # exception value was unused; no need to bind it
            if retry_count < self.max_retries:
                # Republish with an incremented retry counter, then ack the
                # original delivery so it is not redelivered as-is.
                print(f"Retry {retry_count + 1}/{self.max_retries}")
                self.retry_message(body, retry_count + 1)
                ch.basic_ack(delivery_tag=method.delivery_tag)
            else:
                # Retries exhausted: reject without requeue so the broker
                # dead-letters the message (if the queue has a DLX).
                print("Max retries exceeded, sending to DLQ")
                ch.basic_nack(
                    delivery_tag=method.delivery_tag,
                    requeue=False
                )

    def process_message(self, body):
        """Business logic; always raises here to demonstrate the retry path."""
        data = json.loads(body)
        print(f"Processing: {data}")
        # Simulated failure so the retry machinery is exercised.
        raise Exception("Processing failed")

    def retry_message(self, body, retry_count):
        """Republish *body* to the source queue with an updated retry header."""
        # NOTE(review): time.sleep blocks the consumer thread and can stall
        # heartbeats on long delays — consider a TTL/delayed queue instead.
        time.sleep(self.retry_delay)
        self.channel.basic_publish(
            exchange='',
            routing_key=self.queue,
            body=body,
            properties=pika.BasicProperties(
                headers={'x-retry-count': retry_count}
            )
        )

consumer = RetryConsumer('test_queue')
consumer.consume()
10.6 运维脚本¶
队列管理脚本¶
#!/usr/bin/env python3
"""RabbitMQ 队列管理脚本"""
import requests
import argparse
from typing import List
class RabbitMQAdmin:
    """Thin client for the RabbitMQ management HTTP API (queue operations)."""

    def __init__(self, host='localhost', port=15672, user='guest', password='guest'):
        self.base_url = f"http://{host}:{port}/api"
        self.auth = (user, password)

    def _queue_url(self, queue: str, vhost: str) -> str:
        """Build the API URL for *queue*, percent-encoding the vhost
        (so the default vhost '/' becomes '%2F')."""
        vhost_encoded = requests.utils.quote(vhost, safe='')
        return f"{self.base_url}/queues/{vhost_encoded}/{queue}"

    def list_queues(self) -> List[dict]:
        """Return metadata for every queue visible to the credentials."""
        response = requests.get(f"{self.base_url}/queues", auth=self.auth)
        return response.json()

    def purge_queue(self, queue: str, vhost: str = '/') -> bool:
        """Drop all messages in *queue*; True on HTTP 204 (success)."""
        response = requests.delete(
            f"{self._queue_url(queue, vhost)}/contents",
            auth=self.auth
        )
        return response.status_code == 204

    def delete_queue(self, queue: str, vhost: str = '/') -> bool:
        """Delete *queue* entirely; True on HTTP 204 (success)."""
        response = requests.delete(self._queue_url(queue, vhost), auth=self.auth)
        return response.status_code == 204

    def get_queue_info(self, queue: str, vhost: str = '/'):
        """Return the full management-API record for *queue*."""
        response = requests.get(self._queue_url(queue, vhost), auth=self.auth)
        return response.json()
def main():
    """Command-line entry point: list/purge/delete/info queue operations."""
    # Fix: the 'info' command calls json.dumps but this script never
    # imported json, which raised NameError at runtime.
    import json

    parser = argparse.ArgumentParser(description='RabbitMQ Queue Admin')
    parser.add_argument('--host', default='localhost')
    parser.add_argument('--user', default='guest')
    parser.add_argument('--password', default='guest')
    subparsers = parser.add_subparsers(dest='command')

    # Subcommands: 'list' takes no arguments; the rest name a queue.
    subparsers.add_parser('list', help='List all queues')
    purge_parser = subparsers.add_parser('purge', help='Purge a queue')
    purge_parser.add_argument('queue', help='Queue name')
    delete_parser = subparsers.add_parser('delete', help='Delete a queue')
    delete_parser.add_argument('queue', help='Queue name')
    info_parser = subparsers.add_parser('info', help='Get queue info')
    info_parser.add_argument('queue', help='Queue name')

    args = parser.parse_args()
    admin = RabbitMQAdmin(
        host=args.host,
        user=args.user,
        password=args.password
    )

    if args.command == 'list':
        queues = admin.list_queues()
        for q in queues:
            print(f"{q['name']}: {q['messages']} messages")
    elif args.command == 'purge':
        if admin.purge_queue(args.queue):
            print(f"Queue {args.queue} purged")
    elif args.command == 'delete':
        if admin.delete_queue(args.queue):
            print(f"Queue {args.queue} deleted")
    elif args.command == 'info':
        info = admin.get_queue_info(args.queue)
        print(json.dumps(info, indent=2))
    else:
        # No subcommand given: show usage instead of exiting silently.
        parser.print_help()

if __name__ == '__main__':
    main()
健康检查脚本¶
#!/usr/bin/env python3
"""RabbitMQ 健康检查脚本"""
import requests
import sys
import json
def check_rabbitmq_health(host='localhost', port=15672, user='guest', password='guest'):
    """Check broker health via the management HTTP API.

    Returns (True, summary_dict) when healthy, otherwise (False, reason_str).

    Fix: /api/overview reports 'node' as the node *name* (a string), not a
    stats dict, so the original memory/disk checks always read defaults and
    the disk check always failed. Per-node running/memory/disk figures come
    from /api/nodes instead.
    """
    try:
        auth = (user, password)
        base = f"http://{host}:{port}/api"

        # API reachability + cluster-wide totals (queue backlog lives here).
        response = requests.get(f"{base}/overview", auth=auth, timeout=10)
        if response.status_code != 200:
            return False, "API not accessible"
        data = response.json()

        # Per-node stats: take the first node reported.
        nodes_resp = requests.get(f"{base}/nodes", auth=auth, timeout=10)
        if nodes_resp.status_code != 200:
            return False, "Nodes API not accessible"
        nodes = nodes_resp.json()
        if not nodes:
            return False, "No nodes reported"
        node_status = nodes[0]

        # Node must be up.
        if not node_status.get('running', False):
            return False, "Node not running"

        # Memory: alert above 90% of the configured high watermark.
        mem_used = node_status.get('mem_used', 0)
        mem_limit = node_status.get('mem_limit', 1)
        mem_usage = mem_used / mem_limit if mem_limit > 0 else 0
        if mem_usage > 0.9:
            return False, f"Memory usage too high: {mem_usage:.2%}"

        # Disk: the broker blocks publishers below disk_free_limit.
        disk_free = node_status.get('disk_free', 0)
        disk_limit = node_status.get('disk_free_limit', 1)
        if disk_free < disk_limit:
            return False, "Disk space below limit"

        # Queue backlog (informational, included in the summary).
        queue_totals = data.get('queue_totals', {})
        messages = queue_totals.get('messages', 0)

        return True, {
            'status': 'healthy',
            'messages': messages,
            'memory_usage': f"{mem_usage:.2%}",
            'disk_free': f"{disk_free / (1024**3):.2f} GB"
        }
    except requests.exceptions.RequestException as e:
        return False, f"Connection error: {e}"
if __name__ == '__main__':
    # Exit 0 when healthy (printing a JSON summary), 1 otherwise —
    # suitable for cron or liveness-probe style invocation.
    healthy, result = check_rabbitmq_health()
    if healthy:
        print(json.dumps(result, indent=2))
        sys.exit(0)
    else:
        print(f"❌ Health check failed: {result}")
        sys.exit(1)
10.7 最佳实践总结¶
生产环境清单¶
- 启用消息持久化
- 配置镜像队列
- 设置合理的预取数量
- 实现死信队列
- 配置监控告警
- 启用 TLS 加密
- 设置用户权限
- 配置内存和磁盘阈值
- 实现消息重试机制
- 定期备份配置
性能调优建议¶
- 连接管理:使用连接池,避免频繁创建连接
- 消息大小:控制消息大小,大消息考虑压缩或分片
- 批量操作:批量发送消息提高吞吐量
- 预取数量:根据任务类型调整预取数量
- 队列设计:避免过多队列,合理规划队列数量
常见问题排查¶
# Inspect queue backlogs and consumer counts.
rabbitmqctl list_queues name messages consumers
# List client connections.
rabbitmqctl list_connections
# List open channels.
rabbitmqctl list_channels
# Show memory usage from the node status report.
rabbitmqctl status | grep memory
# Reset the node — DESTROYS all local data; use with extreme caution.
rabbitmqctl reset
10.8 小结¶
本章介绍了 RabbitMQ 生产实践:
- 高可用配置(镜像队列、集群)
- 性能优化技巧
- 监控与告警配置
- 安全配置(用户权限、SSL/TLS)
- 消息可靠性保障
- 运维脚本工具
- 最佳实践总结
恭喜你完成了 RabbitMQ 教程的学习!