gogodjzhu opened a new issue, #9281: URL: https://github.com/apache/rocketmq/issues/9281
### Before Creating the Bug Report - [x] I found a bug, not just asking a question, which should be created in [GitHub Discussions](https://github.com/apache/rocketmq/discussions). - [x] I have searched the [GitHub Issues](https://github.com/apache/rocketmq/issues) and [GitHub Discussions](https://github.com/apache/rocketmq/discussions) of this repository and believe that this is not a duplicate. - [x] I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ. ### Runtime platform environment - OS: Ubuntu ### RocketMQ version - RocketMQ Broker/Namesvr version: 5.3.1 - RocketMQ Client SDK version: 5.3.1 ### JDK Version openjdk version "1.8.0_432" ### Describe the Bug In broadcast consumption mode, when a new consumer starts with `ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET`, it cannot guarantee to receive all messages produced after the consumer starts. This is because the consumer's pull offset in local is empty, so it will determined by the broker's timestamp when it recognizes the `CONSUME_FROM_LAST_OFFSET` setting, rather than the actual consumer start time. ### Steps to Reproduce ```java package com.rocketmq; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; public class RocketmqBroadCastTest { public static void main(String[] args) throws InterruptedException, MQClientException, MQBrokerException, RemotingException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("TopicTest001", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.setMessageModel(MessageModel.BROADCASTING); consumer.setConsumerGroup("cg_TopicTest001_004"); // NOTE: brand new consumer group, has no offset locally int totalCount = 10; AtomicInteger receivedCount = new AtomicInteger(0); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { receivedCount.addAndGet(msgs.size()); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); // Thread.sleep(3000); // NOTE: if wait a few seconds the consumer to start, then all the messages will be consumed sendMsg(totalCount); Thread.sleep(5000); // wait for the consumer to consume messages consumer.shutdown(); // NOTE: received count less than total count System.out.printf("Received %d messages, expected %d%n", receivedCount.get(), totalCount); } public static void sendMsg(int count) throws MQClientException, MQBrokerException, RemotingException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("localhost:9876"); producer.start(); for (int i = 0; i < count; i++) { Message msg = new Message("TopicTest001", "TagA", "OrderID188", ("msg-" + i).getBytes(StandardCharsets.UTF_8)); SendResult sendResult = producer.send(msg); if (sendResult == null || sendResult.getSendStatus() != SendStatus.SEND_OK) { throw new RuntimeException("send msg error"); } } producer.shutdown(); System.out.printf("send msg over"); } } ``` ### What Did You Expect to See? When a consumer starts in broadcast mode with CONSUME_FROM_LAST_OFFSET, it should receive all messages produced after its start time. ### What Did You See Instead? Some messages produced immediately after consumer start are lost. ### Additional Context **Proposed Solution** Enhance `LocalFileOffsetStore.load()` to: 1. Check if local offset exists 2. If no local offset, fetch initial offset from broker based on ConsumeFromWhere setting 3. Use this as the starting point for consumption -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org