RocketMQ
# RocketMQ消息队列
# 1.消息队列介绍
# 1.1 什么是消息队列?
消息队列(Message Queue,MQ)是一种异步通信机制,它允许不同的系统或组件之间通过发送、存储和消费消息来进行数据交互。消息通常以**FIFO(先进先出)**的方式存储在队列中,消费者按照顺序处理消息。
消息队列一般由三部分组成:
- 生产者(Producer):发送消息到队列的组件。
- 消息队列(Message Queue):临时存储消息,确保消息可以被消费。
- 消费者(Consumer):从队列中取出消息并处理的组件。
常见的消息队列中间件包括:
- Kafka(大数据处理、日志收集)
- RabbitMQ(支持多种协议,适用于企业级应用)
- RocketMQ(高吞吐、高可用,适用于分布式系统)
- ActiveMQ(较早的消息中间件,支持 JMS)
# 1.2 为什么要用消息队列?
# 解耦(Decoupling)
- 在传统系统中,服务 A 需要调用服务 B,通常是直接调用(同步调用)。
- 如果 B 发生故障或变更,A 也会受到影响。
- 使用消息队列后,A 只需要把消息放入队列,B 什么时候处理都可以,二者可以独立演进。
# 削峰填谷(Traffic Shaping)
- 业务高峰期时,短时间内大量请求可能会压垮系统。
- 消息队列可以缓冲请求,让消费者按照自身能力逐步处理,避免系统崩溃。
# 异步处理(Asynchronous Processing)
- 例如,用户下单后,订单服务需要通知库存、支付等多个服务。
- 直接调用会导致用户等待时间过长,而使用消息队列可以异步执行这些任务,提升响应速度。
# 1.3 什么时候不适合使用消息队列?
- 强一致性需求:消息队列是最终一致性,不适用于需要强一致性的场景(如银行转账)。
- 低延迟业务:如果业务场景要求毫秒级实时响应,MQ 引入的异步机制可能会增加延迟。
- 系统复杂度:引入消息队列后,系统需要额外的监控、管理和异常处理机制,增加运维成本。
# 2. RocketMQ 介绍
RocketMQ 是由阿里巴巴开源的分布式消息中间件,具有低延迟、高吞吐量、高可用性和高可靠性的特点,适用于海量消息的堆积和异步解耦的应用场景。
# 2.1 物理
- 生产者(Producer):消息的发布者,负责构建并传输消息到 RocketMQ 服务器。
- 主题(Topic):RocketMQ 消息的逻辑分类,用于区分不同业务场景的消息流。
- 消息队列(MessageQueue):消息的实际存储单元,每个 Topic 由多个队列组成,以支持高并发读写。
- 消费者(Consumer):消息的订阅者,负责接收并处理从 Broker 获取的消息。
- 消费者组(ConsumerGroup):一组具有相同逻辑的消费者实例,用于负载均衡消费消息。
- NameServer:类似于注册中心,负责管理和发现 Broker 服务,多个 NameServer 之间相互独立,无状态运行。
- Broker:消息的存储和转发节点,接收生产者发送的消息并持久化,消费者从 Broker 拉取消息进行处理。
# 2.2 消息模型
# 2.3 部署模型
# 3. RocketMQ单机版快速搭建
# 3.1 环境要求
Java 8 或以上版本:确保你的系统已安装 Java 8 或更高版本,因为 RocketMQ 依赖于 Java 运行时环境。
RocketMQ 官方发布的单机版:下载并安装适用于你操作系统的 RocketMQ 版本。可以从 RocketMQ 官方网站或 GitHub 页面获取最新的发布版本。
推荐的操作系统:推荐在 Linux 系统上部署,因为其稳定性和性能较优。
注意:如果你在虚拟机中运行 RocketMQ,且虚拟机内存较小,可能会导致 RocketMQ 启动失败或运行不稳定。此时需要调整一些配置项来优化性能,使其能够在有限的资源下正常运行。例如:
# 3.2 RocketMQ工作原理
# 3.3 NameServer服务搭建
启动 NameServer 服务非常简单。RocketMQ 提供了一个名为 mqnamesrv
的启动脚本,我们只需执行它即可启动 NameServer 服务。然而,默认情况下,RocketMQ 为 JVM 设置的内存大小为 4GB,这对许多虚拟机而言可能会过大,因此我们需要调整 JVM 内存配置,以确保在内存较小的环境中能够正常运行。
# 3.3.1. 修改NameServer启动配置
进入 RocketMQ 安装目录的 bin
文件夹打开 runserver.sh
脚本,修改 JVM 内存配置:
vi runserver.sh
这样修改后,JVM 会使用较少的内存,适用于内存较小的虚拟机。
# 3.3.2. 启动 NameServer
修改完启动配置后,我们可以通过 静默启动 方式启动 NameServer 服务。
nohup
命令是用于在后台运行命令,并且即使当前会话关闭,程序仍会继续运行。&
表示将进程放入后台执行,避免阻塞当前终端。执行以下命令启动 NameServer:nohup ./mqnamesrv &
1nohup
:表示忽略挂起信号,使得程序在终端关闭后仍然运行。./mqnamesrv
:执行启动 NameServer 的脚本。&
:将进程放入后台运行,释放终端。
启动后,我们可以通过查看日志文件来确认 NameServer 是否启动成功。
tail -f
命令用于实时查看日志内容,-f
参数表示持续跟踪文件的末尾内容,直到手动停止。使用以下命令查看nohup.out
文件中的日志输出:tail -f nohup.out
1tail
:用于显示文件的末尾部分。-f
:表示持续跟踪文件变化,适合用来实时查看日志。
如果 NameServer 启动成功,在
nohup.out
中,你会看到类似以下的日志信息,表示服务已成功启动:该日志信息确认了 NameServer 已经启动并且运行正常。你可以在
/logs/rocketmqlogs
目录下查看详细的日志文件,进一步检查运行状态。
# 3.4 Broker服务搭建
启动 Broker 服务的脚本是 runbroker.sh
。默认情况下,Broker 的内存配置为 8GB。如果你的系统内存不足以满足该要求,必须调整 JVM 内存配置,避免启动失败。下面将介绍如何修改配置文件以及启动 Broker 服务。
# 3.4.1. 修改 Broker 启动脚本配置
进入到 bin
目录,编辑 runbroker.sh
启动脚本,修改默认的 JVM 内存配置:
vi runbroker.sh
这样修改后,Broker 将使用更少的内存来启动,适合内存较小的虚拟机环境。
# 3.4.2. 修改 Broker 配置文件
进入 conf
目录,编辑 broker.conf
配置文件:
cd ../conf/
vi broker.conf
2
这将使 Broker 自动创建 Topic,并将其与指定的 NameServer(localhost:9876
)进行通信。
# 3.4.3. 启动 Broker 服务
配置完成后,回到
bin
目录,使用静默启动方式启动 Broker 服务:cd ../bin nohup ./mqbroker -c ../conf/broker.conf &
1
2nohup
:表示即使关闭终端,Broker 服务也会继续在后台运行。./mqbroker -c ../conf/broker.conf
:启动 Broker 服务并加载刚才修改的配置文件。
启动后,可以通过查看
nohup.out
文件中的日志来确认 Broker 是否启动成功:tail -f nohup.out
1如果 Broker 启动成功,你将看到类似以下的日志,确认 Broker 已成功注册到 NameServer:
每次启动 Broker 时,必须确保 Broker 已成功注册到目标 NameServer 上,否则消息可能无法正常路由。如果启动成功,你将看到一个类似以下的进程信息
# 3.5 测试 RocketMQ 消息发送与消费
RocketMQ 安装包中提供了一个 tools.sh
工具,可以用来在命令行快速验证 RocketMQ 服务的消息发送与消费功能。下面是如何使用该工具进行测试的步骤。
# 3.51 测试消息发送
首先,我们需要在 bin
目录下执行以下命令来测试消息发送。默认情况下,消息生产者会发送 1000 条消息,发送完成后会自动关闭。
export NAMESRV_ADDR='localhost:9876'
./tools.sh org.apache.rocketmq.example.quickstart.Producer
2
export NAMESRV_ADDR='localhost:9876'
:设置 NameServer 地址,这里假设 RocketMQ 的 NameServer 运行在localhost:9876
。./tools.sh org.apache.rocketmq.example.quickstart.Producer
:运行 Producer 示例,开始发送消息。
执行该命令后,如果消息发送成功,你应该看到以下提示:
如果没有错误信息,且显示发送了指定的消息数量(如 1000 条),则表示消息发送成功。
# 3.5.2 测试消息接收
接下来,我们可以测试消息消费者。消费者执行时会一直挂起,等待新的消息到来。执行以下命令启动消费者:
sh ./tools.sh org.apache.rocketmq.example.quickstart.Consumer
./tools.sh org.apache.rocketmq.example.quickstart.Consumer
:运行 Consumer 示例,开始接收消息。
消费者会持续运行,直到你手动停止它。在成功接收到消息时,你应该看到以下提示,表示消息接收成功:
这样就完成了消息的发送与接收测试。如果发送的消息能够成功被消费者接收,说明 RocketMQ 服务已经正常运行。
# 4. 发送消息确认机制
RocketMQ 提供三种主要的消息发送方式:异步发送、同步发送 和 异步事件监听。不同的方式在性能、可靠性和适用场景上有所不同。
# 1.1. 异步发送(高性能,可能消息丢失)
特点:
- 生产者发送消息后 不等待返回结果,直接返回,提高吞吐量。
- 可能发生消息丢失,因为生产者无法确认消息是否成功存储到 Broker。
- 适用于 对可靠性要求不高,但 吞吐量要求极高 的业务场景(如日志收集、监控数据上报)。
代码示例:
DefaultMQProducer producer = new DefaultMQProducer("async_producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 10; i++) {
Message msg = new Message("TestTopic", "TagA", ("Hello RocketMQ " + i).getBytes());
producer.sendOneway(msg); // 发送后不管返回结果
}
producer.shutdown();
2
3
4
5
6
7
8
9
# 1.2. 同步发送(可靠性高,适合重要消息)
特点:
- 生产者 发送消息后等待响应,直到 Broker 确认存储成功。
- 返回消息存储的具体信息(Broker、MessageQueue、偏移量)。
- 可靠性高,适用于 金融支付、订单系统 等要求 消息不丢失 的场景。
- 由于需要等待 Broker 确认,吞吐量相对较低。
代码示例:
DefaultMQProducer producer = new DefaultMQProducer("sync_producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 10; i++) {
Message msg = new Message("TestTopic", "TagB", ("Hello RocketMQ " + i).getBytes());
SendResult sendResult = producer.send(msg); // 发送同步消息
System.out.println("SendResult: " + sendResult);
}
producer.shutdown();
2
3
4
5
6
7
8
9
10
# 1.3. 异步事件监听(高可用,但需注意主线程存活)
特点:
- 不阻塞主线程,消息发送后 提供回调函数 处理结果。
- 适用于 延迟允许的业务场景(如邮件通知、短信发送等)。
- 问题点:如果主线程
shutdown
过早,监听器也会被销毁,影响回调的执行。 - 解决方案:主线程睡眠 或 使用 CountDownLatch 保证回调执行完成。
代码示例:
DefaultMQProducer producer = new DefaultMQProducer("async_producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 10; i++) {
Message msg = new Message("TestTopic", "TagC", ("Hello RocketMQ " + i).getBytes());
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("Send Success: " + sendResult);
}
@Override
public void onException(Throwable e) {
System.err.println("Send Failed: " + e.getMessage());
}
});
}
Thread.sleep(5000); // 让主线程保持存活,确保回调执行
producer.shutdown();
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 1.4 broker如何知道consumer处理成功没
- 消费者会监听消息队列,并 主动反馈消费状态(成功或失败)。
- 消费成功:RocketMQ 会更新
queue
指针,标记该消息已被消费。 - 消费失败:如果消费者返回失败,RocketMQ 可能会 重新投递 该消息(取决于消费模式)。
代码示例:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TestTopic", "*");
consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {
for (MessageExt msg : msgs) {
System.out.println("Received Message: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 确认消费成功
});
consumer.start();
2
3
4
5
6
7
8
9
10
11
# 5. RocketMQ 广播模式
RocketMQ 提供两种消费模式:
- 集群模式(默认):消息 负载均衡,每条消息 只会被其中一个消费者消费一次。
- 广播模式:每个消费者 都会收到所有消息,适用于 需要所有节点都处理相同消息 的场景。
# 5.1. 如何开启广播模式?
只需在 Consumer
端配置 setMessageModel(MessageModel.BROADCASTING)
即可。
代码示例:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broadcast_consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TestTopic", "*");
// 设置广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {
for (MessageExt msg : msgs) {
System.out.println("Received Broadcast Message: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
2
3
4
5
6
7
8
9
10
11
12
13
14
# 5.2. 消费者位点的维护
在 集群模式 下,RocketMQ 会维护 消费进度(offset),防止重复消费。
- 消费成功:更新 offset,确保下次只拉取新消息。
- 消费失败:可能重新投递。
在 广播模式 下,RocketMQ 不会维护消费位点,每次消费者启动都会 重新拉取所有消息。
# 6. 过滤消息
在同一个 Topic 下,可能包含多种不同类型的消息,而消费者可能只希望关注其中的一部分,例如:
- 订单消息(创建、支付、取消等)。
- 库存消息(增加、减少等)。
RocketMQ 提供了三种消息过滤方式:
# 6.1 Tag 过滤(默认支持,推荐)
- 生产者:发送消息时附加 Tag。
- 消费者:订阅时指定 Tag,只有匹配的消息才会被消费。
- 特点:
- 性能最高,直接在 Broker 端进行筛选。
- 适用于大多数业务场景。
示例:
生产者发送带有
TagA
1的消息:
Message msg = new Message("OrderTopic", "TagA", "Hello RocketMQ".getBytes()); producer.send(msg);
1
2消费者订阅
TagA
1相关消息:
consumer.subscribe("OrderTopic", "TagA");
1
# 6.2 SQL92 过滤(支持复杂逻辑)
- 生产者:发送消息时附加 自定义属性(key-value)。
- 消费者:订阅时使用 SQL92 语法筛选消息。
- 特点:
- 支持更复杂的业务逻辑,例如
age > 18 AND status = 'active'
。 - 性能略逊于 Tag 过滤,但比 Consumer 端过滤更高效。
- 支持更复杂的业务逻辑,例如
示例:
生产者发送带有属性的消息:
Message msg = new Message("UserTopic", "TagB", "Hello SQL".getBytes()); msg.putUserProperty("age", "25"); producer.send(msg);
1
2
3消费者使用 SQL92 过滤
age > 20
1的消息:
consumer.subscribe("UserTopic", MessageSelector.bySql("age > 20"));
1
# 6.3 Consumer 端过滤(不推荐)
- 生产者:不进行额外处理,发送所有消息。
- 消费者:收到所有消息后,自行筛选。
- 特点:
- 占用带宽和计算资源,因为所有消息都会被推送到 Consumer。
- 性能最差,仅在 Broker 无法满足需求时使用。
示例:
消费者手动过滤
TagC
1的消息:
consumer.subscribe("InventoryTopic", "*"); // 订阅所有消息 consumer.registerMessageListener((msgs, context) -> { for (MessageExt msg : msgs) { if ("TagC".equals(msg.getTags())) { // 处理 TagC 消息 } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; });
1
2
3
4
5
6
7
8
9
# 6.4 Broker 端 vs. Consumer 端过滤
- Broker 端过滤(Tag/SQL92):
- 仅推送消费者感兴趣的消息,减少网络带宽消耗。
- Consumer 端过滤:
- 需要接收所有消息,然后自行筛选,占用更多带宽和计算资源。
- 不推荐,除非 Broker 端过滤无法满足需求。。
# 7. 顺序消息机制
在电商业务中,如 下订单 → 锁库存 → 支付 → 发货,这些操作必须按照严格的顺序执行,否则可能导致数据不一致。RocketMQ 提供两种顺序消息模式:
- 全局顺序(Global Order):整个 Topic 内的消息必须严格按照生产顺序消费,但吞吐量低,不推荐使用。
- 分区顺序(Partition Order,常用):在 同一个队列(Queue)内,消息按顺序消费,但不同队列之间的消息无序。
# 7.1 生产者端保证顺序
由于 Broker 和 Consumer 受网络波动影响,不能保证顺序,因此顺序需要 由 Producer 端保证,做法是:
- 将相同业务主键(如订单 ID) 的消息发往同一个队列(Queue),利用 FIFO(先进先出)特性保证顺序。
- 通过 MessageQueueSelector 选择队列,示例如下:
// 发送顺序消息
SendResult sendResult = producer.send(
new Message("OrderTopic", "order_123", "订单创建".getBytes()),
(mqs, msg, arg) -> {
int queueIndex = arg.hashCode() % mqs.size(); // 按业务主键(订单 ID)选择队列
return mqs.get(queueIndex);
},
orderId
);
2
3
4
5
6
7
8
9
# 7.2 Consumer 端顺序消费
默认情况下,RocketMQ 的 Consumer 使用多线程并发拉取消息,这可能会导致 后续消息先被处理,破坏顺序。
- 正确做法:使用 Orderly Consumer,限制每个 Consumer 线程只能消费一个队列(Queue):
consumer.subscribe("OrderTopic", "*");
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
for (MessageExt msg : msgs) {
System.out.println("消费消息:" + new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS; // 确保消费成功
});
2
3
4
5
6
7
8
# 7.3 避免队列热点
- 如果所有消息都落入一个 Queue,会造成性能瓶颈,应尽量打散消息,均匀分布到多个 Queue 中。
- 可以采用 Hash 方式(如按订单 ID 取模)将消息均匀分布到不同队列,避免某个队列过载。
# 8. 延迟消息和批量消息
# 8.1 延迟消息(Delayed Message)
延迟消息 允许 消息发送后,延迟一定时间再被消费,适用于:定时任务(如延迟执行某个操作)订单超时取消(如 30 分钟未支付自动取消)支付超时提醒(如 5 分钟后提醒用户完成支付)
特点:
- 使用
DelayTimeLevel
指定延迟级别(不能自定义毫秒级延迟) - 只支持 18 级固定延迟级别(从 1s 到 2h)
- 基于定时轮(Time Wheel) 实现,性能高效
- 延迟时间到达后,消息进入正常消费队列,消费者按普通消息处理
使用示例
1. 生产者发送延迟消息
Message msg = new Message("OrderTimeoutTopic", "订单超时".getBytes());
// 设置延迟级别(这里使用 3,表示 10 秒后消费)
msg.setDelayTimeLevel(3);
SendResult sendResult = producer.send(msg);
System.out.println("消息发送成功:" + sendResult);
2
3
4
5
6
2. 消费者按普通方式消费
consumer.subscribe("OrderTimeoutTopic", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
System.out.println("收到延迟消息:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
2
3
4
5
6
7
8
# 8.2 批量消息(Batch Message)
批量消息 用于 减少网络传输次数,提高吞吐量,适用于: 批量日志处理 、批量订单创建、 批量库存变更
特点
- Producer 可以一次性发送多条消息(
List<Message>
) - Consumer 端按普通方式消费,无需特殊处理
- 批量消息不能超过 4MB(默认限制),否则需要分批发送
使用示例
1. 生产者批量发送消息
List<Message> messages = new ArrayList<>();
messages.add(new Message("BatchTopic", "TagA", "批量消息1".getBytes()));
messages.add(new Message("BatchTopic", "TagB", "批量消息2".getBytes()));
SendResult sendResult = producer.send(messages);
System.out.println("批量消息发送成功:" + sendResult);
2
3
4
5
6
2. 消费者按普通方式消费
consumer.subscribe("BatchTopic", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
System.out.println("收到批量消息:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
2
3
4
5
6
7
8
# 9. RocketMQ 事务消息
事务消息(Transactional Message) 适用于 分布式事务 场景,保证跨服务的一致性。它允许 生产者 在消息发送后,根据业务逻辑 决定提交或回滚消息,从而确保数据和消息的最终一致性。
# 9.1 为什么需要事务消息?
在分布式系统中,多个服务间的数据操作 不能使用本地事务(如数据库事务),否则会造成数据不一致。例如:
🔹 订单系统 和 支付系统 需要 保证支付成功后才更新订单状态。
🔹 库存扣减 需要确保 订单创建成功后才执行,否则可能库存错乱。
传统的分布式事务方案:
方案 | 方式 | 缺点 |
---|---|---|
XA 两阶段提交 | 使用数据库锁保证事务一致性 | 性能差、数据库压力大 |
TCC(Try-Confirm-Cancel) | 预留资源,确认或回滚 | 业务侵入性强 |
最终一致性(RocketMQ 事务消息) | 先发送消息,业务执行后确认 | 高性能、低耦合 |
✅ RocketMQ 事务消息基于“消息 + 事务检查”机制,保证最终一致性,且性能高!
# 9.2 RocketMQ 事务消息的执行流程
RocketMQ 事务消息 采用两阶段提交(2PC)机制,但不会锁数据库,而是使用 回查(Checkback)机制 解决不确定状态。
🚀 事务消息的两阶段流程:
- 第一阶段:发送半消息(Half Message)
- 生产者发送事务消息,但 消息不会被消费者消费,而是进入 “待确认”状态。
- 生产者执行本地事务逻辑(如更新数据库)。
- 第二阶段:提交或回滚
- 如果本地事务成功:生产者 提交消息,消息才会被消费者消费。
- 如果本地事务失败:生产者 回滚消息,消息被丢弃,不会被消费。
- 事务回查(Transaction Check)
- 如果生产者 宕机或网络异常,导致消息未提交,RocketMQ 会回查事务状态。
- 生产者重新检查本地事务状态,决定 提交或回滚。
# 9.3 RocketMQ 事务消息示例
# (1)生产者发送事务消息
生产者使用 sendMessageInTransaction()
发送事务消息,并实现 TransactionListener
处理事务逻辑。
public class TransactionProducer {
public static void main(String[] args) throws Exception {
// 1. 创建生产者
TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
producer.setNamesrvAddr("localhost:9876");
// 2. 设置事务监听器(处理事务逻辑)
producer.setTransactionListener(new TransactionListener() {
// 执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 执行业务逻辑(如写入数据库)
System.out.println("执行本地事务:" + new String(msg.getBody()));
boolean success = doLocalTransaction();
return success ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.UNKNOW;
}
}
// MQ 服务器回查事务状态
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("MQ回查事务状态:" + new String(msg.getBody()));
return checkTransactionStatus() ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
}
});
producer.start();
// 3. 发送事务消息
Message msg = new Message("TransactionTopic", "Hello Transaction".getBytes());
SendResult result = producer.sendMessageInTransaction(msg, null);
System.out.println("消息发送状态:" + result);
producer.shutdown();
}
// 模拟本地事务
private static boolean doLocalTransaction() {
return Math.random() > 0.5; // 随机成功或失败
}
// 模拟事务回查
private static boolean checkTransactionStatus() {
return true; // 假设本地事务成功
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# (2)消费者消费事务消息
事务消息消费方式与普通消息相同,无需额外处理。
public class TransactionConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TransactionTopic", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
System.out.println("消费事务消息:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 9.4 事务回查机制
# 为什么需要事务回查?
如果 生产者宕机,导致事务消息卡在 “待确认”状态,RocketMQ 会回查事务状态,以防止消息丢失或重复。
# 回查逻辑
- RocketMQ 发现消息未提交,会调用
checkLocalTransaction()
方法,让生产者确认事务状态。 - 如果本地事务成功,则提交消息。
- 如果本地事务失败,则回滚消息。
# 回查的触发条件
- 生产者发送消息后 没有返回 COMMIT/ROLLBACK。
- RocketMQ 定期回查(默认 1 分钟)。
# 10 ACL 权限控制体系
ACL(Access Control List,访问控制列表)是 RocketMQ 用于 权限管理 的机制。它允许管理员 控制哪些客户端(生产者/消费者)可以访问哪些 Topic,并限制其 读写权限,从而 保护消息数据安全。
# 10.1 为什么需要 ACL?
在 金融、支付、企业级应用 等场景中,消息队列承载了关键业务数据,如果不加控制,可能出现:
🔹 未经授权的客户端 访问消息队列,导致数据泄露。
🔹 恶意客户端伪造身份,向系统发送非法消息。
🔹 不同应用之间消息隔离,防止数据混乱。
✅ ACL 机制可以保护 RocketMQ 免受未授权访问,提高数据安全性。
# 10.2 RocketMQ ACL 机制
RocketMQ 通过 配置 ACL 规则,对客户端进行身份认证和权限控制。
🔹 身份认证: 客户端必须提供 AccessKey / SecretKey 才能访问 Broker。
🔹 权限控制: 可以设定不同用户对 Topic 的 读(SUB)/写(PUB)/管理权限(ADMIN)。
🔹 ACL 配置存储 在 Broker 的 conf/acl
目录下,默认支持 明文文件 配置(也可扩展自定义认证)。
# 10.3 ACL 配置文件
RocketMQ 的 ACL 配置文件位于:
${ROCKETMQ_HOME}/conf/acl/plain_acl.yml
# (1)开启 ACL
首先,修改 Broker 配置文件 broker.conf
,启用 ACL:
aclEnable=true
然后,在 启动 Broker 时,指定 broker.conf
:
nohup sh bin/mqbroker -c conf/broker.conf &
这样,Broker 启用了 ACL 认证机制。
# (2)ACL 用户权限配置
编辑 conf/acl/plain_acl.yml
文件,添加用户权限:
# ACL 配置示例
globalWhiteRemoteAddresses:
- "192.168.1.*" # 允许该 IP 直接访问(不需要认证)
accounts:
- accessKey: "user1"
secretKey: "password1"
whiteRemoteAddress: "192.168.1.100" # 允许该 IP 访问
admin: false
defaultTopicPerm: DENY # 禁止访问默认 Topic
defaultGroupPerm: SUB # 只允许订阅
topicPerms:
"TestTopic": PUB | SUB # 允许发布和订阅 TestTopic
groupPerms:
"GID_TestGroup": SUB # 仅允许订阅
- accessKey: "admin"
secretKey: "admin123"
whiteRemoteAddress: "*"
admin: true # 最高权限,允许管理
defaultTopicPerm: PUB | SUB
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
✅ 配置解析:
- 全局白名单(globalWhiteRemoteAddresses):允许指定 IP 无需认证 直接访问。
- accounts 配置用户权限:
accessKey / secretKey
:用户身份凭证,客户端访问时必须提供。whiteRemoteAddress
:限制用户只能从特定 IP 访问。admin: true
:是否拥有管理员权限(允许管理 MQ)。defaultTopicPerm
:默认 Topic 权限(PUB
允许生产,SUB
允许消费,DENY
禁止访问)。topicPerms
:指定某些 Topic 的权限(PUB
、SUB
)。groupPerms
:指定 消费组 的权限。
# 10.4 客户端访问 RocketMQ(带 ACL 认证)
启用 ACL 后,客户端(生产者/消费者)访问 RocketMQ 必须提供 AccessKey 和 SecretKey。
# (1)生产者发送消息
// 创建 ACL 认证的 Producer
DefaultMQProducer producer = new DefaultMQProducer("TestProducerGroup");
producer.setNamesrvAddr("localhost:9876");
// 设置 ACL 访问密钥
RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials("user1", "password1"));
producer = new DefaultMQProducer("TestProducerGroup", rpcHook);
producer.start();
// 发送消息
Message msg = new Message("TestTopic", "Hello ACL".getBytes());
producer.send(msg);
producer.shutdown();
2
3
4
5
6
7
8
9
10
11
12
13
14
✅ 关键点:
AclClientRPCHook
负责身份认证,提供 AccessKey / SecretKey。
# (2)消费者订阅消息
// 创建 ACL 认证的 Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GID_TestGroup",
new AclClientRPCHook(new SessionCredentials("user1", "password1")));
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TestTopic", "*");
// 监听消息
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
System.out.println("消费消息:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
✅ 消费者必须提供 AccessKey / SecretKey,否则 Broker 拒绝访问!
# 10.5 ACL 认证失败 & 排查
如果 客户端没有正确配置 AccessKey / SecretKey,会报如下错误:
org.apache.rocketmq.remoting.exception.RemotingConnectException: no permission
✅ 解决方案:
- 确认 ACL 配置文件
plain_acl.yml
中的 accessKey、secretKey 是否正确。 - 客户端
AclClientRPCHook
是否正确设置凭证。 - 检查
broker.conf
是否开启aclEnable=true
。 - 检查 RocketMQ 日志
/logs/rocketmqlogs/broker.log
了解具体错误信息。
# 11. RocketMQ面试题
# 11.1 RocketMQ如何保证消息不丢失
RocketMQ的消息流程可以分为生产阶段、存储阶段和消费阶段,每个阶段都有可能出现消息丢失的风险。
- 生产阶段:Producer 发送消息到 Broker 可能因网络故障等原因丢失。
- 存储阶段:Broker 先将消息存入内存,再根据刷盘策略持久化到磁盘。如果在持久化之前宕机,消息可能丢失。
- 消费阶段:如果消费者在消费消息前提交了ACK,但消费过程中出现异常,消息就会丢失。
解决方案:
- 生产阶段:采用同步发送 + 失败重试机制;异步发送时重写回调方法检查发送结果;Ack 确认机制。
- 存储阶段:使用同步刷盘机制;在集群模式下采用同步复制。
- 消费阶段:确保正常消费后才提交 ACK;处理异常时返回重试标识。
此外,为了确保消息的顺序消费,需要保证消息的顺序投递和消费逻辑。
# 11.2 RocketMQ的消息持久化机制
RocketMQ 的持久化机制保证消息的可靠存储,主要依赖以下三大组件:
- CommitLog:存储所有消息的文件,采用顺序写方式,每个文件大小固定为 1GB,写满后创建新文件。
- ConsumeQueue:类似数据库索引,为每个主题的消息队列存储消息偏移量(offset)及其在 CommitLog 中的位置信息,加速消费。
- IndexFile:存储消息的 key 和 offset 映射关系,便于快速检索。每个 IndexFile 文件固定 400MB,可存储约 2000 万条索引。
消息存储流程:
- Producer 发送消息,Broker 接收到消息后先存入内存缓冲区。
- 当达到一定数据量或时间阈值时,RocketMQ 执行批量刷盘(可选同步或异步刷盘)。
- 消息写入 CommitLog,同时生成 ConsumeQueue 和 IndexFile 索引数据。
- Consumer 通过 ConsumeQueue 或 IndexFile 查询 CommitLog,读取消息并消费。
# 11.3 RocketMQ如何保证消息顺序
RocketMQ 本身不保证全局消息有序,但通过 API 可实现局部有序消费:
- MessageQueueSelector 选择消息队列
Producer 通过特定规则(如订单 ID)将相关消息发送到同一队列,保证该队列内的消息顺序。 - MessageListenerOrderly 顺序消费
消费者使用MessageListenerOrderly
接口,确保一个队列中的消息按顺序消费。
实现方式:- 一个队列同时只能被一个消费者消费(加锁 + 定时任务续锁)。
- 如果消费失败,RocketMQ 提供重试机制,确保消息不会被跳过。
- 其他方式:
- Pull模式下消费者手动保证顺序。
- 并发消费时限制线程数,确保同一队列由单线程消费。
# 11.4 RocketMQ的事务消息原理
RocketMQ 通过两阶段提交 + 事务回查机制,保证事务消息的可靠性。
执行流程:
预处理阶段:Producer 发送事务消息,RocketMQ 设置状态为“Preparing”并存入消息存储库。
执行本地事务
:Producer 执行本地事务逻辑,并返回事务状态:
- Commit:确认提交,消息可被消费。
- Rollback:回滚事务,消息不会被消费。
二次确认阶段:Broker 根据本地事务结果更新消息状态(Committing 或 Rollback)。
事务回查机制:如果网络异常或 Producer 宕机,RocketMQ 触发事务回查,调用 Producer 提供的回查接口确认事务状态,确保消息一致性。
总结 RocketMQ 通过多阶段机制确保消息的可靠性和顺序性,在分布式环境下仍能保证高效稳定的消息投递。