第八章:生产实践¶
最佳实践¶
生产者最佳实践¶
// 1. 合理设置生产者组名
DefaultMQProducer producer = new DefaultMQProducer("OrderProducerGroup");
// 2. 设置重试次数
producer.setRetryTimesWhenSendFailed(3);
// 3. 设置超时时间
producer.setSendMsgTimeout(3000);
// 4. 设置消息大小限制
producer.setMaxMessageSize(4 * 1024 * 1024); // 4MB
// 5. 使用 Key 便于查询
Message msg = new Message("TopicTest", "TagA", "Hello".getBytes());
msg.setKeys("OrderID_" + orderId);
// 6. 异步发送处理回调
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult result) {
log.info("Send success: {}", result);
}
@Override
public void onException(Throwable e) {
log.error("Send failed", e);
// 重试或记录失败
}
});
消费者最佳实践¶
// 1. 合理设置消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderConsumerGroup");
// 2. 设置消费线程数
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(64);
// 3. 设置批量消费大小
consumer.setConsumeMessageBatchMaxSize(16);
// 4. 设置最大重试次数
consumer.setMaxReconsumeTimes(16);
// 5. 幂等处理
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
String key = msg.getKeys();
// 幂等检查
if (isProcessed(key)) {
continue;
}
try {
// 处理消息
processMessage(msg);
// 标记已处理
markProcessed(key);
} catch (Exception e) {
log.error("Process failed", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
性能优化¶
Broker 优化¶
# JVM 配置
JAVA_OPT="-server -Xms8g -Xmx8g -Xmn4g"
# GC 配置
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC"
# 线程池配置
sendMessageThreadPoolNums = 32
pullMessageThreadPoolNums = 32
# 刷盘配置(高性能场景)
flushDiskType = ASYNC_FLUSH
flushCommitLogTimed = false
# 内存配置
transientStorePoolEnable = true
transientStorePoolSize = 5
生产者优化¶
// 批量发送
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 50; i++) {
messages.add(new Message("TopicTest", ("Message " + i).getBytes()));
}
producer.send(messages);
// 异步发送
producer.send(msg, callback);
// 压缩消息
Message msg = new Message("TopicTest", compress(body));
消费者优化¶
// 增加消费线程
consumer.setConsumeThreadMin(64);
consumer.setConsumeThreadMax(128);
// 批量消费
consumer.setConsumeMessageBatchMaxSize(32);
// 并行消费
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 并行处理
msgs.parallelStream().forEach(msg -> processMessage(msg));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
容量规划¶
硬件配置¶
| 场景 | CPU | 内存 | 磁盘 | 网络 |
|---|---|---|---|---|
| 小规模 | 8核 | 16GB | 500GB SSD | 1Gbps |
| 中规模 | 16核 | 32GB | 1TB SSD | 10Gbps |
| 大规模 | 32核 | 64GB | 2TB SSD | 10Gbps |
容量估算¶
# 消息大小:1KB
# 日消息量:1亿条
# 消息保留:3天
存储容量 = 1KB × 1亿 × 3天 × 2副本 × 1.2冗余
= 1KB × 100,000,000 × 3 × 2 × 1.2
= 720GB
故障处理¶
消息堆积¶
# 1. 查看堆积情况
./mqadmin consumerProgress -n localhost:9876 -g ConsumerGroup
# 2. 临时扩容消费者
# 增加消费者实例数
# 3. 重置消费进度
./mqadmin resetOffsetByTime -n localhost:9876 -g ConsumerGroup -t TopicTest -s now
Broker 宕机¶
# 1. 检查 Broker 状态
./mqadmin clusterList -n localhost:9876
# 2. 切换到 Slave
# 客户端自动切换到可用的 Broker
# 3. 恢复 Master
# 重启 Broker 服务
小结¶
生产实践要点:
- 最佳实践:生产者、消费者配置
- 性能优化:Broker、生产者、消费者优化
- 容量规划:硬件配置、容量估算
- 故障处理:消息堆积、Broker 宕机
完成本教程后,你应该能够在生产环境中部署和管理 RocketMQ 消息队列。