跳转至

第八章:生产实践

最佳实践

生产者最佳实践

// 1. 合理设置生产者组名
DefaultMQProducer producer = new DefaultMQProducer("OrderProducerGroup");

// 2. 设置重试次数
producer.setRetryTimesWhenSendFailed(3);

// 3. 设置超时时间
producer.setSendMsgTimeout(3000);

// 4. 设置消息大小限制
producer.setMaxMessageSize(4 * 1024 * 1024);  // 4MB

// 5. 使用 Key 便于查询
Message msg = new Message("TopicTest", "TagA", "Hello".getBytes());
msg.setKeys("OrderID_" + orderId);

// 6. 异步发送处理回调
producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult result) {
        log.info("Send success: {}", result);
    }

    @Override
    public void onException(Throwable e) {
        log.error("Send failed", e);
        // 重试或记录失败
    }
});

消费者最佳实践

// 1. 合理设置消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderConsumerGroup");

// 2. 设置消费线程数
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(64);

// 3. 设置批量消费大小
consumer.setConsumeMessageBatchMaxSize(16);

// 4. 设置最大重试次数
consumer.setMaxReconsumeTimes(16);

// 5. 幂等处理
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(
            List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            String key = msg.getKeys();

            // 幂等检查
            if (isProcessed(key)) {
                continue;
            }

            try {
                // 处理消息
                processMessage(msg);

                // 标记已处理
                markProcessed(key);
            } catch (Exception e) {
                log.error("Process failed", e);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

性能优化

Broker 优化

# JVM 配置
JAVA_OPT="-server -Xms8g -Xmx8g -Xmn4g"

# GC 配置
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC"

# 线程池配置
sendMessageThreadPoolNums = 32
pullMessageThreadPoolNums = 32

# 刷盘配置(高性能场景)
flushDiskType = ASYNC_FLUSH
flushCommitLogTimed = false

# 内存配置
transientStorePoolEnable = true
transientStorePoolSize = 5

生产者优化

// 批量发送
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 50; i++) {
    messages.add(new Message("TopicTest", ("Message " + i).getBytes()));
}
producer.send(messages);

// 异步发送
producer.send(msg, callback);

// 压缩消息
Message msg = new Message("TopicTest", compress(body));

消费者优化

// 增加消费线程
consumer.setConsumeThreadMin(64);
consumer.setConsumeThreadMax(128);

// 批量消费
consumer.setConsumeMessageBatchMaxSize(32);

// 并行消费
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(
            List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        // 并行处理
        msgs.parallelStream().forEach(msg -> processMessage(msg));
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

容量规划

硬件配置

场景 CPU 内存 磁盘 网络
小规模 8核 16GB 500GB SSD 1Gbps
中规模 16核 32GB 1TB SSD 10Gbps
大规模 32核 64GB 2TB SSD 10Gbps

容量估算

# 消息大小:1KB
# 日消息量:1亿条
# 消息保留:3天

存储容量 = 1KB × 1亿 × 3天 × 2副本 × 1.2冗余
        = 1KB × 100,000,000 × 3 × 2 × 1.2
        = 720GB

故障处理

消息堆积

# 1. 查看堆积情况
./mqadmin consumerProgress -n localhost:9876 -g ConsumerGroup

# 2. 临时扩容消费者
# 增加消费者实例数

# 3. 重置消费进度
./mqadmin resetOffsetByTime -n localhost:9876 -g ConsumerGroup -t TopicTest -s now

Broker 宕机

# 1. 检查 Broker 状态
./mqadmin clusterList -n localhost:9876

# 2. 切换到 Slave
# 客户端自动切换到可用的 Broker

# 3. 恢复 Master
# 重启 Broker 服务

小结

生产实践要点:

  • 最佳实践:生产者、消费者配置
  • 性能优化:Broker、生产者、消费者优化
  • 容量规划:硬件配置、容量估算
  • 故障处理:消息堆积、Broker 宕机

完成本教程后,你应该能够在生产环境中部署和管理 RocketMQ 消息队列。