This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new a376fbcdb8 [ISSUE #7634] Introduce controllableOffset to prevent unnecessary suspension when OFFSET_ILLEGAL (#7635) a376fbcdb8 is described below commit a376fbcdb82e818cfa239da677669f1118e4c40f Author: Zhouxiang Zhan <zhouxz...@apache.org> AuthorDate: Tue Dec 12 16:52:12 2023 +0800 [ISSUE #7634] Introduce controllableOffset to prevent unnecessary suspension when OFFSET_ILLEGAL (#7635) * Add controllableOffset to prevent unnecessary suspension when OFFSET_ILLEGAL --- .../client/consumer/store/ControllableOffset.java | 115 +++++++++++++++++++++ .../consumer/store/LocalFileOffsetStore.java | 33 +++--- .../client/consumer/store/OffsetStore.java | 8 ++ .../consumer/store/RemoteBrokerOffsetStore.java | 45 ++++---- .../impl/consumer/DefaultMQPushConsumerImpl.java | 13 ++- .../client/impl/consumer/PullMessageService.java | 8 ++ .../store/RemoteBrokerOffsetStoreTest.java | 32 ++++++ 7 files changed, 218 insertions(+), 36 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/ControllableOffset.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/ControllableOffset.java new file mode 100644 index 0000000000..9db4bd2e2a --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/ControllableOffset.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.consumer.store; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * The ControllableOffset class encapsulates a thread-safe offset value that can be + * updated atomically. Additionally, this class allows for the offset to be "frozen," + * which prevents further updates after the freeze operation has been performed. + * <p> + * Concurrency Scenarios: + * If {@code updateAndFreeze} is called before any {@code update} operations, it sets + * {@code allowToUpdate} to false and updates the offset to the target value specified. + * After this operation, further invocations of {@code update} will not affect the offset, + * as it is considered frozen. + * <p> + * If {@code update} is in progress while {@code updateAndFreeze} is invoked concurrently, + * the final outcome depends on the sequence of operations: + * 1. If {@code update}'s atomic update operation completes before {@code updateAndFreeze}, + * the latter will overwrite the offset and set {@code allowToUpdate} to false, + * preventing any further updates. + * 2. If {@code updateAndFreeze} executes before the {@code update} finalizes its operation, + * the ongoing {@code update} will not proceed with its changes. The {@link AtomicLong#getAndUpdate} + * method used in both operations ensures atomicity and respects the final state imposed by + * {@code updateAndFreeze}, even if the {@code update} function has already begun. + * <p> + * In essence, once the {@code updateAndFreeze} operation is executed, the offset value remains + * immutable to any subsequent {@code update} calls due to the immediate visibility of the + * {@code allowToUpdate} state change, courtesy of its volatile nature. + * <p> + * The combination of an AtomicLong for the offset value and a volatile boolean flag for update + * control provides a reliable mechanism for managing offset values in concurrent environments. + */ +public class ControllableOffset { + // Holds the current offset value in an atomic way. + private final AtomicLong value; + // Controls whether updates to the offset are allowed. + private volatile boolean allowToUpdate; + + public ControllableOffset(long value) { + this.value = new AtomicLong(value); + this.allowToUpdate = true; + } + + /** + * Attempts to update the offset to the target value. If increaseOnly is true, + * the offset will not be decreased. The update operation is atomic and thread-safe. + * The operation will respect the current allowToUpdate state, and if the offset + * has been frozen by a previous call to {@link #updateAndFreeze(long)}, + * this method will not update the offset. + * + * @param target the new target offset value. + * @param increaseOnly if true, the offset will only be updated if the target value + * is greater than the current value. + */ + public void update(long target, boolean increaseOnly) { + if (allowToUpdate) { + value.getAndUpdate(val -> { + if (allowToUpdate) { + if (increaseOnly) { + return Math.max(target, val); + } else { + return target; + } + } else { + return val; + } + }); + } + } + + /** + * Overloaded method for updating the offset value unconditionally. + * + * @param target The new target value for the offset. + */ + public void update(long target) { + update(target, false); + } + + /** + * Freezes the offset at the target value provided. Once frozen, the offset + * cannot be updated by subsequent calls to {@link #update(long, boolean)}. + * This method will set allowToUpdate to false and then update the offset, + * ensuring the new value is the final state of the offset. + * + * @param target the new target offset value to freeze at. + */ + public void updateAndFreeze(long target) { + value.getAndUpdate(val -> { + allowToUpdate = false; + return target; + }); + } + + public long getOffset() { + return value.get(); + } +} diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java index 832888dbeb..074508c46b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java @@ -47,7 +47,7 @@ public class LocalFileOffsetStore implements OffsetStore { private final MQClientInstance mQClientFactory; private final String groupName; private final String storePath; - private ConcurrentMap<MessageQueue, AtomicLong> offsetTable = + private ConcurrentMap<MessageQueue, ControllableOffset> offsetTable = new ConcurrentHashMap<>(); public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) { @@ -63,10 +63,9 @@ public class LocalFileOffsetStore implements OffsetStore { public void load() throws MQClientException { OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset(); if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) { - offsetTable.putAll(offsetSerializeWrapper.getOffsetTable()); - for (Entry<MessageQueue, AtomicLong> mqEntry : offsetSerializeWrapper.getOffsetTable().entrySet()) { AtomicLong offset = mqEntry.getValue(); + offsetTable.put(mqEntry.getKey(), new ControllableOffset(offset.get())); log.info("load consumer's offset, {} {} {}", this.groupName, mqEntry.getKey(), @@ -78,30 +77,38 @@ public class LocalFileOffsetStore implements OffsetStore { @Override public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) { if (mq != null) { - AtomicLong offsetOld = this.offsetTable.get(mq); + ControllableOffset offsetOld = this.offsetTable.get(mq); if (null == offsetOld) { - offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset)); + offsetOld = this.offsetTable.putIfAbsent(mq, new ControllableOffset(offset)); } if (null != offsetOld) { if (increaseOnly) { - MixAll.compareAndIncreaseOnly(offsetOld, offset); + offsetOld.update(offset, true); } else { - offsetOld.set(offset); + offsetOld.update(offset); } } } } + @Override + public void updateAndFreezeOffset(MessageQueue mq, long offset) { + if (mq != null) { + this.offsetTable.computeIfAbsent(mq, k -> new ControllableOffset(offset)) + .updateAndFreeze(offset); + } + } + @Override public long readOffset(final MessageQueue mq, final ReadOffsetType type) { if (mq != null) { switch (type) { case MEMORY_FIRST_THEN_STORE: case READ_FROM_MEMORY: { - AtomicLong offset = this.offsetTable.get(mq); + ControllableOffset offset = this.offsetTable.get(mq); if (offset != null) { - return offset.get(); + return offset.getOffset(); } else if (ReadOffsetType.READ_FROM_MEMORY == type) { return -1; } @@ -135,9 +142,9 @@ public class LocalFileOffsetStore implements OffsetStore { return; OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper(); - for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) { + for (Map.Entry<MessageQueue, ControllableOffset> entry : this.offsetTable.entrySet()) { if (mqs.contains(entry.getKey())) { - AtomicLong offset = entry.getValue(); + AtomicLong offset = new AtomicLong(entry.getValue().getOffset()); offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset); } } @@ -170,12 +177,12 @@ public class LocalFileOffsetStore implements OffsetStore { @Override public Map<MessageQueue, Long> cloneOffsetTable(String topic) { Map<MessageQueue, Long> cloneOffsetTable = new HashMap<>(this.offsetTable.size(), 1); - for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) { + for (Map.Entry<MessageQueue, ControllableOffset> entry : this.offsetTable.entrySet()) { MessageQueue mq = entry.getKey(); if (!UtilAll.isBlank(topic) && !topic.equals(mq.getTopic())) { continue; } - cloneOffsetTable.put(mq, entry.getValue().get()); + cloneOffsetTable.put(mq, entry.getValue().getOffset()); } return cloneOffsetTable; diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java index 9deed0e3df..ecceedee17 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java @@ -37,6 +37,14 @@ public interface OffsetStore { */ void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly); + /** + * Update and freeze the message queue to prevent concurrent update action + * + * @param mq target message queue + * @param offset expect update offset + */ + void updateAndFreezeOffset(final MessageQueue mq, final long offset); + /** * Get offset from local storage * diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java index 900e822114..83d5061adb 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java @@ -22,7 +22,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.OffsetNotFoundException; @@ -31,11 +30,11 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader; -import org.apache.rocketmq.logging.org.slf4j.Logger; -import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; /** * Remote storage implementation @@ -44,7 +43,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore { private final static Logger log = LoggerFactory.getLogger(RemoteBrokerOffsetStore.class); private final MQClientInstance mQClientFactory; private final String groupName; - private ConcurrentMap<MessageQueue, AtomicLong> offsetTable = + private ConcurrentMap<MessageQueue, ControllableOffset> offsetTable = new ConcurrentHashMap<>(); public RemoteBrokerOffsetStore(MQClientInstance mQClientFactory, String groupName) { @@ -59,30 +58,38 @@ public class RemoteBrokerOffsetStore implements OffsetStore { @Override public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) { if (mq != null) { - AtomicLong offsetOld = this.offsetTable.get(mq); + ControllableOffset offsetOld = this.offsetTable.get(mq); if (null == offsetOld) { - offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset)); + offsetOld = this.offsetTable.putIfAbsent(mq, new ControllableOffset(offset)); } if (null != offsetOld) { if (increaseOnly) { - MixAll.compareAndIncreaseOnly(offsetOld, offset); + offsetOld.update(offset, true); } else { - offsetOld.set(offset); + offsetOld.update(offset); } } } } + @Override + public void updateAndFreezeOffset(MessageQueue mq, long offset) { + if (mq != null) { + this.offsetTable.computeIfAbsent(mq, k -> new ControllableOffset(offset)) + .updateAndFreeze(offset); + } + } + @Override public long readOffset(final MessageQueue mq, final ReadOffsetType type) { if (mq != null) { switch (type) { case MEMORY_FIRST_THEN_STORE: case READ_FROM_MEMORY: { - AtomicLong offset = this.offsetTable.get(mq); + ControllableOffset offset = this.offsetTable.get(mq); if (offset != null) { - return offset.get(); + return offset.getOffset(); } else if (ReadOffsetType.READ_FROM_MEMORY == type) { return -1; } @@ -118,18 +125,18 @@ public class RemoteBrokerOffsetStore implements OffsetStore { final HashSet<MessageQueue> unusedMQ = new HashSet<>(); - for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) { + for (Map.Entry<MessageQueue, ControllableOffset> entry : this.offsetTable.entrySet()) { MessageQueue mq = entry.getKey(); - AtomicLong offset = entry.getValue(); + ControllableOffset offset = entry.getValue(); if (offset != null) { if (mqs.contains(mq)) { try { - this.updateConsumeOffsetToBroker(mq, offset.get()); + this.updateConsumeOffsetToBroker(mq, offset.getOffset()); log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}", this.groupName, this.mQClientFactory.getClientId(), mq, - offset.get()); + offset.getOffset()); } catch (Exception e) { log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e); } @@ -149,15 +156,15 @@ public class RemoteBrokerOffsetStore implements OffsetStore { @Override public void persist(MessageQueue mq) { - AtomicLong offset = this.offsetTable.get(mq); + ControllableOffset offset = this.offsetTable.get(mq); if (offset != null) { try { - this.updateConsumeOffsetToBroker(mq, offset.get()); + this.updateConsumeOffsetToBroker(mq, offset.getOffset()); log.info("[persist] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}", this.groupName, this.mQClientFactory.getClientId(), mq, - offset.get()); + offset.getOffset()); } catch (Exception e) { log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e); } @@ -175,12 +182,12 @@ public class RemoteBrokerOffsetStore implements OffsetStore { @Override public Map<MessageQueue, Long> cloneOffsetTable(String topic) { Map<MessageQueue, Long> cloneOffsetTable = new HashMap<>(this.offsetTable.size(), 1); - for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) { + for (Map.Entry<MessageQueue, ControllableOffset> entry : this.offsetTable.entrySet()) { MessageQueue mq = entry.getKey(); if (!UtilAll.isBlank(topic) && !topic.equals(mq.getTopic())) { continue; } - cloneOffsetTable.put(mq, entry.getValue().get()); + cloneOffsetTable.put(mq, entry.getValue().getOffset()); } return cloneOffsetTable; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index cfb89b5c88..d2a362ba56 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -404,16 +404,17 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { pullRequest.setNextOffset(pullResult.getNextBeginOffset()); pullRequest.getProcessQueue().setDropped(true); - DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() { + DefaultMQPushConsumerImpl.this.executeTask(new Runnable() { @Override public void run() { try { - DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(), - pullRequest.getNextOffset(), false); + DefaultMQPushConsumerImpl.this.offsetStore.updateAndFreezeOffset(pullRequest.getMessageQueue(), + pullRequest.getNextOffset()); DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue()); + // removeProcessQueue will also remove offset to cancel the frozen status. DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue()); log.warn("fix the pull request offset, {}", pullRequest); @@ -421,7 +422,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { log.error("executeTaskLater Exception", e); } } - }, 10000); + }); break; default: break; @@ -705,6 +706,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { this.mQClientFactory.getPullMessageService().executeTaskLater(r, timeDelay); } + public void executeTask(final Runnable r) { + this.mQClientFactory.getPullMessageService().executeTask(r); + } + public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws MQClientException, InterruptedException { return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java index b5e6f9f790..ec6ede6bde 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java @@ -90,6 +90,14 @@ public class PullMessageService extends ServiceThread { } } + public void executeTask(final Runnable r) { + if (!isStopped()) { + this.scheduledExecutorService.execute(r); + } else { + logger.warn("PullMessageServiceScheduledThread has shutdown"); + } + } + public ScheduledExecutorService getScheduledExecutorService() { return scheduledExecutorService; } diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java index 33ea2b04b8..ba6911e3e8 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java @@ -81,6 +81,38 @@ public class RemoteBrokerOffsetStoreTest { assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1023); } + @Test + public void testUpdateAndFreezeOffset() throws Exception { + OffsetStore offsetStore = new RemoteBrokerOffsetStore(mQClientFactory, group); + MessageQueue messageQueue = new MessageQueue(topic, brokerName, 1); + + offsetStore.updateAndFreezeOffset(messageQueue, 1024); + assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1024); + + offsetStore.updateOffset(messageQueue, 1023, false); + assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1024); + + offsetStore.updateOffset(messageQueue, 1022, true); + assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1024); + } + + @Test + public void testUpdateAndFreezeOffsetWithRemove() throws Exception { + OffsetStore offsetStore = new RemoteBrokerOffsetStore(mQClientFactory, group); + MessageQueue messageQueue = new MessageQueue(topic, brokerName, 1); + + offsetStore.updateAndFreezeOffset(messageQueue, 1024); + assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1024); + + offsetStore.updateOffset(messageQueue, 1023, false); + assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1024); + + offsetStore.removeOffset(messageQueue); + assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(-1); + offsetStore.updateOffset(messageQueue, 1023, false); + assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1023); + } + @Test public void testReadOffset_WithException() throws Exception { OffsetStore offsetStore = new RemoteBrokerOffsetStore(mQClientFactory, group);