跳转至

第十章:生产实践

10.1 高可用配置

镜像队列

在 RabbitMQ 集群中配置镜像队列,确保消息高可用:

# Set a policy via the CLI: mirror every queue to all cluster nodes,
# with automatic synchronisation of newly added mirrors
rabbitmqctl set_policy ha-all ".*" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

# Policy for specific queues: keep exactly 2 replicas of queues named orders.*
rabbitmqctl set_policy ha-orders "^orders\." '{"ha-mode":"exactly","ha-params":2}'

# List the policies currently in effect
rabbitmqctl list_policies

集群配置

# Node 1 (primary node)
rabbitmq-server -detached
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app

# Node 2 (joins the cluster)
rabbitmq-server -detached
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app

# Show the cluster status
rabbitmqctl cluster_status

Python 连接集群

import pika

# Connect to a cluster node; pika retries the TCP connection to this host
# but does not fail over to other nodes by itself.
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='node1',
        port=5672,
        credentials=pika.PlainCredentials('user', 'password'),
        # Failover: retry the connection a few times before giving up
        connection_attempts=3,
        retry_delay=5,
        # Heartbeat interval (seconds) so dead peers are detected promptly
        heartbeat=60
    )
)

# Try multiple cluster nodes in order and keep the first reachable connection.
# (Removed an unused, unrelated `SSLOptions` import that was here before.)
hosts = ['node1', 'node2', 'node3']
for host in hosts:
    try:
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host)
        )
        break
    except pika.exceptions.AMQPConnectionError:
        continue
else:
    # Previously the loop ended silently, leaving `connection` undefined and
    # causing a confusing NameError later. Fail loudly instead.
    raise RuntimeError(f"Could not connect to any RabbitMQ node: {hosts}")

10.2 性能优化

预取数量调优

# Cap unacknowledged deliveries per consumer so work is spread evenly.
channel.basic_qos(prefetch_count=10)

# Prefetch sizing guidelines:
# - CPU-bound tasks:  prefetch_count = number of CPU cores
# - I/O-bound tasks:  prefetch_count = 10-50
# - Mixed workloads:  prefetch_count = 5-20

批量发送

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Enable publisher confirms so basic_publish raises on delivery failure
channel.confirm_delivery()

# Publish a batch of messages
messages = [f"Message {i}" for i in range(1000)]

for msg in messages:
    channel.basic_publish(
        exchange='',
        routing_key='batch_queue',
        body=msg,
        properties=pika.BasicProperties(
            delivery_mode=2  # persistent message (written to disk)
        )
    )

connection.close()

异步发布

import pika

# Asynchronous publishing using SelectConnection (callback-driven I/O loop)
class AsyncPublisher:
    """Publishes a fixed batch of messages over a non-blocking connection."""

    def __init__(self):
        self.connection = None  # set to a pika.SelectConnection in connect()
        self.channel = None     # opened in on_channel_open
        self.messages = []      # NOTE(review): never used by this class
        self.published = 0      # NOTE(review): never incremented

    def connect(self):
        """Open the connection and run the I/O loop (blocks until closed)."""
        self.connection = pika.SelectConnection(
            pika.ConnectionParameters('localhost'),
            on_open_callback=self.on_connection_open
        )
        self.connection.ioloop.start()

    def on_connection_open(self, connection):
        # Connection is ready -> open a channel.
        connection.channel(on_open_callback=self.on_channel_open)

    def on_channel_open(self, channel):
        # Channel is ready -> enable confirms and start publishing.
        self.channel = channel
        channel.confirm_delivery()
        self.publish_messages()

    def publish_messages(self):
        # NOTE(review): closing immediately after basic_publish may drop
        # messages that have not been confirmed yet — a production publisher
        # should wait for confirms before closing. Kept as-is for the tutorial.
        for i in range(10000):
            self.channel.basic_publish(
                exchange='',
                routing_key='test',
                body=f'Message {i}'
            )
        self.connection.close()

publisher = AsyncPublisher()
publisher.connect()

消息大小优化

import json
import zlib
import base64

def compress_message(data: dict) -> str:
    """Serialize *data* to JSON, deflate it, and return a base64 text blob."""
    raw = json.dumps(data).encode()
    return base64.b64encode(zlib.compress(raw)).decode()

