gogodjzhu opened a new issue, #9281:
URL: https://github.com/apache/rocketmq/issues/9281

   ### Before Creating the Bug Report
   
   - [x] I found a bug, not just asking a question, which should be created in 
[GitHub Discussions](https://github.com/apache/rocketmq/discussions).
   
   - [x] I have searched the [GitHub 
Issues](https://github.com/apache/rocketmq/issues) and [GitHub 
Discussions](https://github.com/apache/rocketmq/discussions) of this 
repository and believe that this is not a duplicate.
   
   - [x] I have confirmed that this bug belongs to the current repository, not 
other repositories of RocketMQ.
   
   
   ### Runtime platform environment
   
   - OS: Ubuntu
   
   ### RocketMQ version
   
   - RocketMQ Broker/Namesvr version: 5.3.1
   - RocketMQ Client SDK version: 5.3.1
   
   ### JDK Version
   
   openjdk version "1.8.0_432"
   
   ### Describe the Bug
   
   In broadcast consumption mode, a new consumer that starts with 
`ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET` is not guaranteed to receive all 
messages produced after it starts. Because the consumer has no locally stored 
pull offset, its starting offset is determined at the moment the broker 
handles the `CONSUME_FROM_LAST_OFFSET` setting, not at the moment the consumer 
actually starts.
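
The timing gap can be illustrated with a deliberately simplified model. This is a sketch, not RocketMQ client code: `SimulatedQueue` and `receive` are hypothetical names, and it assumes (as the description above suggests) that a consumer with no stored offset adopts the queue's max offset at its first pull, some time after `start()` has returned.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the race; not RocketMQ code. All names are hypothetical.
class LateOffsetResolutionSketch {

    // A single in-memory queue; its max offset equals the number of stored messages.
    static final class SimulatedQueue {
        private final List<String> messages = new ArrayList<>();

        void append(String body) {
            messages.add(body);
        }

        long maxOffset() {
            return messages.size();
        }

        List<String> readFrom(long offset) {
            return new ArrayList<>(messages.subList((int) offset, messages.size()));
        }
    }

    /**
     * Returns the messages seen by a consumer that has no stored offset and
     * resolves "last offset" at its first pull instead of at start().
     */
    static List<String> receive(int total, int sentBeforeFirstPull) {
        SimulatedQueue queue = new SimulatedQueue();
        // The consumer's start() has returned here, but no offset is resolved yet.
        for (int i = 0; i < sentBeforeFirstPull; i++) {
            queue.append("msg-" + i);
        }
        // First pull: "last offset" is resolved only now.
        long startOffset = queue.maxOffset();
        for (int i = sentBeforeFirstPull; i < total; i++) {
            queue.append("msg-" + i);
        }
        return queue.readFrom(startOffset);
    }

    public static void main(String[] args) {
        // 4 of 10 messages arrive before the first pull and are skipped.
        System.out.println(receive(10, 4).size()); // prints 6
        // If nothing is sent before the first pull, nothing is skipped.
        System.out.println(receive(10, 0).size()); // prints 10
    }
}
```

In this model, every message appended between `start()` and the first pull falls below the resolved offset, which matches the symptom reported here.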
   
   ### Steps to Reproduce
   
   
   ```java
   package com.rocketmq;
   
   import java.nio.charset.StandardCharsets;
   import java.util.List;
   import java.util.concurrent.atomic.AtomicInteger;
   
   import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
   import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
   import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
   import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
   import org.apache.rocketmq.client.exception.MQBrokerException;
   import org.apache.rocketmq.client.exception.MQClientException;
   import org.apache.rocketmq.client.producer.DefaultMQProducer;
   import org.apache.rocketmq.client.producer.SendResult;
   import org.apache.rocketmq.client.producer.SendStatus;
   import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
   import org.apache.rocketmq.common.message.Message;
   import org.apache.rocketmq.common.message.MessageExt;
   import org.apache.rocketmq.remoting.exception.RemotingException;
   import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
   
   public class RocketmqBroadCastTest {
       public static void main(String[] args)
               throws InterruptedException, MQClientException, MQBrokerException, RemotingException {
           DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
           consumer.setNamesrvAddr("localhost:9876");
           consumer.subscribe("TopicTest001", "*");
           consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
           consumer.setMessageModel(MessageModel.BROADCASTING);
           // NOTE: brand new consumer group, has no offset locally
           consumer.setConsumerGroup("cg_TopicTest001_004");
   
           int totalCount = 10;
   
           AtomicInteger receivedCount = new AtomicInteger(0);
           consumer.registerMessageListener(new MessageListenerConcurrently() {
   
               @Override
               public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                       ConsumeConcurrentlyContext context) {
                   receivedCount.addAndGet(msgs.size());
                   return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
               }
           });
           consumer.start();
           System.out.printf("Consumer Started.%n");
   
           // NOTE: if we wait a few seconds here for the consumer to finish starting,
           // all of the messages are consumed
           // Thread.sleep(3000);
   
           sendMsg(totalCount);
           Thread.sleep(5000); // wait for the consumer to consume messages
           consumer.shutdown();
   
           // NOTE: the received count is less than the total count
           System.out.printf("Received %d messages, expected %d%n",
                   receivedCount.get(), totalCount);
       }
   
       public static void sendMsg(int count)
               throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
           DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
           producer.setNamesrvAddr("localhost:9876");
           producer.start();
           for (int i = 0; i < count; i++) {
               Message msg = new Message("TopicTest001",
                       "TagA", "OrderID188",
                       ("msg-" + i).getBytes(StandardCharsets.UTF_8));
               SendResult sendResult = producer.send(msg);
               if (sendResult == null || sendResult.getSendStatus() != SendStatus.SEND_OK) {
                   throw new RuntimeException("send msg error");
               }
           }
           producer.shutdown();
           System.out.printf("send msg over%n");
       }
   }
   ```
   
   ### What Did You Expect to See?
   
   When a consumer starts in broadcast mode with CONSUME_FROM_LAST_OFFSET, it 
should receive all messages produced after its start time.
   
   ### What Did You See Instead?
   
   Some messages produced immediately after the consumer starts are never 
delivered to it.
   
   ### Additional Context
   
   **Proposed Solution**
   
   Enhance `LocalFileOffsetStore.load()` to:
   
   1. Check whether a local offset exists.
   2. If there is no local offset, fetch the initial offset from the broker 
based on the `ConsumeFromWhere` setting.
   3. Use that offset as the starting point for consumption.
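
The three steps above could look roughly like the following. This is only a sketch under stated assumptions: `SketchOffsetStore` and the `brokerInitialOffset` callback are hypothetical stand-ins, not the real `LocalFileOffsetStore` or any existing client API, and how the broker would be queried according to `ConsumeFromWhere` is left abstract.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.ToLongFunction;

// Hypothetical sketch of the proposed load-time behaviour; not the actual LocalFileOffsetStore.
class EagerOffsetLoadSketch {

    // Stand-in for an offset store keyed by queue id.
    static final class SketchOffsetStore {
        private final Map<Integer, Long> offsets = new HashMap<>();

        // persisted: offsets that were already stored locally (may be empty).
        SketchOffsetStore(Map<Integer, Long> persisted) {
            offsets.putAll(persisted);
        }

        /**
         * brokerInitialOffset is an assumed callback that returns the initial
         * offset for a queue according to the ConsumeFromWhere setting.
         */
        void load(Iterable<Integer> queueIds, ToLongFunction<Integer> brokerInitialOffset) {
            for (Integer queueId : queueIds) {
                // Step 1: check whether a local offset exists.
                if (!offsets.containsKey(queueId)) {
                    // Step 2: none stored, so ask the broker at load time.
                    long initial = brokerInitialOffset.applyAsLong(queueId);
                    // Step 3: record it as the starting point for consumption.
                    offsets.put(queueId, initial);
                }
            }
        }

        // Returns -1 when no offset is known for the queue.
        long readOffset(int queueId) {
            return offsets.getOrDefault(queueId, -1L);
        }
    }

    public static void main(String[] args) {
        // Queue 0 already has a local offset; queue 1 does not.
        SketchOffsetStore store = new SketchOffsetStore(Collections.singletonMap(0, 42L));
        store.load(Arrays.asList(0, 1), queueId -> 100L);
        System.out.println(store.readOffset(0)); // prints 42
        System.out.println(store.readOffset(1)); // prints 100
    }
}
```

Resolving the offset during load, rather than at the first pull, would pin the starting point before the application begins producing, which is the guarantee this issue asks for.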

