第三章:核心概念¶
消息模型¶
┌─────────────────────────────────────────────────────────────┐
│ RocketMQ 消息模型 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Producer │
│ │ │
│ │ 发送消息 │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Topic │ │
│ │ ┌─────────────────────────────────────────────┐ │ │
│ │ │ Message Queue 0 │ │ │
│ │ │ ┌─────┬─────┬─────┬─────┬─────┐ │ │ │
│ │ │ │ M1 │ M2 │ M3 │ M4 │ M5 │ │ │ │
│ │ │ └─────┴─────┴─────┴─────┴─────┘ │ │ │
│ │ └─────────────────────────────────────────────┘ │ │
│ │ ┌─────────────────────────────────────────────┐ │ │
│ │ │ Message Queue 1 │ │ │
│ │ │ ┌─────┬─────┬─────┬─────┬─────┐ │ │ │
│ │ │ │ M6 │ M7 │ M8 │ M9 │ M10 │ │ │ │
│ │ │ └─────┴─────┴─────┴─────┴─────┘ │ │ │
│ │ └─────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Consumer Group │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Consumer 1 │ │ Consumer 2 │ │ Consumer 3 │ │
│ │ (Queue 0) │ │ (Queue 1) │ │ (Queue 2) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Topic 与 Queue¶
Topic¶
Topic 是消息主题,消息的第一级分类。
// 创建 Topic
DefaultMQAdminExt admin = new DefaultMQAdminExt();
admin.start();
admin.createTopic("TopicTest", 4, 2); // Topic名称, Queue数量, 副本数
Message Queue¶
Queue 是消息队列,消息的第二级分类。
Topic: OrderTopic
├── Queue 0: 订单消息分区 0
├── Queue 1: 订单消息分区 1
├── Queue 2: 订单消息分区 2
└── Queue 3: 订单消息分区 3
消息结构¶
消息属性¶
Message msg = new Message();
// 必需属性
msg.setTopic("TopicTest"); // 主题
msg.setBody("Hello RocketMQ".getBytes()); // 消息体
// 可选属性
msg.setTags("TagA"); // 标签
msg.setKeys("OrderID_123"); // 业务键
msg.setDelayTimeLevel(1); // 延迟级别
msg.setWaitStoreMsgOK(true); // 是否等待存储成功
消息 Key¶
// 设置唯一 Key,用于查询
msg.setKeys("OrderID_" + orderId);
// 根据 Key 查询消息
QueryResult result = admin.queryMessage("TopicTest", "OrderID_123", 10, 0, System.currentTimeMillis());
消费者组¶
集群模式¶
同一消费者组内,每条消息只会被一个消费者消费。
Consumer Group: OrderConsumerGroup
├── Consumer 1: 消费 Queue 0, Queue 1
├── Consumer 2: 消费 Queue 2, Queue 3
└── Consumer 3: 备用(负载均衡)
广播模式¶
同一消费者组内,每条消息会被所有消费者消费。
消息过滤¶
Tag 过滤¶
// 发送消息时设置 Tag
Message msg = new Message("TopicTest", "TagA", "Hello".getBytes());
// 消费时订阅指定 Tag
consumer.subscribe("TopicTest", "TagA || TagB");
SQL 过滤¶
// 发送消息时设置属性
Message msg = new Message("TopicTest", "Hello".getBytes());
msg.putUserProperty("age", "18");
msg.putUserProperty("level", "VIP");
// 消费时使用 SQL 过滤
consumer.subscribe("TopicTest",
MessageSelector.bySql("age > 16 AND level = 'VIP'"));
消息轨迹¶
// 开启消息轨迹
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup", true);
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup", true);
// 查询消息轨迹
TraceView trace = admin.viewMessageTrace("msgId");
小结¶
核心概念要点:
- Topic:消息主题,第一级分类
- Queue:消息队列,第二级分类
- 消费模式:集群模式、广播模式
- 消息过滤:Tag 过滤、SQL 过滤
下一章我们将学习生产者与消费者。