def decompress_message(compressed: str) -> dict:
    """Reverse of compress_message: base64-decode, inflate, and parse JSON."""
    payload = zlib.decompress(base64.b64decode(compressed))
    return json.loads(payload.decode())

# Usage: compare raw JSON size with the compressed/base64 size
large_data = {"items": list(range(10000))}
compressed = compress_message(large_data)
print(f"Original: {len(json.dumps(large_data))} bytes")
print(f"Compressed: {len(compressed)} bytes")

10.3 监控与告警

Prometheus 监控

# prometheus.yml
scrape_configs:
  - job_name: 'rabbitmq'
    static_configs:
      - targets: ['localhost:15692']

启用 Prometheus 插件

# Enable the prometheus plugin
rabbitmq-plugins enable rabbitmq_prometheus

# Fetch the exposed metrics
curl http://localhost:15692/metrics

关键监控指标

# 监控脚本
import requests
import json

def get_rabbitmq_stats(host='localhost', port=15672, user='guest', password='guest'):
    """Fetch the management API /api/overview document as a dict."""
    endpoint = f"http://{host}:{port}/api/overview"
    return requests.get(endpoint, auth=(user, password)).json()

def check_queue_depth(stats, threshold=10000):
    """Return the cluster-wide message backlog found in *stats*.

    Prints an alert line when the backlog exceeds *threshold*.
    """
    totals = stats.get('queue_totals', {})
    backlog = totals.get('messages', 0)
    if backlog > threshold:
        print(f"⚠️ Queue depth alert: {backlog} messages")
    return backlog

def check_memory_usage(stats, threshold=0.8):
    """Return the node's memory usage ratio (mem_used / mem_limit).

    Prints an alert when usage exceeds *threshold*.  A missing or zero
    mem_limit yields 0.0 instead of raising ZeroDivisionError (the API can
    legitimately report a limit of 0).
    """
    node = stats.get('node', {})
    mem_used = node.get('mem_used', 0)
    mem_limit = node.get('mem_limit', 1)
    # Guard: previously a reported mem_limit of 0 crashed with
    # ZeroDivisionError; treat it as "no usage measurable".
    usage = mem_used / mem_limit if mem_limit else 0.0

    if usage > threshold:
        print(f"⚠️ Memory usage alert: {usage:.2%}")
    return usage

def check_consumers(stats):
    """Return the total consumer count; warn when nobody is consuming."""
    count = stats.get('object_totals', {}).get('consumers', 0)
    if not count:
        print("⚠️ No consumers connected!")
    return count

# Run the checks periodically (e.g. from cron)
stats = get_rabbitmq_stats()
check_queue_depth(stats)
check_memory_usage(stats)
check_consumers(stats)

Grafana Dashboard

{
  "dashboard": {
    "title": "RabbitMQ Monitoring",
    "panels": [
      {
        "title": "Queue Depth",
        "targets": [
          {
            "expr": "rabbitmq_queue_messages"
          }
        ]
      },
      {
        "title": "Message Rate",
        "targets": [
          {
            "expr": "rate(rabbitmq_queue_messages_ready[5m])"
          }
        ]
      },
      {
        "title": "Memory Usage",
        "targets": [
          {
            "expr": "rabbitmq_node_mem_used / rabbitmq_node_mem_limit"
          }
        ]
      }
    ]
  }
}

10.4 安全配置

用户权限管理

# Create a user
rabbitmqctl add_user admin password123

# Tag the user as an administrator (grants management UI access)
rabbitmqctl set_user_tags admin administrator

# Grant configure / write / read permissions on the default vhost
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

# List users
rabbitmqctl list_users

# Remove the default guest account
rabbitmqctl delete_user guest

vhost 隔离

# Create virtual hosts to isolate environments
rabbitmqctl add_vhost production
rabbitmqctl add_vhost development

# Grant each user permissions only on its own vhost
rabbitmqctl set_permissions -p production app_user ".*" ".*" ".*"
rabbitmqctl set_permissions -p development dev_user ".*" ".*" ".*"

SSL/TLS 配置

import pika
import ssl

# TLS connection: verify the broker with a CA certificate and present a
# client certificate for mutual TLS
ssl_context = ssl.create_default_context(
    cafile="/path/to/ca_certificate.pem"
)
ssl_context.load_cert_chain(
    "/path/to/client_certificate.pem",
    "/path/to/client_key.pem"
)

