This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3b12a25f0a [ISSUE #10076] Make orderly resetOffset wait on consume 
lock while preserving timeout semantics (#10175)
3b12a25f0a is described below

commit 3b12a25f0ab096d29b8f931825846e9aa0536ff7
Author: Lystran <[email protected]>
AuthorDate: Tue Mar 31 10:37:13 2026 +0800

    [ISSUE #10076] Make orderly resetOffset wait on consume lock while 
preserving timeout semantics (#10175)
---
 .../client/impl/factory/MQClientInstance.java      |  29 ++++-
 .../client/impl/factory/MQClientInstanceTest.java  | 135 ++++++++++++++++++++-
 2 files changed, 156 insertions(+), 8 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index e0b28fef64..e10807cf50 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -96,6 +96,7 @@ import static 
org.apache.rocketmq.remoting.rpc.ClientMetadata.topicRouteData2End
 
 public class MQClientInstance {
     private final static long LOCK_TIMEOUT_MILLIS = 3000;
+    private final static long RESET_OFFSET_MAX_WAIT = 10;
     private final static Logger log = 
LoggerFactory.getLogger(MQClientInstance.class);
     private final ClientConfig clientConfig;
     private final String clientId;
@@ -1380,9 +1381,11 @@ public class MQClientInstance {
                 }
             }
 
-            try {
-                TimeUnit.SECONDS.sleep(10);
-            } catch (InterruptedException ignored) {
+            if (!consumer.isConsumeOrderly()) {
+                try {
+                    TimeUnit.SECONDS.sleep(RESET_OFFSET_MAX_WAIT);
+                } catch (InterruptedException ignored) {
+                }
             }
 
             Iterator<MessageQueue> iterator = 
processQueueTable.keySet().iterator();
@@ -1391,8 +1394,10 @@ public class MQClientInstance {
                 Long offset = offsetTable.get(mq);
                 if (topic.equals(mq.getTopic()) && offset != null) {
                     try {
+                        ProcessQueue pq = processQueueTable.get(mq);
+                        waitResetOffsetReady(consumer, pq);
                         consumer.updateConsumeOffset(mq, offset);
-                        
consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, 
processQueueTable.get(mq));
+                        
consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, pq);
                         iterator.remove();
                     } catch (Exception e) {
                         log.warn("reset offset failed. group={}, {}", group, 
mq, e);
@@ -1406,6 +1411,22 @@ public class MQClientInstance {
         }
     }
 
+    private void waitResetOffsetReady(DefaultMQPushConsumerImpl consumer, 
ProcessQueue pq) {
+        if (consumer.isConsumeOrderly()) {
+            Lock lock = pq.getConsumeLock().writeLock();
+            boolean locked = false;
+            try {
+                locked = lock.tryLock(RESET_OFFSET_MAX_WAIT, TimeUnit.SECONDS);
+            } catch (InterruptedException ignored) {
+                Thread.currentThread().interrupt();
+            } finally {
+                if (locked) {
+                    lock.unlock();
+                }
+            }
+        }
+    }
+
     @SuppressWarnings("unchecked")
     public Map<MessageQueue, Long> getConsumerStatus(String topic, String 
group) {
         MQConsumerInner impl = this.consumerTable.get(group);
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
index 376ff9da8e..82b9080438 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
@@ -74,7 +74,12 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
@@ -397,6 +402,119 @@ public class MQClientInstanceTest {
                         eq(0L));
     }
 
+    @Test
+    public void testResetOffsetOrderly() {
+        topicRouteTable.put(topic, createTopicRouteData());
+        brokerAddrTable.put(defaultBroker, createBrokerAddrMap());
+        MessageQueue messageQueue = createMessageQueue();
+        ProcessQueue processQueue = new ProcessQueue();
+        RebalanceImpl rebalanceImpl = mock(RebalanceImpl.class);
+        when(rebalanceImpl.removeUnnecessaryMessageQueue(eq(messageQueue), 
eq(processQueue)))
+            .thenReturn(false, false, true);
+        consumerTable.put(group, createMQConsumerInner(processQueue, true, 
rebalanceImpl));
+        Map<MessageQueue, Long> offsetTable = new HashMap<>();
+        offsetTable.put(messageQueue, 0L);
+
+        mqClientInstance.resetOffset(topic, group, offsetTable);
+
+        verify(rebalanceImpl).removeUnnecessaryMessageQueue(messageQueue, 
processQueue);
+    }
+
+    @Test
+    public void testResetOffsetOrderlyWhenWaitTimesOut() throws 
InterruptedException {
+        topicRouteTable.put(topic, createTopicRouteData());
+        brokerAddrTable.put(defaultBroker, createBrokerAddrMap());
+        MessageQueue messageQueue = createMessageQueue();
+        ProcessQueue processQueue = mock(ProcessQueue.class);
+        ReadWriteLock consumeLock = mock(ReadWriteLock.class);
+        Lock writeLock = mock(Lock.class);
+        RebalanceImpl rebalanceImpl = mock(RebalanceImpl.class);
+        when(processQueue.getConsumeLock()).thenReturn(consumeLock);
+        when(consumeLock.writeLock()).thenReturn(writeLock);
+        when(writeLock.tryLock(10, TimeUnit.SECONDS)).thenReturn(false);
+        DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) 
createMQConsumerInner(processQueue, true, rebalanceImpl);
+        consumerTable.put(group, consumer);
+        Map<MessageQueue, Long> offsetTable = new HashMap<>();
+        offsetTable.put(messageQueue, 0L);
+
+        mqClientInstance.resetOffset(topic, group, offsetTable);
+
+        verify(consumer).updateConsumeOffset(messageQueue, 0L);
+        verify(rebalanceImpl).removeUnnecessaryMessageQueue(messageQueue, 
processQueue);
+        verify(writeLock, times(1)).tryLock(10, TimeUnit.SECONDS);
+        verify(writeLock, times(0)).unlock();
+    }
+
+    @Test
+    public void 
testResetOffsetOrderlyWaitsForInflightConsumptionBeforeUpdatingOffset() throws 
Exception {
+        topicRouteTable.put(topic, createTopicRouteData());
+        brokerAddrTable.put(defaultBroker, createBrokerAddrMap());
+        MessageQueue messageQueue = createMessageQueue();
+        ProcessQueue processQueue = new ProcessQueue();
+        RebalanceImpl rebalanceImpl = mock(RebalanceImpl.class);
+        when(rebalanceImpl.removeUnnecessaryMessageQueue(eq(messageQueue), 
eq(processQueue))).thenReturn(true);
+        DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) 
createMQConsumerInner(processQueue, true, rebalanceImpl);
+        consumerTable.put(group, consumer);
+        Map<MessageQueue, Long> offsetTable = new HashMap<>();
+        offsetTable.put(messageQueue, 0L);
+
+        CountDownLatch consumeLockHeld = new CountDownLatch(1);
+        CountDownLatch releaseConsumeLock = new CountDownLatch(1);
+        CountDownLatch suspendCalled = new CountDownLatch(1);
+        CountDownLatch updateOffsetCalled = new CountDownLatch(1);
+        AtomicReference<Throwable> backgroundFailure = new AtomicReference<>();
+
+        doAnswer(invocation -> {
+            suspendCalled.countDown();
+            return null;
+        }).when(consumer).suspend();
+        doAnswer(invocation -> {
+            updateOffsetCalled.countDown();
+            return null;
+        }).when(consumer).updateConsumeOffset(messageQueue, 0L);
+
+        Thread consumingThread = new Thread(() -> {
+            processQueue.getConsumeLock().readLock().lock();
+            try {
+                consumeLockHeld.countDown();
+                if (!releaseConsumeLock.await(5, TimeUnit.SECONDS)) {
+                    backgroundFailure.compareAndSet(null,
+                        new AssertionError("Timed out while waiting to release 
orderly consume lock"));
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                backgroundFailure.compareAndSet(null, e);
+            } finally {
+                processQueue.getConsumeLock().readLock().unlock();
+            }
+        });
+        Thread resetThread = new Thread(() -> {
+            try {
+                mqClientInstance.resetOffset(topic, group, offsetTable);
+            } catch (Throwable t) {
+                backgroundFailure.compareAndSet(null, t);
+            }
+        });
+
+        consumingThread.start();
+        assertTrue(consumeLockHeld.await(5, TimeUnit.SECONDS));
+
+        resetThread.start();
+        assertTrue(suspendCalled.await(5, TimeUnit.SECONDS));
+        assertFalse(updateOffsetCalled.await(200, TimeUnit.MILLISECONDS));
+
+        releaseConsumeLock.countDown();
+        consumingThread.join(5000);
+        resetThread.join(5000);
+
+        assertNull(backgroundFailure.get());
+        assertFalse(consumingThread.isAlive());
+        assertFalse(resetThread.isAlive());
+        assertTrue(updateOffsetCalled.await(1, TimeUnit.SECONDS));
+        verify(consumer).updateConsumeOffset(messageQueue, 0L);
+        verify(rebalanceImpl).removeUnnecessaryMessageQueue(messageQueue, 
processQueue);
+    }
+
     @Test
     public void testGetConsumerStatus() {
         topicRouteTable.put(topic, createTopicRouteData());
@@ -475,17 +593,26 @@ public class MQClientInstanceTest {
     }
 
     private MQConsumerInner createMQConsumerInner() {
+        RebalanceImpl rebalanceImpl = mock(RebalanceImpl.class);
+        
when(rebalanceImpl.removeUnnecessaryMessageQueue(any(MessageQueue.class), 
any(ProcessQueue.class))).thenReturn(true);
+        return createMQConsumerInner(new ProcessQueue(), false, rebalanceImpl);
+    }
+
+    private MQConsumerInner createMQConsumerInner(ProcessQueue processQueue, 
boolean orderly, RebalanceImpl rebalanceImpl) {
+        ConcurrentMap<MessageQueue, ProcessQueue> processQueueMap = new 
ConcurrentHashMap<>();
+        processQueueMap.put(createMessageQueue(), processQueue);
+        return createMQConsumerInner(processQueueMap, orderly, rebalanceImpl);
+    }
+
+    private MQConsumerInner createMQConsumerInner(ConcurrentMap<MessageQueue, 
ProcessQueue> processQueueMap, boolean orderly, RebalanceImpl rebalanceImpl) {
         DefaultMQPushConsumerImpl result = 
mock(DefaultMQPushConsumerImpl.class);
         Set<SubscriptionData> subscriptionDataSet = new HashSet<>();
         SubscriptionData subscriptionData = mock(SubscriptionData.class);
         subscriptionDataSet.add(subscriptionData);
         when(result.subscriptions()).thenReturn(subscriptionDataSet);
-        RebalanceImpl rebalanceImpl = mock(RebalanceImpl.class);
-        ConcurrentMap<MessageQueue, ProcessQueue> processQueueMap = new 
ConcurrentHashMap<>();
-        ProcessQueue processQueue = new ProcessQueue();
-        processQueueMap.put(createMessageQueue(), processQueue);
         when(rebalanceImpl.getProcessQueueTable()).thenReturn(processQueueMap);
         when(result.getRebalanceImpl()).thenReturn(rebalanceImpl);
+        when(result.isConsumeOrderly()).thenReturn(orderly);
         OffsetStore offsetStore = mock(OffsetStore.class);
         when(result.getOffsetStore()).thenReturn(offsetStore);
         ConsumeMessageService consumeMessageService = 
mock(ConsumeMessageService.class);

Reply via email to