Yang's blog Yang's blog
首页
Java
密码学
机器学习
命令手册
关于
友链
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

xiaoyang

编程爱好者
首页
Java
密码学
机器学习
命令手册
关于
友链
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • SpringCloud

    • 微服务架构介绍
    • SpringCloud介绍
    • Spring Cloud:生产者与消费者
    • Spring Cloud Eureka:构建可靠的服务注册与发现
    • Spring Cloud Ribbon:负载均衡
    • Spring Cloud Fegin:服务调用
    • Spring Cloud Hystrix:熔断器
    • Spring Cloud Zuul:统一网关路由
    • Spring Cloud Config:配置中心
  • Java后端框架

    • LangChain4j

      • 介绍
      • 快速开始
      • Chat and Language Models
      • Chat Memory
      • Model Parameters
      • Response Streaming
      • AI Services
      • Agent
      • Tools (Function Calling)
      • RAG
      • Structured Outputs
      • Classification
      • Embedding (Vector) Stores
      • Image Models
      • Quarkus Integration
      • Spring Boot Integration
      • Kotlin Support
      • Logging
      • Observability
      • Testing and Evaluation
      • Model Context Protocol
  • 八股文

    • 操作系统
    • JVM介绍
    • Java多线程
    • Java集合框架
    • Java反射
    • JavaIO
    • Mybatis介绍
    • Spring介绍
    • SpringBoot介绍
    • Mysql
    • Redis
    • 数据结构
    • 云计算
    • 设计模式
    • 计算机网络
    • 锁核心类AQS
    • Nginx
  • 前端技术

    • 初识Vue3
    • Vue3数据双向绑定
    • Vue3生命周期
    • Vue-Router 组件
    • Pinia 集中式状态存储
  • 中间件

    • RocketMQ
      • 1.消息队列介绍
        • 1.1 什么是消息队列?
        • 1.2 为什么要用消息队列?
        • 解耦(Decoupling)
        • 削峰填谷(Traffic Shaping)
        • 异步处理(Asynchronous Processing)
        • 1.3 什么时候不适合使用消息队列?
      • 2. RocketMQ 介绍
        • 2.1 物理
        • 2.2 消息模型
        • 2.3 部署模型
      • 3. RocketMQ单机版快速搭建
        • 3.1 环境要求
        • 3.2 RocketMQ工作原理
        • 3.3 NameServer服务搭建
        • 3.3.1. 修改NameServer启动配置
        • 3.3.2. 启动 NameServer
        • 3.4 Broker服务搭建
        • 3.4.1. 修改 Broker 启动脚本配置
        • 3.4.2. 修改 Broker 配置文件
        • 3.4.3. 启动 Broker 服务
        • 3.5 测试 RocketMQ 消息发送与消费
        • 3.51 测试消息发送
        • 3.5.2 测试消息接收
      • 4. 发送消息确认机制
        • 1.1. 异步发送(高性能,可能消息丢失)
        • 1.2. 同步发送(可靠性高,适合重要消息)
        • 1.3. 异步事件监听(高可用,但需注意主线程存活)
        • 1.4 broker如何知道consumer处理成功没
      • 5. RocketMQ 广播模式
        • 5.1. 如何开启广播模式?
        • 5.2. 消费者位点的维护
      • 6. 过滤消息
        • 6.1 Tag 过滤(默认支持,推荐)
        • 6.2 SQL92 过滤(支持复杂逻辑)
        • 6.3 Consumer 端过滤(不推荐)
        • 6.4 Broker 端 vs. Consumer 端过滤
      • 7. 顺序消息机制
        • 7.1 生产者端保证顺序
        • 7.2 Consumer 端顺序消费
        • 7.3 避免队列热点
      • 8. 延迟消息和批量消息
        • 8.1 延迟消息(Delayed Message)
        • 8.2 批量消息(Batch Message)
      • 9. RocketMQ 事务消息
        • 9.1 为什么需要事务消息?
        • 9.2 RocketMQ 事务消息的执行流程
        • 9.3 RocketMQ 事务消息示例
        • (1)生产者发送事务消息
        • (2)消费者消费事务消息
        • 9.4 事务回查机制
        • 为什么需要事务回查?
        • 回查逻辑
        • 回查的触发条件
      • 10 ACL 权限控制体系
        • 10.1 为什么需要 ACL?
        • 10.2 RocketMQ ACL 机制
        • 10.3 ACL 配置文件
        • (1)开启 ACL
        • (2)ACL 用户权限配置
        • 10.4 客户端访问 RocketMQ(带 ACL 认证)
        • (1)生产者发送消息
        • (2)消费者订阅消息
        • 10.5 ACL 认证失败 & 排查
      • 11. RocketMQ面试题
        • 11.1 RocketMQ如何保证消息不丢失
        • 11.2 RocketMQ的消息持久化机制
        • 11.3 RocketMQ如何保证消息顺序
        • 11.4 RocketMQ的事务消息原理
  • 开发知识

    • 请求参数注解
    • 时间复杂度和空间复杂度
    • JSON序列化与反序列化
    • Timestamp vs Datetime
    • Java开发中必备能力单元测试
    • 正向代理和反向代理
    • 什么是VPN
    • 正则表达式
  • Java
  • 中间件
