muzhi9018 opened a new issue, #8126:
URL: https://github.com/apache/rocketmq/issues/8126

   ### Before Creating the Bug Report
   
   - [X] I found a bug, not just asking a question, which should be created in 
[GitHub Discussions](https://github.com/apache/rocketmq/discussions).
   
   - [X] I have searched the [GitHub 
Issues](https://github.com/apache/rocketmq/issues) and [GitHub 
Discussions](https://github.com/apache/rocketmq/discussions)  of this 
repository and believe that this is not a duplicate.
   
   - [X] I have confirmed that this bug belongs to the current repository, not 
other repositories of RocketMQ.
   
   
   ### Runtime platform environment
   
   Host: Windows 11 23H2
   Docker container base image: ubuntu:22.04
   
   ### RocketMQ version
   
   RocketMQ version: 5.1.4
   RocketMQ client version: 5.0.6 (rocketmq-client-java)
   Client Maven coordinates:
   ```xml
   <dependency>
       <groupId>org.apache.rocketmq</groupId>
       <artifactId>rocketmq-client-java</artifactId>
       <version>5.0.6</version>
   </dependency>
   ```
   
   ### JDK Version
   
   Zulu JRE 17.0.11
   
   ### Describe the Bug
   
   Using Docker Compose, I deployed 2 master brokers and 2 slave brokers. After one of the master brokers goes down, some messages are consumed repeatedly and the repetition never stops.
   
   ### Steps to Reproduce
   
   The Dockerfile is as follows:
   ```dockerfile
   FROM java:17.0.11

   
   ARG version=5.1.4

   
   # User name variable
   ARG USER_NAME="rocketmq"
   # User ID variable
   ARG USER_ID="1000"
   # Group name variable
   ARG GROUP_NAME="rocketmq"
   # Group ID variable
   ARG GROUP_ID="1000"

   
   COPY benchmark /rocketmq/benchmark
   COPY bin /rocketmq/bin
   COPY conf /rocketmq/conf
   COPY lib /rocketmq/lib
   COPY LICENSE /rocketmq/LICENSE
   COPY NOTICE /rocketmq/NOTICE

   
   # Create the group and the user, and change file ownership
   RUN chown -R $USER_ID:$GROUP_ID /rocketmq \
     && groupadd -g $GROUP_ID $GROUP_NAME \
     && useradd -d /rocketmq -u $USER_ID -g $GROUP_NAME -l -m -s /bin/bash $USER_NAME \
     && chown -R $USER_NAME:$GROUP_NAME /rocketmq \
     && chmod +x -R /rocketmq/bin
   
   # RocketMQ home environment variable
   ENV ROCKETMQ_HOME=/rocketmq
   ENV PATH=$ROCKETMQ_HOME/bin:$PATH

   
   # Switch user
   USER $USER_NAME

   # Set the working directory
   WORKDIR /rocketmq/bin

   ENV JAVA_OPT=""
   ENV JAVA_OPT_EXT=""

   
   # nameserver port
   EXPOSE 9876

   # broker ports
   EXPOSE 10909 10911 10912

   
   CMD ["dummy"]
   
   ```
   
   docker-compose.yaml is as follows:
   ```yaml
   services:
     rocket-nameserver:
       image: apache/rocketmq:5.1.4
       container_name: rocket-nameserver
       restart: no
       stop_grace_period: 5s
       networks:
         - muzhi-cloud-net
   #    ports:
   #      - "9876:9876"
       env_file:
         - "conf/nameserver.env"
       entrypoint: ["sh", "mqnamesrv"]
     broker-master-a:
       image: apache/rocketmq:5.1.4
       container_name: broker-master-a
       restart: no
       stop_grace_period: 5s
       depends_on:
         - rocket-nameserver
       networks:
         - muzhi-cloud-net
       volumes:
         - "D:/WorkSpace/rocketmq/docker-home/broker-master-a/store:/rocketmq/store"
         - "D:/WorkSpace/rocketmq/docker-home/broker-master-a/logs:/rocketmq/logs"
         - "D:/WorkSpace/rocketmq/docker-home/conf/broker-master-a.conf:/rocketmq/conf/broker.conf"
   #    ports:
   #      - "10909:10909"
   #      - "10911:10911"
   #      - "10912:10912"
       env_file:
         - "conf/broker.env"
       entrypoint: ["sh", "mqbroker", "-c", "/rocketmq/conf/broker.conf"]
     broker-slave-a01:
       image: apache/rocketmq:5.1.4
       container_name: broker-slave-a01
       restart: no
       stop_grace_period: 5s
       depends_on:
         - rocket-nameserver
       networks:
         - muzhi-cloud-net
       volumes:
         - "D:/WorkSpace/rocketmq/docker-home/broker-slave-a01/store:/rocketmq/store"
         - "D:/WorkSpace/rocketmq/docker-home/broker-slave-a01/logs:/rocketmq/logs"
         - "D:/WorkSpace/rocketmq/docker-home/conf/broker-slave-a01.conf:/rocketmq/conf/broker.conf"
   #    ports:
   #      - "11909:11909"
   #      - "11911:11911"
   #      - "11912:11912"
       env_file:
         - "conf/broker.env"
       entrypoint: ["sh", "mqbroker", "-c", "/rocketmq/conf/broker.conf"]
     broker-master-b:
       image: apache/rocketmq:5.1.4
       container_name: broker-master-b
       restart: no
       stop_grace_period: 5s
       depends_on:
         - rocket-nameserver
       networks:
         - muzhi-cloud-net
       volumes:
         - "D:/WorkSpace/rocketmq/docker-home/broker-master-b/store:/rocketmq/store"
         - "D:/WorkSpace/rocketmq/docker-home/broker-master-b/logs:/rocketmq/logs"
         - "D:/WorkSpace/rocketmq/docker-home/conf/broker-master-b.conf:/rocketmq/conf/broker.conf"
   #    ports:
   #      - "12909:12909"
   #      - "12911:12911"
   #      - "12912:12912"
       env_file:
         - "conf/broker.env"
       entrypoint: ["sh", "mqbroker", "-c", "/rocketmq/conf/broker.conf"]
     broker-slave-b01:
       image: apache/rocketmq:5.1.4
       container_name: broker-slave-b01
       restart: no
       stop_grace_period: 5s
       depends_on:
         - rocket-nameserver
       networks:
         - muzhi-cloud-net
       volumes:
         - "D:/WorkSpace/rocketmq/docker-home/broker-slave-b01/store:/rocketmq/store"
         - "D:/WorkSpace/rocketmq/docker-home/broker-slave-b01/logs:/rocketmq/logs"
         - "D:/WorkSpace/rocketmq/docker-home/conf/broker-slave-b01.conf:/rocketmq/conf/broker.conf"
   #    ports:
   #      - "13909:13909"
   #      - "13911:13911"
   #      - "13912:13912"
       env_file:
         - "conf/broker.env"
       entrypoint: ["sh", "mqbroker", "-c", "/rocketmq/conf/broker.conf"]
     rocket-proxy:
       image: apache/rocketmq:5.1.4
       container_name: rocket-proxy
       restart: no
       stop_grace_period: 5s
       depends_on:
         - broker-master-a
         - broker-master-b
       networks:
         - muzhi-cloud-net
       volumes:
         - "D:/WorkSpace/rocketmq/docker-home/rocket-proxy/logs:/rocketmq/logs"
         - "D:/WorkSpace/rocketmq/docker-home/conf/rmq-proxy.json:/rocketmq/conf/rmq-proxy.json"
       ports:
         - "9080:9080"
         - "9081:9081"
       entrypoint: [ "sh", "mqproxy", "-pc", "/rocketmq/conf/rmq-proxy.json" ]
     rocketmq-dashboard:
       image: rocketmq/dashboard:1.0.0
       container_name: rocketmq-dashboard
       restart: no
       stop_grace_period: 5s
       depends_on:
         - rocket-nameserver
       networks:
         - muzhi-cloud-net
       ports:
         - "8180:8180"
   networks:
     muzhi-cloud-net:
       external: true
   
   ```
   The configuration file for master broker a is as follows:
   broker-master-a.conf
   ```conf
   namesrvAddr = rocket-nameserver:9876
   
   listenPort = 10911
   
   brokerClusterName = DefaultCluster
   
   brokerName = muzhi-broker-a
   
   brokerId = 0
   
   brokerIP1 = broker-master-a
   
   brokerIP2 = broker-master-a
   
   brokerRole = ASYNC_MASTER
   
   flushDiskType = ASYNC_FLUSH
   
   autoCreateTopicEnable = false
   
   slaveReadEnable = false
   
   storePathCommitLog = /rocketmq/store/commitlog/
   
   storePathConsumerQueue = /rocketmq/store/consumequeue/
   
   deleteWhen = 04
   
   fileReservedTime = 72
   
   mapedFileSizeCommitLog = 1024 * 1024 * 1024
   
   ```
   The configuration file for slave broker a01 is as follows:
   broker-slave-a01.conf
   ```conf
   namesrvAddr = rocket-nameserver:9876
   
   listenPort = 11911
   
   brokerClusterName = DefaultCluster
   
   brokerName = muzhi-broker-a
   
   brokerId = 1
   
   brokerIP1 = broker-slave-a01
   
   brokerIP2 = broker-slave-a01
   
   brokerRole = SLAVE
   
   flushDiskType = ASYNC_FLUSH
   
   autoCreateTopicEnable = false
   
   slaveReadEnable = false
   
   storePathCommitLog = /rocketmq/store/commitlog/
   
   storePathConsumerQueue = /rocketmq/store/consumequeue/
   
   deleteWhen = 04
   
   fileReservedTime = 72
   
   mapedFileSizeCommitLog = 1024 * 1024 * 1024
   ```
   The configuration file for master broker b is as follows:
   broker-master-b.conf
   ```conf
   namesrvAddr = rocket-nameserver:9876
   
   listenPort = 12911
   
   brokerClusterName = DefaultCluster
   
   brokerName = muzhi-broker-b
   
   brokerId = 0
   
   brokerIP1 = broker-master-b
   
   brokerIP2 = broker-master-b
   
   brokerRole = ASYNC_MASTER
   
   flushDiskType = ASYNC_FLUSH
   
   autoCreateTopicEnable = false
   
   slaveReadEnable = false
   
   storePathCommitLog = /rocketmq/store/commitlog/
   
   storePathConsumerQueue = /rocketmq/store/consumequeue/
   
   deleteWhen = 04
   
   fileReservedTime = 72
   
   mapedFileSizeCommitLog = 1024 * 1024 * 1024
   ```
   The configuration file for slave broker b01 is as follows:
   broker-slave-b01.conf
   ```conf
   namesrvAddr = rocket-nameserver:9876
   
   listenPort = 13911
   
   brokerClusterName = DefaultCluster
   
   brokerName = muzhi-broker-b
   
   brokerId = 1
   
   brokerIP1 = broker-slave-b01
   
   brokerIP2 = broker-slave-b01
   
   brokerRole = SLAVE
   
   flushDiskType = ASYNC_FLUSH
   
   autoCreateTopicEnable = false
   
   slaveReadEnable = false
   
   storePathCommitLog = /rocketmq/store/commitlog/
   
   storePathConsumerQueue = /rocketmq/store/consumequeue/
   
   deleteWhen = 04
   
   fileReservedTime = 72
   
   mapedFileSizeCommitLog = 1024 * 1024 * 1024
   ```
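
One detail in the four broker files above may be worth double-checking: the `mapedFileSizeCommitLog = 1024 * 1024 * 1024` lines. The sketch below assumes the broker loads its `.conf` file as a plain `java.util.Properties` file and parses integer settings with `Integer.parseInt`; that is an assumption about the broker, not something confirmed in this report. Under that assumption the value is read as a literal string and not evaluated as arithmetic. The class name `BrokerConfSizeCheck` and its helpers are hypothetical and exist only for this illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class BrokerConfSizeCheck {

    // Loads the text the way a .properties file is loaded and returns the raw value of the key.
    static String rawValue(String confText, String key) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(confText));
        return props.getProperty(key);
    }

    // Returns true only if the value is a plain decimal integer accepted by Integer.parseInt.
    static boolean isPlainInt(String value) {
        try {
            Integer.parseInt(value.trim());
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        // Same line as in the broker configuration files of this report.
        String conf = "mapedFileSizeCommitLog = 1024 * 1024 * 1024\n";
        String raw = rawValue(conf, "mapedFileSizeCommitLog");
        System.out.println("raw value: '" + raw + "'");
        System.out.println("parses as int: " + isPlainInt(raw));
        // The product written out as a single literal that does parse.
        System.out.println("literal equivalent: " + (1024 * 1024 * 1024));
    }
}
```

If the assumption holds, writing the size as the single literal `1073741824` would avoid any ambiguity. This is unlikely to be related to the repeated consumption itself, but it removes one variable from the reproduction.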
   The proxy component configuration file is as follows:
   rmq-proxy.json
   ```json
   {
       "namesrvAddr": "rocket-nameserver:9876",
       "rocketMQClusterName": "DefaultCluster",
       "remotingListenPort": 9080,
       "grpcServerPort": 9081
   }
   ```
   
   Start the message producer and the message consumer normally.
   The producer code is as follows:
   ```java
   package com.muzhi.rocketmq;
   
   import org.apache.rocketmq.client.apis.ClientConfiguration;
   import org.apache.rocketmq.client.apis.ClientException;
   import org.apache.rocketmq.client.apis.ClientServiceProvider;
   import org.apache.rocketmq.client.apis.message.Message;
   import org.apache.rocketmq.client.apis.producer.Producer;
   import org.apache.rocketmq.client.apis.producer.SendReceipt;
   
   import java.io.IOException;
   import java.nio.charset.StandardCharsets;
   import java.text.DateFormat;
   import java.text.SimpleDateFormat;
   import java.util.Random;
   import java.util.Scanner;
   
   public class GrpcProducer {
   
       public static void main(String[] args) throws ClientException, IOException {
           final ClientServiceProvider provider = ClientServiceProvider.loadService();
   
           String endpoints = "192.168.2.150:9080";
           String topic = "muzhi-cloud-test";
           String tag = "test-message";
   
           ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                   .setEndpoints(endpoints)
                   .enableSsl(false)
                   .build();
   
           Producer producer = provider.newProducerBuilder()
                   .setClientConfiguration(clientConfiguration)
                   .setTopics(topic)
                   .build();
   
   
           Scanner scanner = new Scanner(System.in);
           Random random = new Random();
           DateFormat format = new SimpleDateFormat("yyyyMMddHHmmss");
   
           while (scanner.hasNext()) {
               String next = scanner.next();
               if (next.equals("shutdown")) {
                   // Release resources and leave the loop instead of sending "shutdown" as a message.
                   producer.close();
                   scanner.close();
                   break;
               }
   
               Message message = provider.newMessageBuilder()
                       .setTopic(topic)
                       .setTag(tag)
                       // Key: timestamp plus a random six-digit number
                       .setKeys(format.format(System.currentTimeMillis()) + (random.nextInt(900000) + 100000))
                       .setBody(next.getBytes(StandardCharsets.UTF_8))
                       .build();
   
               final SendReceipt sendReceipt = producer.send(message);
               System.out.printf("Message sent successfully, id: %s%n", sendReceipt.getMessageId());
           }
       }
   }
   
   ```
   The consumer code is as follows:
   ```java
   package com.muzhi.rocketmq;
   
   import org.apache.rocketmq.client.apis.*;
   import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
   import org.apache.rocketmq.client.apis.consumer.FilterExpression;
   import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
   import org.apache.rocketmq.client.apis.consumer.PushConsumer;
   
   import java.io.IOException;
   import java.nio.charset.StandardCharsets;
   import java.util.Collections;
   
   public class GrpcConsumer {
   
       public static void main(String[] args) throws ClientException, InterruptedException, IOException {
           final ClientServiceProvider provider = ClientServiceProvider.loadService();
   
           String endpoints = "192.168.2.150:9080";
           String consumerGroup = "muzhi-cloud-group";
           String topic = "muzhi-cloud-test";
           String tag = "test-message";
   
           ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                   .setEndpoints(endpoints)
                   .enableSsl(false)
                   .build();
   
           FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
           PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                   .setClientConfiguration(clientConfiguration)
                   .setConsumerGroup(consumerGroup)
                   .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                   .setMessageListener(messageView -> {
                       String body = StandardCharsets.UTF_8.decode(messageView.getBody()).toString();
                       System.out.printf("MQ client received message, id: %s; body: %s; %n", messageView.getKeys(), body);
                       return ConsumeResult.SUCCESS;
                   })
                   .build();
   
           Thread.sleep(Long.MAX_VALUE);
           pushConsumer.close();
       }
   }
   
   ```
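   Send several messages in a row and shut down broker-master-a partway through.
   Some of the messages that were already sent are then consumed over and over.
   The consumer console clearly shows the messages being consumed repeatedly:
   <img width="1728" alt="image" src="https://github.com/apache/rocketmq/assets/38956027/783620ad-e69d-4561-9729-e2e241820d88">
   
   dashboard 
   <img width="1728" alt="image" src="https://github.com/apache/rocketmq/assets/38956027/783620ad-e69d-4561-9729-e2e241820d88">
   
   Once broker-master-a is started again, consumption returns to normal.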
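
To put numbers on the repetition rather than relying on the console output alone, the consumer could count deliveries per message ID. The following is a minimal sketch of such a counter; the class `DeliveryCounter` and its methods are hypothetical helpers introduced here for illustration and are not part of the RocketMQ client. It is plain Java, so it runs without a broker.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class DeliveryCounter {

    // Message ID -> number of times the listener has seen it.
    private final Map<String, AtomicInteger> counts = new ConcurrentHashMap<>();

    // Records one delivery of the given message ID and returns how often it has been seen so far.
    public int record(String messageId) {
        return counts.computeIfAbsent(messageId, id -> new AtomicInteger()).incrementAndGet();
    }

    // True if the message ID has been delivered more than once.
    public boolean isDuplicate(String messageId) {
        AtomicInteger n = counts.get(messageId);
        return n != null && n.get() > 1;
    }

    public static void main(String[] args) {
        DeliveryCounter counter = new DeliveryCounter();
        // Simulated delivery sequence; in the real consumer these would come from the listener.
        String[] deliveries = {"msg-1", "msg-2", "msg-1", "msg-1"};
        for (String id : deliveries) {
            int seen = counter.record(id);
            if (seen > 1) {
                System.out.printf("duplicate delivery #%d of %s%n", seen, id);
            }
        }
    }
}
```

In the consumer shown earlier, the listener could call `counter.record(messageView.getMessageId().toString())` before returning `ConsumeResult.SUCCESS`, which would show whether the same message IDs keep coming back while broker-master-a is down.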
   
   ### What Did You Expect to See?
   
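   slaveReadEnable is set to false in the configuration files.
   I expect that after a master goes down, consumption continues normally and the consumer does not keep consuming the same messages.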
   
   ### What Did You See Instead?
   
   Some messages are consumed repeatedly, without end.
   
   ### Additional Context
   
   _No response_