ssl_options = pika.SSLOptions(
    context=ssl_context,
    server_hostname="rabbitmq.example.com"
)

connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='rabbitmq.example.com',
        port=5671,  # default AMQPS (TLS) port
        ssl_options=ssl_options
    )
)

配置文件安全

# /etc/rabbitmq/rabbitmq.conf

# Restrict the guest user to loopback only (disables remote guest logins)
loopback_users = none

# Cap concurrent connections
connection_max = 1000

# Memory high-watermark: throttle publishers above 60% of available RAM
vm_memory_high_watermark.relative = 0.6

# Block publishers when free disk drops below 10GB
disk_free_limit.absolute = 10GB

# TLS listener and certificates; require a verified client certificate
listeners.ssl.default = 5671
ssl_options.cacertfile = /path/to/ca.pem
ssl_options.certfile = /path/to/server.pem
ssl_options.keyfile = /path/to/server.key
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true

10.5 消息可靠性

发布确认

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Enable publisher confirms; basic_publish then raises on delivery failure
channel.confirm_delivery()

try:
    channel.basic_publish(
        exchange='',
        routing_key='test',
        body='Hello',
        properties=pika.BasicProperties(delivery_mode=2)
    )
    print("Message confirmed")
except pika.exceptions.UnroutableError:
    print("Message was returned (unroutable)")
except pika.exceptions.NackError:
    print("Message was nacked (broker rejected)")

connection.close()

持久化配置

# Declare a durable queue (survives a broker restart)
channel.queue_declare(
    queue='durable_queue',
    durable=True
)

# Declare a durable exchange
channel.exchange_declare(
    exchange='durable_exchange',
    exchange_type='direct',
    durable=True
)

# Publish a persistent message (delivery_mode=2 writes it to disk)
channel.basic_publish(
    exchange='',
    routing_key='durable_queue',
    body='Persistent message',
    properties=pika.BasicProperties(
        delivery_mode=2  # persistent
    )
)

死信队列

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare the dead-letter exchange (DLX)
channel.exchange_declare(exchange='dlx', exchange_type='direct')

# Declare the dead-letter queue and bind it to the DLX
channel.queue_declare(queue='dead_letter_queue')
channel.queue_bind(exchange='dlx', queue='dead_letter_queue', routing_key='dead')

# Declare the main queue with dead-lettering configured
args = {
    'x-dead-letter-exchange': 'dlx',
    'x-dead-letter-routing-key': 'dead',
    'x-message-ttl': 60000  # message TTL in milliseconds
}
channel.queue_declare(queue='main_queue', arguments=args)

# Consume the dead-letter queue
def on_dead_letter(ch, method, properties, body):
    """Handle a message that expired or was rejected on the main queue."""
    print(f"Dead letter received: {body}")
    # The x-death header records why and where the message was dead-lettered
    print(f"Reason: {properties.headers.get('x-death', [])}")

channel.basic_consume(
    queue='dead_letter_queue',
    on_message_callback=on_dead_letter,
    auto_ack=True
)

channel.start_consuming()

消息重试

import pika
import json
import time