xiaoyang
2025-03-02
目录

RocketMQ

# RocketMQ消息队列

# 1.消息队列介绍

# 1.1 什么是消息队列?

消息队列(Message Queue,MQ)是一种异步通信机制,它允许不同的系统或组件之间通过发送、存储和消费消息来进行数据交互。消息通常以**FIFO(先进先出)**的方式存储在队列中,消费者按照顺序处理消息。

消息队列一般由三部分组成:

  1. 生产者(Producer):发送消息到队列的组件。
  2. 消息队列(Message Queue):临时存储消息,确保消息可以被消费。
  3. 消费者(Consumer):从队列中取出消息并处理的组件。

常见的消息队列中间件包括:

  • Kafka(大数据处理、日志收集)
  • RabbitMQ(支持多种协议,适用于企业级应用)
  • RocketMQ(高吞吐、高可用,适用于分布式系统)
  • ActiveMQ(较早的消息中间件,支持 JMS)

# 1.2 为什么要用消息队列?

# 解耦(Decoupling)

  • 在传统系统中,服务 A 需要调用服务 B,通常是直接调用(同步调用)。
  • 如果 B 发生故障或变更,A 也会受到影响。
  • 使用消息队列后,A 只需要把消息放入队列,B 什么时候处理都可以,二者可以独立演进。

image-20250302165614821

image-20250302165542388

# 削峰填谷(Traffic Shaping)

  • 业务高峰期时,短时间内大量请求可能会压垮系统。
  • 消息队列可以缓冲请求,让消费者按照自身能力逐步处理,避免系统崩溃。

image-20250302165130052

# 异步处理(Asynchronous Processing)

  • 例如,用户下单后,订单服务需要通知库存、支付等多个服务。
  • 直接调用会导致用户等待时间过长,而使用消息队列可以异步执行这些任务,提升响应速度。

image-20250302165148677

# 1.3 什么时候不适合使用消息队列?

  • 强一致性需求:消息队列是最终一致性,不适用于需要强一致性的场景(如银行转账)。
  • 低延迟业务:如果业务场景要求毫秒级实时响应,MQ 引入的异步机制可能会增加延迟。
  • 系统复杂度:引入消息队列后,系统需要额外的监控、管理和异常处理机制,增加运维成本。

# 2. RocketMQ 介绍

RocketMQ 是由阿里巴巴开源的分布式消息中间件,具有低延迟、高吞吐量、高可用性和高可靠性的特点,适用于海量消息的堆积和异步解耦的应用场景。

# 2.1 物理

  • 生产者(Producer):消息的发布者,负责构建并传输消息到 RocketMQ 服务器。
  • 主题(Topic):RocketMQ 消息的逻辑分类,用于区分不同业务场景的消息流。
  • 消息队列(MessageQueue):消息的实际存储单元,每个 Topic 由多个队列组成,以支持高并发读写。
  • 消费者(Consumer):消息的订阅者,负责接收并处理从 Broker 获取的消息。
  • 消费者组(ConsumerGroup):一组具有相同逻辑的消费者实例,用于负载均衡消费消息。
  • NameServer:类似于注册中心,负责管理和发现 Broker 服务,多个 NameServer 之间相互独立,无状态运行。
  • Broker:消息的存储和转发节点,接收生产者发送的消息并持久化,消费者从 Broker 拉取消息进行处理。

# 2.2 消息模型

image-20250302153952553

# 2.3 部署模型

image-20250302154049803

# 3. RocketMQ单机版快速搭建

# 3.1 环境要求

  • Java 8 或以上版本:确保你的系统已安装 Java 8 或更高版本,因为 RocketMQ 依赖于 Java 运行时环境。

  • RocketMQ 官方发布的单机版:下载并安装适用于你操作系统的 RocketMQ 版本。可以从 RocketMQ 官方网站或 GitHub 页面获取最新的发布版本。

  • 推荐的操作系统:推荐在 Linux 系统上部署,因为其稳定性和性能较优。

    注意:如果你在虚拟机中运行 RocketMQ,且虚拟机内存较小,可能会导致 RocketMQ 启动失败或运行不稳定。此时需要调整一些配置项来优化性能,使其能够在有限的资源下正常运行。例如:

# 3.2 RocketMQ工作原理

# 3.3 NameServer服务搭建

启动 NameServer 服务非常简单。RocketMQ 提供了一个名为 mqnamesrv 的启动脚本,我们只需执行它即可启动 NameServer 服务。然而,默认情况下,RocketMQ 为 JVM 设置的内存大小为 4GB,这对许多虚拟机而言可能会过大,因此我们需要调整 JVM 内存配置,以确保在内存较小的环境中能够正常运行。

# 3.3.1. 修改NameServer启动配置

进入 RocketMQ 安装目录的 bin 文件夹打开 runserver.sh 脚本,修改 JVM 内存配置:

vi runserver.sh
1

image-20250302155048429

这样修改后,JVM 会使用较少的内存,适用于内存较小的虚拟机。

