This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new a376fbcdb8 [ISSUE #7634] Introduce controllableOffset to prevent 
unnecessary suspension when OFFSET_ILLEGAL (#7635)
a376fbcdb8 is described below

commit a376fbcdb82e818cfa239da677669f1118e4c40f
Author: Zhouxiang Zhan <zhouxz...@apache.org>
AuthorDate: Tue Dec 12 16:52:12 2023 +0800

    [ISSUE #7634] Introduce controllableOffset to prevent unnecessary 
suspension when OFFSET_ILLEGAL (#7635)
    
    * Add controllableOffset to prevent unnecessary suspension when 
OFFSET_ILLEGAL
---
 .../client/consumer/store/ControllableOffset.java  | 115 +++++++++++++++++++++
 .../consumer/store/LocalFileOffsetStore.java       |  33 +++---
 .../client/consumer/store/OffsetStore.java         |   8 ++
 .../consumer/store/RemoteBrokerOffsetStore.java    |  45 ++++----
 .../impl/consumer/DefaultMQPushConsumerImpl.java   |  13 ++-
 .../client/impl/consumer/PullMessageService.java   |   8 ++
 .../store/RemoteBrokerOffsetStoreTest.java         |  32 ++++++
 7 files changed, 218 insertions(+), 36 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/store/ControllableOffset.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/store/ControllableOffset.java
new file mode 100644
index 0000000000..9db4bd2e2a
--- /dev/null
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/store/ControllableOffset.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.consumer.store;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * The ControllableOffset class encapsulates a thread-safe offset value that 
can be
+ * updated atomically. Additionally, this class allows for the offset to be 
"frozen,"
+ * which prevents further updates after the freeze operation has been 
performed.
+ * <p>
+ * Concurrency Scenarios:
+ * If {@code updateAndFreeze} is called before any {@code update} operations, 
it sets
+ * {@code allowToUpdate} to false and updates the offset to the target value 
specified.
+ * After this operation, further invocations of {@code update} will not affect 
the offset,
+ * as it is considered frozen.
+ * <p>
+ * If {@code update} is in progress while {@code updateAndFreeze} is invoked 
concurrently,
+ * the final outcome depends on the sequence of operations:
+ * 1. If {@code update}'s atomic update operation completes before {@code 
updateAndFreeze},
+ * the latter will overwrite the offset and set {@code allowToUpdate} to false,
+ * preventing any further updates.
+ * 2. If {@code updateAndFreeze} executes before the {@code update} finalizes 
its operation,
+ * the ongoing {@code update} will not proceed with its changes. The {@link 
AtomicLong#getAndUpdate}
+ * method used in both operations ensures atomicity and respects the final 
state imposed by
+ * {@code updateAndFreeze}, even if the {@code update} function has already 
begun.
+ * <p>
+ * In essence, once the {@code updateAndFreeze} operation is executed, the 
offset value remains
+ * immutable to any subsequent {@code update} calls due to the immediate 
visibility of the
+ * {@code allowToUpdate} state change, courtesy of its volatile nature.
+ * <p>
+ * The combination of an AtomicLong for the offset value and a volatile 
boolean flag for update
+ * control provides a reliable mechanism for managing offset values in 
concurrent environments.
+ */
+public class ControllableOffset {
+    // Holds the current offset value in an atomic way.
+    private final AtomicLong value;
+    // Controls whether updates to the offset are allowed.
+    private volatile boolean allowToUpdate;
+
+    public ControllableOffset(long value) {
+        this.value = new AtomicLong(value);
+        this.allowToUpdate = true;
+    }
+
+    /**
+     * Attempts to update the offset to the target value. If increaseOnly is 
true,
+     * the offset will not be decreased. The update operation is atomic and 
thread-safe.
+     * The operation will respect the current allowToUpdate state, and if the 
offset
+     * has been frozen by a previous call to {@link #updateAndFreeze(long)},
+     * this method will not update the offset.
+     *
+     * @param target       the new target offset value.
+     * @param increaseOnly if true, the offset will only be updated if the 
target value
+     *                     is greater than the current value.
+     */
+    public void update(long target, boolean increaseOnly) {
+        if (allowToUpdate) {
+            value.getAndUpdate(val -> {
+                if (allowToUpdate) {
+                    if (increaseOnly) {
+                        return Math.max(target, val);
+                    } else {
+                        return target;
+                    }
+                } else {
+                    return val;
+                }
+            });
+        }
+    }
+
+    /**
+     * Overloaded method for updating the offset value unconditionally.
+     *
+     * @param target The new target value for the offset.
+     */
+    public void update(long target) {
+        update(target, false);
+    }
+
+    /**
+     * Freezes the offset at the target value provided. Once frozen, the offset
+     * cannot be updated by subsequent calls to {@link #update(long, boolean)}.
+     * This method will set allowToUpdate to false and then update the offset,
+     * ensuring the new value is the final state of the offset.
+     *
+     * @param target the new target offset value to freeze at.
+     */
+    public void updateAndFreeze(long target) {
+        value.getAndUpdate(val -> {
+            allowToUpdate = false;
+            return target;
+        });
+    }
+
+    public long getOffset() {
+        return value.get();
+    }
+}
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
index 832888dbeb..074508c46b 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
@@ -47,7 +47,7 @@ public class LocalFileOffsetStore implements OffsetStore {
     private final MQClientInstance mQClientFactory;
     private final String groupName;
     private final String storePath;
-    private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
+    private ConcurrentMap<MessageQueue, ControllableOffset> offsetTable =
         new ConcurrentHashMap<>();
 
     public LocalFileOffsetStore(MQClientInstance mQClientFactory, String 
groupName) {
@@ -63,10 +63,9 @@ public class LocalFileOffsetStore implements OffsetStore {
     public void load() throws MQClientException {
         OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
         if (offsetSerializeWrapper != null && 
offsetSerializeWrapper.getOffsetTable() != null) {
-            offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
-
             for (Entry<MessageQueue, AtomicLong> mqEntry : 
offsetSerializeWrapper.getOffsetTable().entrySet()) {
                 AtomicLong offset = mqEntry.getValue();
+                offsetTable.put(mqEntry.getKey(), new 
ControllableOffset(offset.get()));
                 log.info("load consumer's offset, {} {} {}",
                         this.groupName,
                         mqEntry.getKey(),
@@ -78,30 +77,38 @@ public class LocalFileOffsetStore implements OffsetStore {
     @Override
     public void updateOffset(MessageQueue mq, long offset, boolean 
increaseOnly) {
         if (mq != null) {
-            AtomicLong offsetOld = this.offsetTable.get(mq);
+            ControllableOffset offsetOld = this.offsetTable.get(mq);
             if (null == offsetOld) {
-                offsetOld = this.offsetTable.putIfAbsent(mq, new 
AtomicLong(offset));
+                offsetOld = this.offsetTable.putIfAbsent(mq, new 
ControllableOffset(offset));
             }
 
             if (null != offsetOld) {
                 if (increaseOnly) {
-                    MixAll.compareAndIncreaseOnly(offsetOld, offset);
+                    offsetOld.update(offset, true);
                 } else {
-                    offsetOld.set(offset);
+                    offsetOld.update(offset);
                 }
             }
         }
     }
 
+    @Override
+    public void updateAndFreezeOffset(MessageQueue mq, long offset) {
+        if (mq != null) {
+            this.offsetTable.computeIfAbsent(mq, k -> new 
ControllableOffset(offset))
+                .updateAndFreeze(offset);
+        }
+    }
+
     @Override
     public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
         if (mq != null) {
             switch (type) {
                 case MEMORY_FIRST_THEN_STORE:
                 case READ_FROM_MEMORY: {
-                    AtomicLong offset = this.offsetTable.get(mq);
+                    ControllableOffset offset = this.offsetTable.get(mq);
                     if (offset != null) {
-                        return offset.get();
+                        return offset.getOffset();
                     } else if (ReadOffsetType.READ_FROM_MEMORY == type) {
                         return -1;
                     }
@@ -135,9 +142,9 @@ public class LocalFileOffsetStore implements OffsetStore {
             return;
 
         OffsetSerializeWrapper offsetSerializeWrapper = new 
OffsetSerializeWrapper();
-        for (Map.Entry<MessageQueue, AtomicLong> entry : 
this.offsetTable.entrySet()) {
+        for (Map.Entry<MessageQueue, ControllableOffset> entry : 
this.offsetTable.entrySet()) {
             if (mqs.contains(entry.getKey())) {
-                AtomicLong offset = entry.getValue();
+                AtomicLong offset = new 
AtomicLong(entry.getValue().getOffset());
                 offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), 
offset);
             }
         }
@@ -170,12 +177,12 @@ public class LocalFileOffsetStore implements OffsetStore {
     @Override
     public Map<MessageQueue, Long> cloneOffsetTable(String topic) {
         Map<MessageQueue, Long> cloneOffsetTable = new 
HashMap<>(this.offsetTable.size(), 1);
-        for (Map.Entry<MessageQueue, AtomicLong> entry : 
this.offsetTable.entrySet()) {
+        for (Map.Entry<MessageQueue, ControllableOffset> entry : 
this.offsetTable.entrySet()) {
             MessageQueue mq = entry.getKey();
             if (!UtilAll.isBlank(topic) && !topic.equals(mq.getTopic())) {
                 continue;
             }
-            cloneOffsetTable.put(mq, entry.getValue().get());
+            cloneOffsetTable.put(mq, entry.getValue().getOffset());
 
         }
         return cloneOffsetTable;
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java
index 9deed0e3df..ecceedee17 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java
@@ -37,6 +37,14 @@ public interface OffsetStore {
      */
     void updateOffset(final MessageQueue mq, final long offset, final boolean 
increaseOnly);
 
+    /**
+     * Update and freeze the message queue to prevent concurrent update action
+     *
+     * @param mq target message queue
+     * @param offset expect update offset
+     */
+    void updateAndFreezeOffset(final MessageQueue mq, final long offset);
+
     /**
      * Get offset from local storage
      *
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
index 900e822114..83d5061adb 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
@@ -22,7 +22,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.exception.OffsetNotFoundException;
@@ -31,11 +30,11 @@ import 
org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import 
org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 
 /**
  * Remote storage implementation
@@ -44,7 +43,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
     private final static Logger log = 
LoggerFactory.getLogger(RemoteBrokerOffsetStore.class);
     private final MQClientInstance mQClientFactory;
     private final String groupName;
-    private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
+    private ConcurrentMap<MessageQueue, ControllableOffset> offsetTable =
         new ConcurrentHashMap<>();
 
     public RemoteBrokerOffsetStore(MQClientInstance mQClientFactory, String 
groupName) {
@@ -59,30 +58,38 @@ public class RemoteBrokerOffsetStore implements OffsetStore 
{
     @Override
     public void updateOffset(MessageQueue mq, long offset, boolean 
increaseOnly) {
         if (mq != null) {
-            AtomicLong offsetOld = this.offsetTable.get(mq);
+            ControllableOffset offsetOld = this.offsetTable.get(mq);
             if (null == offsetOld) {
-                offsetOld = this.offsetTable.putIfAbsent(mq, new 
AtomicLong(offset));
+                offsetOld = this.offsetTable.putIfAbsent(mq, new 
ControllableOffset(offset));
             }
 
             if (null != offsetOld) {
                 if (increaseOnly) {
-                    MixAll.compareAndIncreaseOnly(offsetOld, offset);
+                    offsetOld.update(offset, true);
                 } else {
-                    offsetOld.set(offset);
+                    offsetOld.update(offset);
                 }
             }
         }
     }
 
+    @Override
+    public void updateAndFreezeOffset(MessageQueue mq, long offset) {
+        if (mq != null) {
+            this.offsetTable.computeIfAbsent(mq, k -> new 
ControllableOffset(offset))
+                .updateAndFreeze(offset);
+        }
+    }
+
     @Override
     public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
         if (mq != null) {
             switch (type) {
                 case MEMORY_FIRST_THEN_STORE:
                 case READ_FROM_MEMORY: {
-                    AtomicLong offset = this.offsetTable.get(mq);
+                    ControllableOffset offset = this.offsetTable.get(mq);
                     if (offset != null) {
-                        return offset.get();
+                        return offset.getOffset();
                     } else if (ReadOffsetType.READ_FROM_MEMORY == type) {
                         return -1;
                     }
@@ -118,18 +125,18 @@ public class RemoteBrokerOffsetStore implements 
OffsetStore {
 
         final HashSet<MessageQueue> unusedMQ = new HashSet<>();
 
-        for (Map.Entry<MessageQueue, AtomicLong> entry : 
this.offsetTable.entrySet()) {
+        for (Map.Entry<MessageQueue, ControllableOffset> entry : 
this.offsetTable.entrySet()) {
             MessageQueue mq = entry.getKey();
-            AtomicLong offset = entry.getValue();
+            ControllableOffset offset = entry.getValue();
             if (offset != null) {
                 if (mqs.contains(mq)) {
                     try {
-                        this.updateConsumeOffsetToBroker(mq, offset.get());
+                        this.updateConsumeOffsetToBroker(mq, 
offset.getOffset());
                         log.info("[persistAll] Group: {} ClientId: {} 
updateConsumeOffsetToBroker {} {}",
                             this.groupName,
                             this.mQClientFactory.getClientId(),
                             mq,
-                            offset.get());
+                            offset.getOffset());
                     } catch (Exception e) {
                         log.error("updateConsumeOffsetToBroker exception, " + 
mq.toString(), e);
                     }
@@ -149,15 +156,15 @@ public class RemoteBrokerOffsetStore implements 
OffsetStore {
 
     @Override
     public void persist(MessageQueue mq) {
-        AtomicLong offset = this.offsetTable.get(mq);
+        ControllableOffset offset = this.offsetTable.get(mq);
         if (offset != null) {
             try {
-                this.updateConsumeOffsetToBroker(mq, offset.get());
+                this.updateConsumeOffsetToBroker(mq, offset.getOffset());
                 log.info("[persist] Group: {} ClientId: {} 
updateConsumeOffsetToBroker {} {}",
                     this.groupName,
                     this.mQClientFactory.getClientId(),
                     mq,
-                    offset.get());
+                    offset.getOffset());
             } catch (Exception e) {
                 log.error("updateConsumeOffsetToBroker exception, " + 
mq.toString(), e);
             }
@@ -175,12 +182,12 @@ public class RemoteBrokerOffsetStore implements 
OffsetStore {
     @Override
     public Map<MessageQueue, Long> cloneOffsetTable(String topic) {
         Map<MessageQueue, Long> cloneOffsetTable = new 
HashMap<>(this.offsetTable.size(), 1);
-        for (Map.Entry<MessageQueue, AtomicLong> entry : 
this.offsetTable.entrySet()) {
+        for (Map.Entry<MessageQueue, ControllableOffset> entry : 
this.offsetTable.entrySet()) {
             MessageQueue mq = entry.getKey();
             if (!UtilAll.isBlank(topic) && !topic.equals(mq.getTopic())) {
                 continue;
             }
-            cloneOffsetTable.put(mq, entry.getValue().get());
+            cloneOffsetTable.put(mq, entry.getValue().getOffset());
         }
         return cloneOffsetTable;
     }
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index cfb89b5c88..d2a362ba56 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -404,16 +404,17 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
                             
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
 
                             pullRequest.getProcessQueue().setDropped(true);
-                            
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
+                            DefaultMQPushConsumerImpl.this.executeTask(new 
Runnable() {
 
                                 @Override
                                 public void run() {
                                     try {
-                                        
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
-                                            pullRequest.getNextOffset(), 
false);
+                                        
DefaultMQPushConsumerImpl.this.offsetStore.updateAndFreezeOffset(pullRequest.getMessageQueue(),
+                                            pullRequest.getNextOffset());
 
                                         
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
 
+                                        // removeProcessQueue will also remove 
offset to cancel the frozen status.
                                         
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
 
                                         log.warn("fix the pull request offset, 
{}", pullRequest);
@@ -421,7 +422,7 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
                                         log.error("executeTaskLater 
Exception", e);
                                     }
                                 }
-                            }, 10000);
+                            });
                             break;
                         default:
                             break;
@@ -705,6 +706,10 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
         this.mQClientFactory.getPullMessageService().executeTaskLater(r, 
timeDelay);
     }
 
+    public void executeTask(final Runnable r) {
+        this.mQClientFactory.getPullMessageService().executeTask(r);
+    }
+
     public QueryResult queryMessage(String topic, String key, int maxNum, long 
begin, long end)
         throws MQClientException, InterruptedException {
         return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, 
maxNum, begin, end);
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
index b5e6f9f790..ec6ede6bde 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
@@ -90,6 +90,14 @@ public class PullMessageService extends ServiceThread {
         }
     }
 
+    public void executeTask(final Runnable r) {
+        if (!isStopped()) {
+            this.scheduledExecutorService.execute(r);
+        } else {
+            logger.warn("PullMessageServiceScheduledThread has shutdown");
+        }
+    }
+
     public ScheduledExecutorService getScheduledExecutorService() {
         return scheduledExecutorService;
     }
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
index 33ea2b04b8..ba6911e3e8 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
@@ -81,6 +81,38 @@ public class RemoteBrokerOffsetStoreTest {
         assertThat(offsetStore.readOffset(messageQueue, 
ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1023);
     }
 
+    @Test
+    public void testUpdateAndFreezeOffset() throws Exception {
+        OffsetStore offsetStore = new RemoteBrokerOffsetStore(mQClientFactory, 
group);
+        MessageQueue messageQueue = new MessageQueue(topic, brokerName, 1);
+
+        offsetStore.updateAndFreezeOffset(messageQueue, 1024);
+        assertThat(offsetStore.readOffset(messageQueue, 
ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1024);
+
+        offsetStore.updateOffset(messageQueue, 1023, false);
+        assertThat(offsetStore.readOffset(messageQueue, 
ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1024);
+
+        offsetStore.updateOffset(messageQueue, 1022, true);
+        assertThat(offsetStore.readOffset(messageQueue, 
ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1024);
+    }
+
+    @Test
+    public void testUpdateAndFreezeOffsetWithRemove() throws Exception {
+        OffsetStore offsetStore = new RemoteBrokerOffsetStore(mQClientFactory, 
group);
+        MessageQueue messageQueue = new MessageQueue(topic, brokerName, 1);
+
+        offsetStore.updateAndFreezeOffset(messageQueue, 1024);
+        assertThat(offsetStore.readOffset(messageQueue, 
ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1024);
+
+        offsetStore.updateOffset(messageQueue, 1023, false);
+        assertThat(offsetStore.readOffset(messageQueue, 
ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1024);
+
+        offsetStore.removeOffset(messageQueue);
+        assertThat(offsetStore.readOffset(messageQueue, 
ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(-1);
+        offsetStore.updateOffset(messageQueue, 1023, false);
+        assertThat(offsetStore.readOffset(messageQueue, 
ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1023);
+    }
+
     @Test
     public void testReadOffset_WithException() throws Exception {
         OffsetStore offsetStore = new RemoteBrokerOffsetStore(mQClientFactory, 
group);

Reply via email to