class RetryConsumer:
    """Consumer that re-publishes failed messages up to *max_retries* times.

    The retry count travels in the 'x-retry-count' message header; once it is
    exhausted the message is nacked without requeue, which dead-letters it
    if the queue has a DLX configured.
    """

    def __init__(self, queue, max_retries=3, retry_delay=5):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()
        self.queue = queue
        self.max_retries = max_retries
        self.retry_delay = retry_delay  # seconds to wait before a retry

        # Declare a retry queue.
        # NOTE(review): this queue is declared but never used —
        # retry_message() publishes back to the main queue instead.
        self.channel.queue_declare(queue=f'{queue}_retry')

    def consume(self):
        """Start the blocking consume loop on the main queue."""
        self.channel.basic_consume(
            queue=self.queue,
            on_message_callback=self.handle_message
        )
        self.channel.start_consuming()

    def handle_message(self, ch, method, properties, body):
        """Process one delivery; on failure either retry or give up."""
        headers = properties.headers or {}
        retry_count = headers.get('x-retry-count', 0)

        try:
            # Business processing
            self.process_message(body)
            ch.basic_ack(delivery_tag=method.delivery_tag)

        except Exception as e:
            if retry_count < self.max_retries:
                # Re-publish with an incremented retry counter, then ack the
                # original so it is not redelivered as-is.
                print(f"Retry {retry_count + 1}/{self.max_retries}")
                self.retry_message(body, retry_count + 1)
                ch.basic_ack(delivery_tag=method.delivery_tag)
            else:
                # Out of retries: reject without requeue so the broker
                # dead-letters the message (if a DLX is configured).
                print(f"Max retries exceeded, sending to DLQ")
                ch.basic_nack(
                    delivery_tag=method.delivery_tag,
                    requeue=False
                )

    def process_message(self, body):
        """Business handler; deliberately always fails to demo the retries."""
        data = json.loads(body)
        print(f"Processing: {data}")
        # Simulate a processing error
        raise Exception("Processing failed")

    def retry_message(self, body, retry_count):
        """Re-publish *body* to the main queue after a blocking delay.

        NOTE(review): time.sleep blocks the whole consumer; a TTL-based
        retry queue would avoid stalling other deliveries.
        """
        time.sleep(self.retry_delay)

        self.channel.basic_publish(
            exchange='',
            routing_key=self.queue,
            body=body,
            properties=pika.BasicProperties(
                headers={'x-retry-count': retry_count}
            )
        )

consumer = RetryConsumer('test_queue')
consumer.consume()

10.6 运维脚本

队列管理脚本

#!/usr/bin/env python3
"""RabbitMQ 队列管理脚本"""

import requests
import argparse
from typing import List

class RabbitMQAdmin:
    """Thin wrapper around the RabbitMQ management HTTP API (queue endpoints)."""

    def __init__(self, host='localhost', port=15672, user='guest', password='guest'):
        self.base_url = f"http://{host}:{port}/api"
        self.auth = (user, password)

    def _queue_url(self, queue: str, vhost: str) -> str:
        # The vhost (often "/") must be percent-encoded in the URL path.
        encoded = requests.utils.quote(vhost, safe='')
        return f"{self.base_url}/queues/{encoded}/{queue}"

    def list_queues(self) -> List[dict]:
        """Return metadata for every queue on the broker."""
        return requests.get(f"{self.base_url}/queues", auth=self.auth).json()

    def purge_queue(self, queue: str, vhost: str = '/'):
        """Drop all messages in *queue*; True when the API answers 204."""
        resp = requests.delete(
            f"{self._queue_url(queue, vhost)}/contents",
            auth=self.auth
        )
        return resp.status_code == 204

    def delete_queue(self, queue: str, vhost: str = '/'):
        """Delete *queue* entirely; True when the API answers 204."""
        resp = requests.delete(self._queue_url(queue, vhost), auth=self.auth)
        return resp.status_code == 204

    def get_queue_info(self, queue: str, vhost: str = '/'):
        """Return detailed stats for a single queue."""
        return requests.get(self._queue_url(queue, vhost), auth=self.auth).json()

def main():
    """Command-line entry point: list / purge / delete / info on queues."""
    parser = argparse.ArgumentParser(description='RabbitMQ Queue Admin')
    parser.add_argument('--host', default='localhost')
    parser.add_argument('--user', default='guest')
    parser.add_argument('--password', default='guest')

    subparsers = parser.add_subparsers(dest='command')

    # list command
    subparsers.add_parser('list', help='List all queues')

    # purge command
    purge_parser = subparsers.add_parser('purge', help='Purge a queue')
    purge_parser.add_argument('queue', help='Queue name')

    # delete command
    delete_parser = subparsers.add_parser('delete', help='Delete a queue')
    delete_parser.add_argument('queue', help='Queue name')

    # info command
    info_parser = subparsers.add_parser('info', help='Get queue info')
    info_parser.add_argument('queue', help='Queue name')

    args = parser.parse_args()

    admin = RabbitMQAdmin(
        host=args.host,
        user=args.user,
        password=args.password
    )

    if args.command == 'list':
        queues = admin.list_queues()
        for q in queues:
            print(f"{q['name']}: {q['messages']} messages")

    elif args.command == 'purge':
        if admin.purge_queue(args.queue):
            print(f"Queue {args.queue} purged")

    elif args.command == 'delete':
        if admin.delete_queue(args.queue):
            print(f"Queue {args.queue} deleted")

    elif args.command == 'info':
        # Fix: `json` was used here without ever being imported in this
        # script, so `info` crashed with NameError.
        import json
        info = admin.get_queue_info(args.queue)
        print(json.dumps(info, indent=2))