# 3.3.2. 启动 NameServer

  1. 修改完启动配置后,我们可以通过 静默启动 方式启动 NameServer 服务。nohup 命令是用于在后台运行命令,并且即使当前会话关闭,程序仍会继续运行。& 表示将进程放入后台执行,避免阻塞当前终端。执行以下命令启动 NameServer:

    nohup ./mqnamesrv &
    
    1
    • nohup:表示忽略挂起信号,使得程序在终端关闭后仍然运行。
    • ./mqnamesrv:执行启动 NameServer 的脚本。
    • &:将进程放入后台运行,释放终端。
  2. 启动后,我们可以通过查看日志文件来确认 NameServer 是否启动成功。tail -f 命令用于实时查看日志内容,-f 参数表示持续跟踪文件的末尾内容,直到手动停止。使用以下命令查看 nohup.out 文件中的日志输出:

    tail -f nohup.out
    
    1
    • tail:用于显示文件的末尾部分。
    • -f:表示持续跟踪文件变化,适合用来实时查看日志。
  3. 如果 NameServer 启动成功,在 nohup.out 中,你会看到类似以下的日志信息,表示服务已成功启动:

    image-20250302160949257

    该日志信息确认了 NameServer 已经启动并且运行正常。你可以在 /logs/rocketmqlogs 目录下查看详细的日志文件,进一步检查运行状态。

# 3.4 Broker服务搭建

启动 Broker 服务的脚本是 runbroker.sh。默认情况下,Broker 的内存配置为 8GB。如果你的系统内存不足以满足该要求,必须调整 JVM 内存配置,避免启动失败。下面将介绍如何修改配置文件以及启动 Broker 服务。

# 3.4.1. 修改 Broker 启动脚本配置

进入到 bin 目录,编辑 runbroker.sh 启动脚本,修改默认的 JVM 内存配置:

vi runbroker.sh
1

image-20250302155453811

这样修改后,Broker 将使用更少的内存来启动,适合内存较小的虚拟机环境。

# 3.4.2. 修改 Broker 配置文件

进入 conf 目录,编辑 broker.conf 配置文件:

cd ../conf/
vi broker.conf
1
2

image-20250302160050273

这将使 Broker 自动创建 Topic,并将其与指定的 NameServer(localhost:9876)进行通信。

# 3.4.3. 启动 Broker 服务

  1. 配置完成后,回到 bin 目录,使用静默启动方式启动 Broker 服务:

    cd ../bin
    nohup ./mqbroker -c ../conf/broker.conf &
    
    1
    2
    • nohup:表示即使关闭终端,Broker 服务也会继续在后台运行。
    • ./mqbroker -c ../conf/broker.conf:启动 Broker 服务并加载刚才修改的配置文件。

    image-20250302161310285

  2. 启动后,可以通过查看 nohup.out 文件中的日志来确认 Broker 是否启动成功:

    tail -f nohup.out
    
    1

    如果 Broker 启动成功,你将看到类似以下的日志,确认 Broker 已成功注册到 NameServer:

    image-20250302161241655

    每次启动 Broker 时,必须确保 Broker 已成功注册到目标 NameServer 上,否则消息可能无法正常路由。如果启动成功,你将看到一个类似以下的进程信息

# 3.5 测试 RocketMQ 消息发送与消费

RocketMQ 安装包中提供了一个 tools.sh 工具,可以用来在命令行快速验证 RocketMQ 服务的消息发送与消费功能。下面是如何使用该工具进行测试的步骤。

# 3.51 测试消息发送

首先,我们需要在 bin 目录下执行以下命令来测试消息发送。默认情况下,消息生产者会发送 1000 条消息,发送完成后会自动关闭。

export NAMESRV_ADDR='localhost:9876'
./tools.sh org.apache.rocketmq.example.quickstart.Producer
1
2
  • export NAMESRV_ADDR='localhost:9876':设置 NameServer 地址,这里假设 RocketMQ 的 NameServer 运行在 localhost:9876。
  • ./tools.sh org.apache.rocketmq.example.quickstart.Producer:运行 Producer 示例,开始发送消息。

执行该命令后,如果消息发送成功,你应该看到以下提示:

image-20250302163053180

如果没有错误信息,且显示发送了指定的消息数量(如 1000 条),则表示消息发送成功。

# 3.5.2 测试消息接收

接下来,我们可以测试消息消费者。消费者执行时会一直挂起,等待新的消息到来。执行以下命令启动消费者:

sh ./tools.sh org.apache.rocketmq.example.quickstart.Consumer
1
  • ./tools.sh org.apache.rocketmq.example.quickstart.Consumer:运行 Consumer 示例,开始接收消息。

消费者会持续运行,直到你手动停止它。在成功接收到消息时,你应该看到以下提示,表示消息接收成功:

image-20250302163159879

这样就完成了消息的发送与接收测试。如果发送的消息能够成功被消费者接收,说明 RocketMQ 服务已经正常运行。

# 4. 发送消息确认机制

RocketMQ 提供三种主要的消息发送方式:异步发送、同步发送 和 异步事件监听。不同的方式在性能、可靠性和适用场景上有所不同。

