跳转至

第三章:核心概念

消息模型

┌─────────────────────────────────────────────────────────────┐
│                   RocketMQ 消息模型                          │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Producer                                                   │
│     │                                                       │
│     │ 发送消息                                              │
│     ▼                                                       │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                      Topic                          │   │
│  │  ┌─────────────────────────────────────────────┐   │   │
│  │  │              Message Queue 0                 │   │   │
│  │  │  ┌─────┬─────┬─────┬─────┬─────┐           │   │   │
│  │  │  │ M1  │ M2  │ M3  │ M4  │ M5  │           │   │   │
│  │  │  └─────┴─────┴─────┴─────┴─────┘           │   │   │
│  │  └─────────────────────────────────────────────┘   │   │
│  │  ┌─────────────────────────────────────────────┐   │   │
│  │  │              Message Queue 1                 │   │   │
│  │  │  ┌─────┬─────┬─────┬─────┬─────┐           │   │   │
│  │  │  │ M6  │ M7  │ M8  │ M9  │ M10 │           │   │   │
│  │  │  └─────┴─────┴─────┴─────┴─────┘           │   │   │
│  │  └─────────────────────────────────────────────┘   │   │
│  └─────────────────────────────────────────────────────┘   │
│     │                                                       │
│     ▼                                                       │
│  Consumer Group                                             │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐     │
│  │  Consumer 1  │  │  Consumer 2  │  │  Consumer 3  │     │
│  │  (Queue 0)   │  │  (Queue 1)   │  │  (Queue 2)   │     │
│  └──────────────┘  └──────────────┘  └──────────────┘     │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Topic 与 Queue

Topic

Topic 是消息主题,消息的第一级分类。

// 创建 Topic
DefaultMQAdminExt admin = new DefaultMQAdminExt();
admin.start();

admin.createTopic("TopicTest", 4, 2);  // Topic名称, Queue数量, 副本数

Message Queue

Queue 是消息队列,消息的第二级分类。

Topic: OrderTopic
├── Queue 0: 订单消息分区 0
├── Queue 1: 订单消息分区 1
├── Queue 2: 订单消息分区 2
└── Queue 3: 订单消息分区 3

消息结构

消息属性

Message msg = new Message();

// 必需属性
msg.setTopic("TopicTest");           // 主题
msg.setBody("Hello RocketMQ".getBytes());  // 消息体

// 可选属性
msg.setTags("TagA");                 // 标签
msg.setKeys("OrderID_123");          // 业务键
msg.setDelayTimeLevel(1);            // 延迟级别
msg.setWaitStoreMsgOK(true);         // 是否等待存储成功

消息 Key

// 设置唯一 Key,用于查询
msg.setKeys("OrderID_" + orderId);

// 根据 Key 查询消息
QueryResult result = admin.queryMessage("TopicTest", "OrderID_123", 10, 0, System.currentTimeMillis());

消费者组

集群模式

同一消费者组内,每条消息只会被一个消费者消费。

Consumer Group: OrderConsumerGroup
├── Consumer 1: 消费 Queue 0, Queue 1
├── Consumer 2: 消费 Queue 2, Queue 3
└── Consumer 3: 备用(负载均衡)

广播模式

同一消费者组内,每条消息会被所有消费者消费。

consumer.setMessageModel(MessageModel.BROADCASTING);

消息过滤

Tag 过滤

// 发送消息时设置 Tag
Message msg = new Message("TopicTest", "TagA", "Hello".getBytes());

// 消费时订阅指定 Tag
consumer.subscribe("TopicTest", "TagA || TagB");

SQL 过滤

// 发送消息时设置属性
Message msg = new Message("TopicTest", "Hello".getBytes());
msg.putUserProperty("age", "18");
msg.putUserProperty("level", "VIP");

// 消费时使用 SQL 过滤
consumer.subscribe("TopicTest", 
    MessageSelector.bySql("age > 16 AND level = 'VIP'"));

消息轨迹

// 开启消息轨迹
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup", true);
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup", true);

// 查询消息轨迹
TraceView trace = admin.viewMessageTrace("msgId");

小结

核心概念要点:

  • Topic:消息主题,第一级分类
  • Queue:消息队列,第二级分类
  • 消费模式:集群模式、广播模式
  • 消息过滤:Tag 过滤、SQL 过滤

下一章我们将学习生产者与消费者。