if __name__ == '__main__':
    main()

健康检查脚本

#!/usr/bin/env python3
"""RabbitMQ 健康检查脚本"""

import requests
import sys
import json

def check_rabbitmq_health(host='localhost', port=15672, user='guest', password='guest'):
    """Check RabbitMQ health via the management API.

    Returns (True, details_dict) when healthy, (False, reason_str) otherwise.
    Never raises: connection problems are reported as an unhealthy result.
    """
    try:
        # Is the management API reachable at all?
        response = requests.get(
            f"http://{host}:{port}/api/overview",
            auth=(user, password),
            timeout=10
        )

        if response.status_code != 200:
            return False, "API not accessible"

        data = response.json()

        # Node status.
        # NOTE(review): in the real /api/overview payload, 'node' is the node
        # *name* (a string), and memory/disk figures live under /api/nodes —
        # verify these lookups against the actual API response.
        node_status = data.get('node', {})
        if not node_status.get('running', False):
            return False, "Node not running"

        # Memory usage ratio (guarded against a zero limit)
        mem_used = data.get('node', {}).get('mem_used', 0)
        mem_limit = data.get('node', {}).get('mem_limit', 1)
        mem_usage = mem_used / mem_limit if mem_limit > 0 else 0

        if mem_usage > 0.9:
            return False, f"Memory usage too high: {mem_usage:.2%}"

        # Disk headroom versus the broker's configured free-space limit
        disk_free = data.get('node', {}).get('disk_free', 0)
        disk_limit = data.get('node', {}).get('disk_free_limit', 1)

        if disk_free < disk_limit:
            return False, "Disk space below limit"

        # Total queued messages across the cluster
        queue_totals = data.get('queue_totals', {})
        messages = queue_totals.get('messages', 0)

        return True, {
            'status': 'healthy',
            'messages': messages,
            'memory_usage': f"{mem_usage:.2%}",
            'disk_free': f"{disk_free / (1024**3):.2f} GB"
        }

    except requests.exceptions.RequestException as e:
        return False, f"Connection error: {e}"

if __name__ == '__main__':
    healthy, result = check_rabbitmq_health()

    # Exit 0 when healthy, 1 otherwise — suitable for cron or k8s probes
    if healthy:
        print(json.dumps(result, indent=2))
        sys.exit(0)
    else:
        print(f"❌ Health check failed: {result}")
        sys.exit(1)

10.7 最佳实践总结

生产环境清单

  • 启用消息持久化
  • 配置镜像队列
  • 设置合理的预取数量
  • 实现死信队列
  • 配置监控告警
  • 启用 TLS 加密
  • 设置用户权限
  • 配置内存和磁盘阈值
  • 实现消息重试机制
  • 定期备份配置

性能调优建议

  1. 连接管理:使用连接池,避免频繁创建连接
  2. 消息大小:控制消息大小,大消息考虑压缩或分片
  3. 批量操作:批量发送消息提高吞吐量
  4. 预取数量:根据任务类型调整预取数量
  5. 队列设计:避免过多队列,合理规划队列数量

常见问题排查

# Queue status: names, backlog, consumer counts
rabbitmqctl list_queues name messages consumers

# Open connections
rabbitmqctl list_connections

# Open channels
rabbitmqctl list_channels

# Memory usage breakdown
rabbitmqctl status | grep memory

# Reset the node (DESTRUCTIVE — wipes all node data; use with extreme care)
rabbitmqctl reset

10.8 小结

本章介绍了 RabbitMQ 生产实践:

  1. 高可用配置(镜像队列、集群)
  2. 性能优化技巧
  3. 监控与告警配置
  4. 安全配置(用户权限、SSL/TLS)
  5. 消息可靠性保障
  6. 运维脚本工具
  7. 最佳实践总结

恭喜你完成了 RabbitMQ 教程的学习!