# 1.1. 异步发送(高性能,可能消息丢失)

特点:

  • 生产者发送消息后 不等待返回结果,直接返回,提高吞吐量。
  • 可能发生消息丢失,因为生产者无法确认消息是否成功存储到 Broker。
  • 适用于 对可靠性要求不高,但 吞吐量要求极高 的业务场景(如日志收集、监控数据上报)。

代码示例:

DefaultMQProducer producer = new DefaultMQProducer("async_producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();

for (int i = 0; i < 10; i++) {
    Message msg = new Message("TestTopic", "TagA", ("Hello RocketMQ " + i).getBytes());
    producer.sendOneway(msg); // 发送后不管返回结果
}
producer.shutdown();
1
2
3
4
5
6
7
8
9

# 1.2. 同步发送(可靠性高,适合重要消息)

特点:

  • 生产者 发送消息后等待响应,直到 Broker 确认存储成功。
  • 返回消息存储的具体信息(Broker、MessageQueue、偏移量)。
  • 可靠性高,适用于 金融支付、订单系统 等要求 消息不丢失 的场景。
  • 由于需要等待 Broker 确认,吞吐量相对较低。

代码示例:

DefaultMQProducer producer = new DefaultMQProducer("sync_producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();

for (int i = 0; i < 10; i++) {
    Message msg = new Message("TestTopic", "TagB", ("Hello RocketMQ " + i).getBytes());
    SendResult sendResult = producer.send(msg); // 发送同步消息
    System.out.println("SendResult: " + sendResult);
}
producer.shutdown();
1
2
3
4
5
6
7
8
9
10

# 1.3. 异步事件监听(高可用,但需注意主线程存活)

特点:

  • 不阻塞主线程,消息发送后 提供回调函数 处理结果。
  • 适用于 延迟允许的业务场景(如邮件通知、短信发送等)。
  • 问题点:如果主线程 shutdown 过早,监听器也会被销毁,影响回调的执行。
  • 解决方案:主线程睡眠 或 使用 CountDownLatch 保证回调执行完成。

代码示例:

DefaultMQProducer producer = new DefaultMQProducer("async_producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();

for (int i = 0; i < 10; i++) {
    Message msg = new Message("TestTopic", "TagC", ("Hello RocketMQ " + i).getBytes());
    producer.send(msg, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println("Send Success: " + sendResult);
        }

        @Override
        public void onException(Throwable e) {
            System.err.println("Send Failed: " + e.getMessage());
        }
    });
}
Thread.sleep(5000); // 让主线程保持存活,确保回调执行
producer.shutdown();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 1.4 broker如何知道consumer处理成功没

  • 消费者会监听消息队列,并 主动反馈消费状态(成功或失败)。
  • 消费成功:RocketMQ 会更新 queue 指针,标记该消息已被消费。
  • 消费失败:如果消费者返回失败,RocketMQ 可能会 重新投递 该消息(取决于消费模式)。

代码示例:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TestTopic", "*");

consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {
    for (MessageExt msg : msgs) {
        System.out.println("Received Message: " + new String(msg.getBody()));
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 确认消费成功
});
consumer.start();
1
2
3
4
5
6
7
8
9
10
11

# 5. RocketMQ 广播模式

RocketMQ 提供两种消费模式:

  1. 集群模式(默认):消息 负载均衡,每条消息 只会被其中一个消费者消费一次。
  2. 广播模式:每个消费者 都会收到所有消息,适用于 需要所有节点都处理相同消息 的场景。

# 5.1. 如何开启广播模式?

只需在 Consumer 端配置 setMessageModel(MessageModel.BROADCASTING) 即可。

代码示例:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broadcast_consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TestTopic", "*");

// 设置广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);

consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {
    for (MessageExt msg : msgs) {
        System.out.println("Received Broadcast Message: " + new String(msg.getBody()));
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# 5.2. 消费者位点的维护

在 集群模式 下,RocketMQ 会维护 消费进度(offset),防止重复消费。

  • 消费成功:更新 offset,确保下次只拉取新消息。
  • 消费失败:可能重新投递。

在 广播模式 下,RocketMQ 不会维护消费位点,每次消费者启动都会 重新拉取所有消息。

# 6. 过滤消息

在同一个 Topic 下,可能包含多种不同类型的消息,而消费者可能只希望关注其中的一部分,例如:

  • 订单消息(创建、支付、取消等)。
  • 库存消息(增加、减少等)。

RocketMQ 提供了三种消息过滤方式:

# 6.1 Tag 过滤(默认支持,推荐)

  • 生产者:发送消息时附加 Tag。
  • 消费者:订阅时指定 Tag,只有匹配的消息才会被消费。
  • 特点:
    • 性能最高,直接在 Broker 端进行筛选。
    • 适用于大多数业务场景。

示例:

  • 生产者发送带有

    TagA
    
    1

    的消息:

    Message msg = new Message("OrderTopic", "TagA", "Hello RocketMQ".getBytes());
    producer.send(msg);
    
    1
    2
  • 消费者订阅

    TagA
    
    1

    相关消息:

    consumer.subscribe("OrderTopic", "TagA");
    
    1

# 6.2 SQL92 过滤(支持复杂逻辑)

  • 生产者:发送消息时附加 自定义属性(key-value)。
  • 消费者:订阅时使用 SQL92 语法筛选消息。
  • 特点:
    • 支持更复杂的业务逻辑,例如 age > 18 AND status = 'active'。
    • 性能略逊于 Tag 过滤,但比 Consumer 端过滤更高效。

示例:

  • 生产者发送带有属性的消息:

    Message msg = new Message("UserTopic", "TagB", "Hello SQL".getBytes());
    msg.putUserProperty("age", "25");
    producer.send(msg);
    
    1
    2
    3
  • 消费者使用 SQL92 过滤

    age > 20
    
    1

    的消息:

    consumer.subscribe("UserTopic", MessageSelector.bySql("age > 20"));
    
    1

# 6.3 Consumer 端过滤(不推荐)

  • 生产者:不进行额外处理,发送所有消息。
  • 消费者:收到所有消息后,自行筛选。
  • 特点:
    • 占用带宽和计算资源,因为所有消息都会被推送到 Consumer。
    • 性能最差,仅在 Broker 无法满足需求时使用。

示例:

  • 消费者手动过滤

    TagC
    
    1

    的消息:

    consumer.subscribe("InventoryTopic", "*"); // 订阅所有消息
    consumer.registerMessageListener((msgs, context) -> {
        for (MessageExt msg : msgs) {
            if ("TagC".equals(msg.getTags())) {
                // 处理 TagC 消息
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    
    1
    2
    3
    4
    5
    6
    7
    8
    9

# 6.4 Broker 端 vs. Consumer 端过滤

  • Broker 端过滤(Tag/SQL92):
    • 仅推送消费者感兴趣的消息,减少网络带宽消耗。
  • Consumer 端过滤:
    • 需要接收所有消息,然后自行筛选,占用更多带宽和计算资源。
    • 不推荐,除非 Broker 端过滤无法满足需求。。

# 7. 顺序消息机制

在电商业务中,如 下订单 → 锁库存 → 支付 → 发货,这些操作必须按照严格的顺序执行,否则可能导致数据不一致。RocketMQ 提供两种顺序消息模式:

  1. 全局顺序(Global Order):整个 Topic 内的消息必须严格按照生产顺序消费,但吞吐量低,不推荐使用。
  2. 分区顺序(Partition Order,常用):在 同一个队列(Queue)内,消息按顺序消费,但不同队列之间的消息无序。

# 7.1 生产者端保证顺序

由于 Broker 和 Consumer 受网络波动影响,不能保证顺序,因此顺序需要 由 Producer 端保证,做法是:

  • 将相同业务主键(如订单 ID) 的消息发往同一个队列(Queue),利用 FIFO(先进先出)特性保证顺序。
  • 通过 MessageQueueSelector 选择队列,示例如下:
// 发送顺序消息
SendResult sendResult = producer.send(
    new Message("OrderTopic", "order_123", "订单创建".getBytes()),
    (mqs, msg, arg) -> {
        int queueIndex = arg.hashCode() % mqs.size(); // 按业务主键(订单 ID)选择队列
        return mqs.get(queueIndex);
    }, 
    orderId
);
1
2
3
4
5
6
7
8
9

# 7.2 Consumer 端顺序消费

默认情况下,RocketMQ 的 Consumer 使用多线程并发拉取消息,这可能会导致 后续消息先被处理,破坏顺序。

  • 正确做法:使用 Orderly Consumer,限制每个 Consumer 线程只能消费一个队列(Queue):
consumer.subscribe("OrderTopic", "*");

consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
    for (MessageExt msg : msgs) {
        System.out.println("消费消息:" + new String(msg.getBody()));
    }
    return ConsumeOrderlyStatus.SUCCESS; // 确保消费成功
});
1
2
3
4
5
6
7
8

# 7.3 避免队列热点

  • 如果所有消息都落入一个 Queue,会造成性能瓶颈,应尽量打散消息,均匀分布到多个 Queue 中。
  • 可以采用 Hash 方式(如按订单 ID 取模)将消息均匀分布到不同队列,避免某个队列过载。

# 8. 延迟消息和批量消息

# 8.1 延迟消息(Delayed Message)

延迟消息 允许 消息发送后,延迟一定时间再被消费,适用于:定时任务(如延迟执行某个操作)订单超时取消(如 30 分钟未支付自动取消)支付超时提醒(如 5 分钟后提醒用户完成支付)

特点:

  • 使用 DelayTimeLevel 指定延迟级别(不能自定义毫秒级延迟)
  • 只支持 18 级固定延迟级别(从 1s 到 2h)
  • 基于定时轮(Time Wheel) 实现,性能高效
  • 延迟时间到达后,消息进入正常消费队列,消费者按普通消息处理

使用示例

1. 生产者发送延迟消息

Message msg = new Message("OrderTimeoutTopic", "订单超时".getBytes());
// 设置延迟级别(这里使用 3,表示 10 秒后消费)
msg.setDelayTimeLevel(3);

SendResult sendResult = producer.send(msg);
System.out.println("消息发送成功:" + sendResult);
1
2
3
4
5
6

2. 消费者按普通方式消费

consumer.subscribe("OrderTimeoutTopic", "*");

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    for (MessageExt msg : msgs) {
        System.out.println("收到延迟消息:" + new String(msg.getBody()));
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
1
2
3
4
5
6
7
8

# 8.2 批量消息(Batch Message)

批量消息 用于 减少网络传输次数,提高吞吐量,适用于: 批量日志处理 、批量订单创建、 批量库存变更

特点

  • Producer 可以一次性发送多条消息(List<Message>)
  • Consumer 端按普通方式消费,无需特殊处理
  • 批量消息不能超过 4MB(默认限制),否则需要分批发送

使用示例

1. 生产者批量发送消息

List<Message> messages = new ArrayList<>();
messages.add(new Message("BatchTopic", "TagA", "批量消息1".getBytes()));
messages.add(new Message("BatchTopic", "TagB", "批量消息2".getBytes()));

SendResult sendResult = producer.send(messages);
System.out.println("批量消息发送成功:" + sendResult);
1
2
3
4
5
6

2. 消费者按普通方式消费

consumer.subscribe("BatchTopic", "*");

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    for (MessageExt msg : msgs) {
        System.out.println("收到批量消息:" + new String(msg.getBody()));
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
1
2
3
4
5
6
7
8

# 9. RocketMQ 事务消息

事务消息(Transactional Message) 适用于 分布式事务 场景,保证跨服务的一致性。它允许 生产者 在消息发送后,根据业务逻辑 决定提交或回滚消息,从而确保数据和消息的最终一致性。

# 9.1 为什么需要事务消息?

在分布式系统中,多个服务间的数据操作 不能使用本地事务(如数据库事务),否则会造成数据不一致。例如:
🔹 订单系统 和 支付系统 需要 保证支付成功后才更新订单状态。
🔹 库存扣减 需要确保 订单创建成功后才执行,否则可能库存错乱。

传统的分布式事务方案:

方案 方式 缺点
XA 两阶段提交 使用数据库锁保证事务一致性 性能差、数据库压力大
TCC(Try-Confirm-Cancel) 预留资源,确认或回滚 业务侵入性强
最终一致性(RocketMQ 事务消息) 先发送消息,业务执行后确认 高性能、低耦合

✅ RocketMQ 事务消息基于“消息 + 事务检查”机制,保证最终一致性,且性能高!

# 9.2 RocketMQ 事务消息的执行流程

RocketMQ 事务消息 采用两阶段提交(2PC)机制,但不会锁数据库,而是使用 回查(Checkback)机制 解决不确定状态。

🚀 事务消息的两阶段流程:

  1. 第一阶段:发送半消息(Half Message)
    • 生产者发送事务消息,但 消息不会被消费者消费,而是进入 “待确认”状态。
    • 生产者执行本地事务逻辑(如更新数据库)。
  2. 第二阶段:提交或回滚
    • 如果本地事务成功:生产者 提交消息,消息才会被消费者消费。
    • 如果本地事务失败:生产者 回滚消息,消息被丢弃,不会被消费。
  3. 事务回查(Transaction Check)
    • 如果生产者 宕机或网络异常,导致消息未提交,RocketMQ 会回查事务状态。
    • 生产者重新检查本地事务状态,决定 提交或回滚。

# 9.3 RocketMQ 事务消息示例

# (1)生产者发送事务消息

生产者使用 sendMessageInTransaction() 发送事务消息,并实现 TransactionListener 处理事务逻辑。

public class TransactionProducer {
    public static void main(String[] args) throws Exception {
        // 1. 创建生产者
        TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
        producer.setNamesrvAddr("localhost:9876");

        // 2. 设置事务监听器(处理事务逻辑)
        producer.setTransactionListener(new TransactionListener() {
            // 执行本地事务
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                try {
                    // 执行业务逻辑(如写入数据库)
                    System.out.println("执行本地事务:" + new String(msg.getBody()));
                    boolean success = doLocalTransaction();
                    return success ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
                } catch (Exception e) {
                    return LocalTransactionState.UNKNOW;
                }
            }

            // MQ 服务器回查事务状态
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                System.out.println("MQ回查事务状态:" + new String(msg.getBody()));
                return checkTransactionStatus() ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
            }
        });

        producer.start();

        // 3. 发送事务消息
        Message msg = new Message("TransactionTopic", "Hello Transaction".getBytes());
        SendResult result = producer.sendMessageInTransaction(msg, null);
        System.out.println("消息发送状态:" + result);

        producer.shutdown();
    }

    // 模拟本地事务
    private static boolean doLocalTransaction() {
        return Math.random() > 0.5; // 随机成功或失败
    }

    // 模拟事务回查
    private static boolean checkTransactionStatus() {
        return true; // 假设本地事务成功
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

# (2)消费者消费事务消息

事务消息消费方式与普通消息相同,无需额外处理。

public class TransactionConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("TransactionTopic", "*");

        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (MessageExt msg : msgs) {
                System.out.println("消费事务消息:" + new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        consumer.start();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 9.4 事务回查机制

# 为什么需要事务回查?

如果 生产者宕机,导致事务消息卡在 “待确认”状态,RocketMQ 会回查事务状态,以防止消息丢失或重复。

# 回查逻辑

  • RocketMQ 发现消息未提交,会调用 checkLocalTransaction() 方法,让生产者确认事务状态。
  • 如果本地事务成功,则提交消息。
  • 如果本地事务失败,则回滚消息。

# 回查的触发条件

  1. 生产者发送消息后 没有返回 COMMIT/ROLLBACK。
  2. RocketMQ 定期回查(默认 1 分钟)。

# 10 ACL 权限控制体系

ACL(Access Control List,访问控制列表)是 RocketMQ 用于 权限管理 的机制。它允许管理员 控制哪些客户端(生产者/消费者)可以访问哪些 Topic,并限制其 读写权限,从而 保护消息数据安全。

# 10.1 为什么需要 ACL?

在 金融、支付、企业级应用 等场景中,消息队列承载了关键业务数据,如果不加控制,可能出现:
🔹 未经授权的客户端 访问消息队列,导致数据泄露。
🔹 恶意客户端伪造身份,向系统发送非法消息。
🔹 不同应用之间消息隔离,防止数据混乱。

✅ ACL 机制可以保护 RocketMQ 免受未授权访问,提高数据安全性。

# 10.2 RocketMQ ACL 机制

RocketMQ 通过 配置 ACL 规则,对客户端进行身份认证和权限控制。
🔹 身份认证: 客户端必须提供 AccessKey / SecretKey 才能访问 Broker。
🔹 权限控制: 可以设定不同用户对 Topic 的 读(SUB)/写(PUB)/管理权限(ADMIN)。

🔹 ACL 配置存储 在 Broker 的 conf/acl 目录下,默认支持 明文文件 配置(也可扩展自定义认证)。


# 10.3 ACL 配置文件

RocketMQ 的 ACL 配置文件位于:

${ROCKETMQ_HOME}/conf/acl/plain_acl.yml
1

# (1)开启 ACL

首先,修改 Broker 配置文件 broker.conf,启用 ACL:

aclEnable=true
1

然后,在 启动 Broker 时,指定 broker.conf:

nohup sh bin/mqbroker -c conf/broker.conf &
1

这样,Broker 启用了 ACL 认证机制。

# (2)ACL 用户权限配置

编辑 conf/acl/plain_acl.yml 文件,添加用户权限:

# ACL 配置示例
globalWhiteRemoteAddresses:
  - "192.168.1.*" # 允许该 IP 直接访问(不需要认证)

accounts:
  - accessKey: "user1"
    secretKey: "password1"
    whiteRemoteAddress: "192.168.1.100" # 允许该 IP 访问
    admin: false
    defaultTopicPerm: DENY # 禁止访问默认 Topic
    defaultGroupPerm: SUB # 只允许订阅
    topicPerms:
      "TestTopic": PUB | SUB # 允许发布和订阅 TestTopic
    groupPerms:
      "GID_TestGroup": SUB # 仅允许订阅

  - accessKey: "admin"
    secretKey: "admin123"
    whiteRemoteAddress: "*"
    admin: true # 最高权限,允许管理
    defaultTopicPerm: PUB | SUB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

✅ 配置解析:

  1. 全局白名单(globalWhiteRemoteAddresses):允许指定 IP 无需认证 直接访问。
  2. accounts 配置用户权限:
    • accessKey / secretKey:用户身份凭证,客户端访问时必须提供。
    • whiteRemoteAddress:限制用户只能从特定 IP 访问。
    • admin: true:是否拥有管理员权限(允许管理 MQ)。
    • defaultTopicPerm:默认 Topic 权限(PUB 允许生产,SUB 允许消费,DENY 禁止访问)。
    • topicPerms:指定某些 Topic 的权限(PUB、SUB)。
    • groupPerms:指定 消费组 的权限。

# 10.4 客户端访问 RocketMQ(带 ACL 认证)

启用 ACL 后,客户端(生产者/消费者)访问 RocketMQ 必须提供 AccessKey 和 SecretKey。

# (1)生产者发送消息

// 创建 ACL 认证的 Producer
DefaultMQProducer producer = new DefaultMQProducer("TestProducerGroup");
producer.setNamesrvAddr("localhost:9876");

// 设置 ACL 访问密钥
RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials("user1", "password1"));
producer = new DefaultMQProducer("TestProducerGroup", rpcHook);
producer.start();

// 发送消息
Message msg = new Message("TestTopic", "Hello ACL".getBytes());
producer.send(msg);

producer.shutdown();
1
2
3
4
5
6
7
8
9
10
11
12
13
14

✅ 关键点:

  • AclClientRPCHook 负责身份认证,提供 AccessKey / SecretKey。

# (2)消费者订阅消息

// 创建 ACL 认证的 Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GID_TestGroup", 
    new AclClientRPCHook(new SessionCredentials("user1", "password1")));

consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TestTopic", "*");

// 监听消息
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    for (MessageExt msg : msgs) {
        System.out.println("消费消息:" + new String(msg.getBody()));
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

consumer.start();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

✅ 消费者必须提供 AccessKey / SecretKey,否则 Broker 拒绝访问!

# 10.5 ACL 认证失败 & 排查

如果 客户端没有正确配置 AccessKey / SecretKey,会报如下错误:

org.apache.rocketmq.remoting.exception.RemotingConnectException: no permission
1

✅ 解决方案:

  1. 确认 ACL 配置文件 plain_acl.yml 中的 accessKey、secretKey 是否正确。
  2. 客户端 AclClientRPCHook 是否正确设置凭证。
  3. 检查 broker.conf 是否开启 aclEnable=true。
  4. 检查 RocketMQ 日志 /logs/rocketmqlogs/broker.log 了解具体错误信息。

# 11. RocketMQ面试题

# 11.1 RocketMQ如何保证消息不丢失

RocketMQ的消息流程可以分为生产阶段、存储阶段和消费阶段,每个阶段都有可能出现消息丢失的风险。

  • 生产阶段:Producer 发送消息到 Broker 可能因网络故障等原因丢失。
  • 存储阶段:Broker 先将消息存入内存,再根据刷盘策略持久化到磁盘。如果在持久化之前宕机,消息可能丢失。
  • 消费阶段:如果消费者在消费消息前提交了ACK,但消费过程中出现异常,消息就会丢失。

解决方案:

  • 生产阶段:采用同步发送 + 失败重试机制;异步发送时重写回调方法检查发送结果;Ack 确认机制。
  • 存储阶段:使用同步刷盘机制;在集群模式下采用同步复制。
  • 消费阶段:确保正常消费后才提交 ACK;处理异常时返回重试标识。

此外,为了确保消息的顺序消费,需要保证消息的顺序投递和消费逻辑。

image-20250302164248037

image-20250302164345593

# 11.2 RocketMQ的消息持久化机制

RocketMQ 的持久化机制保证消息的可靠存储,主要依赖以下三大组件:

  • CommitLog:存储所有消息的文件,采用顺序写方式,每个文件大小固定为 1GB,写满后创建新文件。
  • ConsumeQueue:类似数据库索引,为每个主题的消息队列存储消息偏移量(offset)及其在 CommitLog 中的位置信息,加速消费。
  • IndexFile:存储消息的 key 和 offset 映射关系,便于快速检索。每个 IndexFile 文件固定 400MB,可存储约 2000 万条索引。

消息存储流程:

  1. Producer 发送消息,Broker 接收到消息后先存入内存缓冲区。
  2. 当达到一定数据量或时间阈值时,RocketMQ 执行批量刷盘(可选同步或异步刷盘)。
  3. 消息写入 CommitLog,同时生成 ConsumeQueue 和 IndexFile 索引数据。
  4. Consumer 通过 ConsumeQueue 或 IndexFile 查询 CommitLog,读取消息并消费。

# 11.3 RocketMQ如何保证消息顺序

RocketMQ 本身不保证全局消息有序,但通过 API 可实现局部有序消费:

  • MessageQueueSelector 选择消息队列
    Producer 通过特定规则(如订单 ID)将相关消息发送到同一队列,保证该队列内的消息顺序。
  • MessageListenerOrderly 顺序消费
    消费者使用 MessageListenerOrderly 接口,确保一个队列中的消息按顺序消费。
    实现方式:
    • 一个队列同时只能被一个消费者消费(加锁 + 定时任务续锁)。
    • 如果消费失败,RocketMQ 提供重试机制,确保消息不会被跳过。
  • 其他方式:
    • Pull模式下消费者手动保证顺序。
    • 并发消费时限制线程数,确保同一队列由单线程消费。

# 11.4 RocketMQ的事务消息原理

RocketMQ 通过两阶段提交 + 事务回查机制,保证事务消息的可靠性。

执行流程:

  1. 预处理阶段:Producer 发送事务消息,RocketMQ 设置状态为“Preparing”并存入消息存储库。

  2. 执行本地事务

    :Producer 执行本地事务逻辑,并返回事务状态:

    • Commit:确认提交,消息可被消费。
    • Rollback:回滚事务,消息不会被消费。
  3. 二次确认阶段:Broker 根据本地事务结果更新消息状态(Committing 或 Rollback)。

  4. 事务回查机制:如果网络异常或 Producer 宕机,RocketMQ 触发事务回查,调用 Producer 提供的回查接口确认事务状态,确保消息一致性。

总结 RocketMQ 通过多阶段机制确保消息的可靠性和顺序性,在分布式环境下仍能保证高效稳定的消息投递。

编辑 (opens new window)
上次更新: 2025/04/01, 01:48:12

← Pinia 集中式状态存储 请求参数注解→

最近更新
01
操作系统
03-18
02
Nginx
03-17
03
后端服务端主动推送消息的常见方式
03-11
更多文章>
Theme by Vdoing | Copyright © 2023-2025 xiaoyang | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式