muzhi9018 opened a new issue, #8126: URL: https://github.com/apache/rocketmq/issues/8126
### Before Creating the Bug Report - [X] I found a bug, not just asking a question, which should be created in [GitHub Discussions](https://github.com/apache/rocketmq/discussions). - [X] I have searched the [GitHub Issues](https://github.com/apache/rocketmq/issues) and [GitHub Discussions](https://github.com/apache/rocketmq/discussions) of this repository and believe that this is not a duplicate. - [X] I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ. ### Runtime platform environment 宿主机 Win 11 23H2 Docker 容器基础镜像 ubuntu:22.04 ### RocketMQ version Rocket MQ 版本 5.1.4 Rocket MQ 客户端版本 5.0.6 (rocketmq-client-java) 客户端 Maven 坐标 ```xml <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client-java</artifactId> <version>5.0.6</version> </dependency> ``` ### JDK Version Zulu JRE 17.0.11 ### Describe the Bug 使用 Docker Compose 部署 2 个 Master Broker; 2 个 Slave Broker; 当其中一个 Master Broker 宕机之后会有部分消息一直重复消费 ### Steps to Reproduce Dockerfile 如下 ```dockerfile FROM java:17.0.11 ARG version=5.1.4 # 定义用户名变量 ARG USER_NAME="rocketmq" # 定义用户id变量 ARG USER_ID="1000" # 定义用户组名称变量 ARG GROUP_NAME="rocketmq" # 定义用户组id变量 ARG GROUP_ID="1000" COPY benchmark /rocketmq/benchmark COPY bin /rocketmq/bin COPY conf /rocketmq/conf COPY lib /rocketmq/lib COPY LICENSE /rocketmq/LICENSE COPY NOTICE /rocketmq/NOTICE # 创建用户组和创建用户; 并且更改文件权限 RUN chown -R $USER_ID:$GROUP_ID /rocketmq \ && groupadd -g $GROUP_ID $GROUP_NAME \ && useradd -d /rocketmq -u $USER_ID -g $GROUP_NAME -l -m -s /bin/bash $USER_NAME \ && chown -R $USER_NAME:$GROUP_NAME /rocketmq \ && chmod +x -R /rocketmq/bin # rocketmq home 环境变量 ENV ROCKETMQ_HOME=/rocketmq ENV PATH=$ROCKETMQ_HOME/bin:$PATH # 切换用户 USER $USER_NAME # 设置工作目录 WORKDIR /rocketmq/bin ENV JAVA_OPT="" ENV JAVA_OPT_EXT="" # nameserver 端口 EXPOSE 9876 # broker 端口 EXPOSE 10909 10911 10912 CMD ["dummy"] ``` docker-compose.yaml 如下 ```yaml services: rocket-nameserver: image: apache/rocketmq:5.1.4 container_name: 
rocket-nameserver restart: no stop_grace_period: 5s networks: - muzhi-cloud-net # ports: # - "9876:9876" env_file: - "conf/nameserver.env" entrypoint: ["sh", "mqnamesrv"] broker-master-a: image: apache/rocketmq:5.1.4 container_name: broker-master-a restart: no stop_grace_period: 5s depends_on: - rocket-nameserver networks: - muzhi-cloud-net volumes: - "D:/WorkSpace/rocketmq/docker-home/broker-master-a/store:/rocketmq/store" - "D:/WorkSpace/rocketmq/docker-home/broker-master-a/logs:/rocketmq/logs" - "D:/WorkSpace/rocketmq/docker-home/conf/broker-master-a.conf:/rocketmq/conf/broker.conf" # ports: # - "10909:10909" # - "10911:10911" # - "10912:10912" env_file: - "conf/broker.env" entrypoint: ["sh", "mqbroker", "-c", "/rocketmq/conf/broker.conf"] broker-slave-a01: image: apache/rocketmq:5.1.4 container_name: broker-slave-a01 restart: no stop_grace_period: 5s depends_on: - rocket-nameserver networks: - muzhi-cloud-net volumes: - "D:/WorkSpace/rocketmq/docker-home/broker-slave-a01/store:/rocketmq/store" - "D:/WorkSpace/rocketmq/docker-home/broker-slave-a01/logs:/rocketmq/logs" - "D:/WorkSpace/rocketmq/docker-home/conf/broker-slave-a01.conf:/rocketmq/conf/broker.conf" # ports: # - "11909:11909" # - "11911:11911" # - "11912:11912" env_file: - "conf/broker.env" entrypoint: ["sh", "mqbroker", "-c", "/rocketmq/conf/broker.conf"] broker-master-b: image: apache/rocketmq:5.1.4 container_name: broker-master-b restart: no stop_grace_period: 5s depends_on: - rocket-nameserver networks: - muzhi-cloud-net volumes: - "D:/WorkSpace/rocketmq/docker-home/broker-master-b/store:/rocketmq/store" - "D:/WorkSpace/rocketmq/docker-home/broker-master-b/logs:/rocketmq/logs" - "D:/WorkSpace/rocketmq/docker-home/conf/broker-master-b.conf:/rocketmq/conf/broker.conf" # ports: # - "12909:12909" # - "12911:12911" # - "12912:12912" env_file: - "conf/broker.env" entrypoint: ["sh", "mqbroker", "-c", "/rocketmq/conf/broker.conf"] broker-slave-b01: image: apache/rocketmq:5.1.4 container_name: 
broker-slave-b01 restart: no stop_grace_period: 5s depends_on: - rocket-nameserver networks: - muzhi-cloud-net volumes: - "D:/WorkSpace/rocketmq/docker-home/broker-slave-b01/store:/rocketmq/store" - "D:/WorkSpace/rocketmq/docker-home/broker-slave-b01/logs:/rocketmq/logs" - "D:/WorkSpace/rocketmq/docker-home/conf/broker-slave-b01.conf:/rocketmq/conf/broker.conf" # ports: # - "13909:13909" # - "13911:13911" # - "13912:13912" env_file: - "conf/broker.env" entrypoint: ["sh", "mqbroker", "-c", "/rocketmq/conf/broker.conf"] rocket-proxy: image: apache/rocketmq:5.1.4 container_name: rocket-proxy restart: no stop_grace_period: 5s depends_on: - broker-master-a - broker-master-b networks: - muzhi-cloud-net volumes: - "D:/WorkSpace/rocketmq/docker-home/rocket-proxy/logs:/rocketmq/logs" - "D:/WorkSpace/rocketmq/docker-home/conf/rmq-proxy.json:/rocketmq/conf/rmq-proxy.json" ports: - "9080:9080" - "9081:9081" entrypoint: [ "sh", "mqproxy", "-pc", "/rocketmq/conf/rmq-proxy.json" ] rocketmq-dashboard: image: rocketmq/dashboard:1.0.0 container_name: rocketmq-dashboard restart: no stop_grace_period: 5s depends_on: - rocket-nameserver networks: - muzhi-cloud-net ports: - "8180:8180" networks: muzhi-cloud-net: external: true ``` Master Broker a 配置文件如下 broker-master-a.conf ```conf namesrvAddr = rocket-nameserver:9876 listenPort = 10911 brokerClusterName = DefaultCluster brokerName = muzhi-broker-a brokerId = 0 brokerIP1 = broker-master-a brokerIP2 = broker-master-a brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH autoCreateTopicEnable = false slaveReadEnable = false storePathCommitLog = /rocketmq/store/commitlog/ storePathConsumerQueue = /rocketmq/store/consumequeue/ deleteWhen = 04 fileReservedTime = 72 mapedFileSizeCommitLog = 1024 * 1024 * 1024a ``` Slave Broker a01 配置文件如下 broker-slave-a01.conf ```conf namesrvAddr = rocket-nameserver:9876 listenPort = 11911 brokerClusterName = DefaultCluster brokerName = muzhi-broker-a brokerId = 1 brokerIP1 = broker-slave-a01 brokerIP2 = 
broker-slave-a01 brokerRole = SLAVE flushDiskType = ASYNC_FLUSH autoCreateTopicEnable = false slaveReadEnable = false storePathCommitLog = /rocketmq/store/commitlog/ storePathConsumerQueue = /rocketmq/store/consumequeue/ deleteWhen = 04 fileReservedTime = 72 mapedFileSizeCommitLog = 1024 * 1024 * 1024 ``` Master Broker b 配置文件如下 broker-master-b.conf ```conf namesrvAddr = rocket-nameserver:9876 listenPort = 12911 brokerClusterName = DefaultCluster brokerName = muzhi-broker-b brokerId = 0 brokerIP1 = broker-master-b brokerIP2 = broker-master-b brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH autoCreateTopicEnable = false slaveReadEnable = false storePathCommitLog = /rocketmq/store/commitlog/ storePathConsumerQueue = /rocketmq/store/consumequeue/ deleteWhen = 04 fileReservedTime = 72 mapedFileSizeCommitLog = 1024 * 1024 * 1024 ``` Slave Broker b01 配置文件如下 broker-slave-b01.conf ```conf namesrvAddr = rocket-nameserver:9876 listenPort = 13911 brokerClusterName = DefaultCluster brokerName = muzhi-broker-b brokerId = 1 brokerIP1 = broker-slave-b01 brokerIP2 = broker-slave-b01 brokerRole = SLAVE flushDiskType = ASYNC_FLUSH autoCreateTopicEnable = false slaveReadEnable = false storePathCommitLog = /rocketmq/store/commitlog/ storePathConsumerQueue = /rocketmq/store/consumequeue/ deleteWhen = 04 fileReservedTime = 72 mapedFileSizeCommitLog = 1024 * 1024 * 1024 ``` 代理组件配置文件如下 rmq-proxy.json ``` { "namesrvAddr": "rocket-nameserver:9876", "rocketMQClusterName": "DefaultCluster", "remotingListenPort": 9080, "grpcServerPort": 9081 } ``` 正常启动 消息生产者,和消息消费者 消息生产者代码如下 ```java package com.muzhi.rocketmq; import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.message.Message; import org.apache.rocketmq.client.apis.producer.Producer; import org.apache.rocketmq.client.apis.producer.SendReceipt; import java.io.IOException; 
import java.nio.charset.StandardCharsets; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Random; import java.util.Scanner; public class GrpcProducer { public static void main(String[] args) throws ClientException, IOException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); String endpoints = "192.168.2.150:9080"; String topic = "muzhi-cloud-test"; String tag = "test-message"; ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) .enableSsl(false) .build(); Producer producer = provider.newProducerBuilder() .setClientConfiguration(clientConfiguration) .setTopics(topic) .build(); Scanner scanner = new Scanner(System.in); Random random = new Random(); DateFormat format = new SimpleDateFormat("yyyyMMddHHmmss"); while (scanner.hasNext()) { String next; next = scanner.next(); if (next.equals("shutdown")) { producer.close(); scanner.close(); } Message message = provider.newMessageBuilder() .setTopic(topic) .setTag(tag) .setKeys(format.format(System.currentTimeMillis()) + (random.nextInt(900000) + 100000)) .setBody(next.getBytes(StandardCharsets.UTF_8)) .build(); final SendReceipt sendReceipt = producer.send(message); System.out.printf("消息发送成功 id: %s%n",sendReceipt.getMessageId()); } } } ``` 消息消费者代码如下 ```java package com.muzhi.rocketmq; import org.apache.rocketmq.client.apis.*; import org.apache.rocketmq.client.apis.consumer.ConsumeResult; import org.apache.rocketmq.client.apis.consumer.FilterExpression; import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; import org.apache.rocketmq.client.apis.consumer.PushConsumer; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Collections; public class GrpcConsumer { public static void main(String[] args) throws ClientException, InterruptedException, IOException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); String endpoints = 
"192.168.2.150:9080"; String consumerGroup = "muzhi-cloud-group"; String topic = "muzhi-cloud-test"; String tag = "test-message"; ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) .enableSsl(false) .build(); FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); PushConsumer pushConsumer = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) .setConsumerGroup(consumerGroup) .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .setMessageListener(messageView -> { String body = StandardCharsets.UTF_8.decode(messageView.getBody()).toString(); System.out.printf("MQ 客户端收到消息 id: %s; body: %s; %n", messageView.getKeys(), body); return ConsumeResult.SUCCESS; }) .build(); Thread.sleep(Long.MAX_VALUE); pushConsumer.close(); } } ``` 连续发送多条消息 中途关闭 broker-master-a 已经发送的部分消息一直重复消费 消费者控制台可以清楚地看到消息被重复消费 <img width="1728" alt="image" src="https://github.com/apache/rocketmq/assets/38956027/783620ad-e69d-4561-9729-e2e241820d88"> dashboard <img width="1728" alt="image" src="https://github.com/apache/rocketmq/assets/38956027/783620ad-e69d-4561-9729-e2e241820d88"> 后续 broker-master-a 启动,则恢复正常 ### What Did You Expect to See? 配置文件 slaveReadEnable 设置为 false 希望 Master 宕机之后能正常消费,消费者不要一直重复消费 ### What Did You See Instead? 部分消息一直重复消费 ### Additional Context _No response_ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org