This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new c4227b235 [ISSUE #5348] [RIP-48] Support server-side offset management 
in broadcast consumption mode (#5349)
c4227b235 is described below

commit c4227b23575747edeac690f9e964dadfbd428fc2
Author: lizhimins <[email protected]>
AuthorDate: Fri Oct 21 16:24:12 2022 +0800

    [ISSUE #5348] [RIP-48] Support server-side offset management in broadcast 
consumption mode (#5349)
    
    * Support server-side offset management in broadcast consumption mode
    * Fix unit test npe and and offset store test
    * Fix fast encode decode test
    
    Co-authored-by: 斜阳 <[email protected]>
---
 .../apache/rocketmq/broker/BrokerController.java   |  15 ++
 .../broker/offset/BroadcastOffsetManager.java      | 242 +++++++++++++++++++++
 .../broker/offset/BroadcastOffsetStore.java        |  55 +++++
 .../processor/DefaultPullMessageResultHandler.java |   4 +
 .../broker/processor/PullMessageProcessor.java     |  85 +++++++-
 .../broker/offset/BroadcastOffsetManagerTest.java  | 163 ++++++++++++++
 .../broker/offset/BroadcastOffsetStoreTest.java    |  31 +++
 .../broker/processor/PullMessageProcessorTest.java |  22 ++
 .../org/apache/rocketmq/common/BrokerConfig.java   |  30 +++
 .../rocketmq/common/protocol/RequestSource.java    |  40 ++++
 .../protocol/header/PullMessageRequestHeader.java  |  41 ++++
 11 files changed, 726 insertions(+), 2 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 717a08021..657234e26 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -66,6 +66,7 @@ import 
org.apache.rocketmq.broker.longpolling.PullRequestHoldService;
 import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
 import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
+import org.apache.rocketmq.broker.offset.BroadcastOffsetManager;
 import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
 import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
 import org.apache.rocketmq.broker.offset.LmqConsumerOffsetManager;
@@ -168,6 +169,7 @@ public class BrokerController {
     private final NettyClientConfig nettyClientConfig;
     protected final MessageStoreConfig messageStoreConfig;
     protected final ConsumerOffsetManager consumerOffsetManager;
+    protected final BroadcastOffsetManager broadcastOffsetManager;
     protected final ConsumerManager consumerManager;
     protected final ConsumerFilterManager consumerFilterManager;
     protected final ConsumerOrderInfoManager consumerOrderInfoManager;
@@ -296,6 +298,7 @@ public class BrokerController {
         this.setStoreHost(new 
InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), getListenPort()));
         this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new 
LmqBrokerStatsManager(this.brokerConfig.getBrokerClusterName(), 
this.brokerConfig.isEnableDetailStat()) : new 
BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), 
this.brokerConfig.isEnableDetailStat());
         this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new 
LmqConsumerOffsetManager(this) : new ConsumerOffsetManager(this);
+        this.broadcastOffsetManager = new BroadcastOffsetManager(this);
         this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new 
LmqTopicConfigManager(this) : new TopicConfigManager(this);
         this.topicQueueMappingManager = new TopicQueueMappingManager(this);
         this.pullMessageProcessor = new PullMessageProcessor(this);
@@ -1170,6 +1173,10 @@ public class BrokerController {
         return consumerOffsetManager;
     }
 
+    public BroadcastOffsetManager getBroadcastOffsetManager() {
+        return broadcastOffsetManager;
+    }
+
     public MessageStoreConfig getMessageStoreConfig() {
         return messageStoreConfig;
     }
@@ -1277,6 +1284,10 @@ public class BrokerController {
             this.fileWatchService.shutdown();
         }
 
+        if (this.broadcastOffsetManager != null) {
+            this.broadcastOffsetManager.shutdown();
+        }
+
         if (this.messageStore != null) {
             this.messageStore.shutdown();
         }
@@ -1503,6 +1514,10 @@ public class BrokerController {
             this.brokerFastFailure.start();
         }
 
+        if (this.broadcastOffsetManager != null) {
+            this.broadcastOffsetManager.start();
+        }
+
         if (this.escapeBridge != null) {
             this.escapeBridge.start();
         }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/BroadcastOffsetManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/BroadcastOffsetManager.java
new file mode 100644
index 000000000..16e70eed2
--- /dev/null
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/BroadcastOffsetManager.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.offset;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.ServiceThread;
+
+/**
+ * manage the offset of broadcast.
+ * now, use this to support switch remoting client between proxy and broker
+ */
+public class BroadcastOffsetManager extends ServiceThread {
+    private static final String TOPIC_GROUP_SEPARATOR = "@";
+    private final BrokerController brokerController;
+    private final BrokerConfig brokerConfig;
+
+    /**
+     * k: topic@groupId
+     * v: the pull offset of all client of all queue
+     */
+    protected final ConcurrentHashMap<String /* topic@groupId */, 
BroadcastOffsetData> offsetStoreMap =
+        new ConcurrentHashMap<>();
+
+    public BroadcastOffsetManager(BrokerController brokerController) {
+        this.brokerController = brokerController;
+        this.brokerConfig = brokerController.getBrokerConfig();
+    }
+
+    public void updateOffset(String topic, String group, int queueId, long 
offset, String clientId, boolean fromProxy) {
+        BroadcastOffsetData broadcastOffsetData = 
offsetStoreMap.computeIfAbsent(
+            buildKey(topic, group), key -> new BroadcastOffsetData(topic, 
group));
+
+        broadcastOffsetData.clientOffsetStore.compute(clientId, (clientIdKey, 
broadcastTimedOffsetStore) -> {
+            if (broadcastTimedOffsetStore == null) {
+                broadcastTimedOffsetStore = new 
BroadcastTimedOffsetStore(fromProxy);
+            }
+
+            broadcastTimedOffsetStore.timestamp = System.currentTimeMillis();
+            broadcastTimedOffsetStore.fromProxy = fromProxy;
+            broadcastTimedOffsetStore.offsetStore.updateOffset(queueId, 
offset, true);
+            return broadcastTimedOffsetStore;
+        });
+    }
+
+    /**
+     * the time need init offset
+     * 1. client connect to proxy -> client connect to broker
+     * 2. client connect to broker -> client connect to proxy
+     * 3. client connect to proxy at the first time
+     *
+     * @return -1 means no init offset, use the queueOffset in 
pullRequestHeader
+     */
+    public Long queryInitOffset(String topic, String groupId, int queueId, 
String clientId, long requestOffset,
+        boolean fromProxy) {
+
+        BroadcastOffsetData broadcastOffsetData = 
offsetStoreMap.get(buildKey(topic, groupId));
+        if (broadcastOffsetData == null) {
+            if (fromProxy && requestOffset < 0) {
+                return getOffset(null, topic, groupId, queueId);
+            } else {
+                return -1L;
+            }
+        }
+
+        final AtomicLong offset = new AtomicLong(-1L);
+        broadcastOffsetData.clientOffsetStore.compute(clientId, (clientIdK, 
offsetStore) -> {
+            if (offsetStore == null) {
+                offsetStore = new BroadcastTimedOffsetStore(fromProxy);
+            }
+
+            if (offsetStore.fromProxy && requestOffset < 0) {
+                // when from proxy and requestOffset is -1
+                // means proxy need a init offset to pull message
+                offset.set(getOffset(offsetStore, topic, groupId, queueId));
+                return offsetStore;
+            }
+
+            if (offsetStore.fromProxy == fromProxy) {
+                return offsetStore;
+            }
+
+            offset.set(getOffset(offsetStore, topic, groupId, queueId));
+            return offsetStore;
+        });
+        return offset.get();
+    }
+
+    private long getOffset(BroadcastTimedOffsetStore offsetStore, String 
topic, String groupId, int queueId) {
+        long storeOffset = -1;
+        if (offsetStore != null) {
+            storeOffset = offsetStore.offsetStore.readOffset(queueId);
+        }
+        if (storeOffset < 0) {
+            storeOffset =
+                
brokerController.getConsumerOffsetManager().queryOffset(broadcastGroupId(groupId),
 topic, queueId);
+        }
+        if (storeOffset < 0) {
+            if 
(!this.brokerController.getMessageStore().checkInDiskByConsumeOffset(topic, 
queueId, 0)) {
+                storeOffset = 0;
+            } else {
+                storeOffset = 
brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId, true);
+            }
+        }
+        return storeOffset;
+    }
+
+    /**
+     * 1. scan expire offset
+     * 2. calculate the min offset of all client of one topic@group,
+     * and then commit consumer offset by group@broadcast
+     */
+    protected void scanOffsetData() {
+        for (String k : offsetStoreMap.keySet()) {
+            BroadcastOffsetData broadcastOffsetData = offsetStoreMap.get(k);
+            if (broadcastOffsetData == null) {
+                continue;
+            }
+
+            Map<Integer, Long> queueMinOffset = new HashMap<>();
+
+            for (String clientId : 
broadcastOffsetData.clientOffsetStore.keySet()) {
+                broadcastOffsetData.clientOffsetStore
+                    .computeIfPresent(clientId, (clientIdKey, 
broadcastTimedOffsetStore) -> {
+                        long interval = System.currentTimeMillis() - 
broadcastTimedOffsetStore.timestamp;
+                        boolean clientIsOnline = 
brokerController.getConsumerManager().findChannel(broadcastOffsetData.group, 
clientId) != null;
+                        if (clientIsOnline || interval < 
Duration.ofSeconds(brokerConfig.getBroadcastOffsetExpireSecond()).toMillis()) {
+                            Set<Integer> queueSet = 
broadcastTimedOffsetStore.offsetStore.queueList();
+                            for (Integer queue : queueSet) {
+                                long offset = 
broadcastTimedOffsetStore.offsetStore.readOffset(queue);
+                                offset = 
Math.min(queueMinOffset.getOrDefault(queue, offset), offset);
+                                queueMinOffset.put(queue, offset);
+                            }
+                        }
+                        if (clientIsOnline && interval >= 
Duration.ofSeconds(brokerConfig.getBroadcastOffsetExpireMaxSecond()).toMillis())
 {
+                            return null;
+                        }
+                        if (!clientIsOnline && interval >= 
Duration.ofSeconds(brokerConfig.getBroadcastOffsetExpireSecond()).toMillis()) {
+                            return null;
+                        }
+                        return broadcastTimedOffsetStore;
+                    });
+            }
+
+            offsetStoreMap.computeIfPresent(k, (key, broadcastOffsetDataVal) 
-> {
+                if (broadcastOffsetDataVal.clientOffsetStore.isEmpty()) {
+                    return null;
+                }
+                return broadcastOffsetDataVal;
+            });
+
+            queueMinOffset.forEach((queueId, offset) ->
+                
this.brokerController.getConsumerOffsetManager().commitOffset("BroadcastOffset",
+                broadcastGroupId(broadcastOffsetData.group), 
broadcastOffsetData.topic, queueId, offset));
+        }
+    }
+
+    private String buildKey(String topic, String group) {
+        return topic + TOPIC_GROUP_SEPARATOR + group;
+    }
+
+    /**
+     * @param group group of users
+     * @return the groupId used to commit offset
+     */
+    private static String broadcastGroupId(String group) {
+        return group + TOPIC_GROUP_SEPARATOR + "broadcast";
+    }
+
+    @Override
+    public String getServiceName() {
+        return "BroadcastOffsetManager";
+    }
+
+    @Override
+    public void run() {
+        while (!this.isStopped()) {
+            this.waitForRunning(Duration.ofSeconds(5).toMillis());
+        }
+    }
+
+    @Override
+    protected void onWaitEnd() {
+        this.scanOffsetData();
+    }
+
+    public static class BroadcastOffsetData {
+        private final String topic;
+        private final String group;
+        private final ConcurrentHashMap<String /* clientId */, 
BroadcastTimedOffsetStore> clientOffsetStore;
+
+        public BroadcastOffsetData(String topic, String group) {
+            this.topic = topic;
+            this.group = group;
+            this.clientOffsetStore = new ConcurrentHashMap<>();
+        }
+    }
+
+    public static class BroadcastTimedOffsetStore {
+
+        /**
+         * the timeStamp of last update occurred
+         */
+        private volatile long timestamp;
+
+        /**
+         * mark the offset of this client is updated by proxy or not
+         */
+        private volatile boolean fromProxy;
+
+        /**
+         * the pulled offset of each queue
+         */
+        private final BroadcastOffsetStore offsetStore;
+
+        public BroadcastTimedOffsetStore(boolean fromProxy) {
+            this.timestamp = System.currentTimeMillis();
+            this.fromProxy = fromProxy;
+            this.offsetStore = new BroadcastOffsetStore();
+        }
+    }
+}
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/BroadcastOffsetStore.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/BroadcastOffsetStore.java
new file mode 100644
index 000000000..3770e576a
--- /dev/null
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/BroadcastOffsetStore.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.offset;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.common.MixAll;
+
+public class BroadcastOffsetStore {
+
+    private final ConcurrentMap<Integer, AtomicLong> offsetTable = new 
ConcurrentHashMap<>();
+
+    public void updateOffset(int queueId, long offset, boolean increaseOnly) {
+        AtomicLong offsetOld = this.offsetTable.get(queueId);
+        if (null == offsetOld) {
+            offsetOld = this.offsetTable.putIfAbsent(queueId, new 
AtomicLong(offset));
+        }
+
+        if (null != offsetOld) {
+            if (increaseOnly) {
+                MixAll.compareAndIncreaseOnly(offsetOld, offset);
+            } else {
+                offsetOld.set(offset);
+            }
+        }
+    }
+
+    public long readOffset(int queueId) {
+        AtomicLong offset = this.offsetTable.get(queueId);
+        if (offset != null) {
+            return offset.get();
+        }
+        return -1L;
+    }
+
+    public Set<Integer> queueList() {
+        return offsetTable.keySet();
+    }
+}
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
index ac6fa88bc..2d15139d4 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
@@ -71,6 +71,10 @@ public class DefaultPullMessageResultHandler implements 
PullMessageResultHandler
                                   final MessageFilter messageFilter,
                                   RemotingCommand response) {
 
+        PullMessageProcessor processor = 
brokerController.getPullMessageProcessor();
+        processor.updateBroadcastPulledOffset(requestHeader.getTopic(), 
requestHeader.getConsumerGroup(),
+            requestHeader.getQueueId(), requestHeader, channel, response, 
getMessageResult.getNextBeginOffset());
+
         final PullMessageResponseHeader responseHeader = 
(PullMessageResponseHeader) response.readCustomHeader();
 
         switch (response.getCode()) {
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 700ce55d7..e3a818953 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -22,7 +22,9 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
+import java.util.Objects;
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
 import org.apache.rocketmq.broker.filter.ConsumerFilterData;
 import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
@@ -42,9 +44,11 @@ import org.apache.rocketmq.common.help.FAQUrl;
 import org.apache.rocketmq.common.protocol.ForbiddenType;
 import org.apache.rocketmq.common.protocol.NamespaceUtil;
 import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.RequestSource;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.rpc.RpcClientUtils;
@@ -480,8 +484,15 @@ public class PullMessageProcessor implements 
NettyRequestProcessor {
             
getMessageResult.setMaxOffset(messageStore.getMaxOffsetInQueue(topic, queueId));
             getMessageResult.setSuggestPullingFromSlave(false);
         } else {
-            getMessageResult = messageStore.getMessage(
-                group, topic, queueId, requestHeader.getQueueOffset(), 
requestHeader.getMaxMsgNums(), messageFilter);
+            long broadcastInitOffset = queryBroadcastPullInitOffset(topic, 
group, queueId, requestHeader, channel);
+            if (broadcastInitOffset >= 0) {
+                getMessageResult = new GetMessageResult();
+                getMessageResult.setStatus(GetMessageStatus.OFFSET_RESET);
+                getMessageResult.setNextBeginOffset(broadcastInitOffset);
+            } else {
+                getMessageResult = messageStore.getMessage(
+                    group, topic, queueId, requestHeader.getQueueOffset(), 
requestHeader.getMaxMsgNums(), messageFilter);
+            }
         }
 
         if (getMessageResult != null) {
@@ -736,4 +747,74 @@ public class PullMessageProcessor implements 
NettyRequestProcessor {
     public void setPullMessageResultHandler(PullMessageResultHandler 
pullMessageResultHandler) {
         this.pullMessageResultHandler = pullMessageResultHandler;
     }
+
+    private boolean isBroadcast(boolean proxyPullBroadcast, ConsumerGroupInfo 
consumerGroupInfo) {
+        return proxyPullBroadcast ||
+            consumerGroupInfo != null
+                && 
MessageModel.BROADCASTING.equals(consumerGroupInfo.getMessageModel())
+                && 
ConsumeType.CONSUME_PASSIVELY.equals(consumerGroupInfo.getConsumeType());
+    }
+
+    protected void updateBroadcastPulledOffset(String topic, String group, int 
queueId,
+        PullMessageRequestHeader requestHeader, Channel channel, 
RemotingCommand response, long nextBeginOffset) {
+
+        if (response == null || 
!this.brokerController.getBrokerConfig().isEnableBroadcastOffsetStore()) {
+            return;
+        }
+
+        boolean proxyPullBroadcast = Objects.equals(
+            RequestSource.PROXY_FOR_BROADCAST.getValue(), 
requestHeader.getRequestSource());
+        ConsumerGroupInfo consumerGroupInfo = 
this.brokerController.getConsumerManager().getConsumerGroupInfo(group);
+
+        if (isBroadcast(proxyPullBroadcast, consumerGroupInfo)) {
+            long offset = requestHeader.getQueueOffset();
+            if (ResponseCode.PULL_OFFSET_MOVED == response.getCode()) {
+                offset = nextBeginOffset;
+            }
+            String clientId;
+            if (proxyPullBroadcast) {
+                clientId = requestHeader.getProxyFrowardClientId();
+            } else {
+                ClientChannelInfo clientChannelInfo = 
consumerGroupInfo.findChannel(channel);
+                if (clientChannelInfo == null) {
+                    return;
+                }
+                clientId = clientChannelInfo.getClientId();
+            }
+            this.brokerController.getBroadcastOffsetManager()
+                .updateOffset(topic, group, queueId, offset, clientId, 
proxyPullBroadcast);
+        }
+    }
+
+    /**
+     * When pull request is not broadcast or not return -1
+     */
+    protected long queryBroadcastPullInitOffset(String topic, String group, 
int queueId,
+        PullMessageRequestHeader requestHeader, Channel channel) {
+
+        if 
(!this.brokerController.getBrokerConfig().isEnableBroadcastOffsetStore()) {
+            return -1L;
+        }
+
+        ConsumerGroupInfo consumerGroupInfo = 
this.brokerController.getConsumerManager().getConsumerGroupInfo(group);
+        boolean proxyPullBroadcast = Objects.equals(
+            RequestSource.PROXY_FOR_BROADCAST.getValue(), 
requestHeader.getRequestSource());
+
+        if (isBroadcast(proxyPullBroadcast, consumerGroupInfo)) {
+            String clientId;
+            if (proxyPullBroadcast) {
+                clientId = requestHeader.getProxyFrowardClientId();
+            } else {
+                ClientChannelInfo clientChannelInfo = 
consumerGroupInfo.findChannel(channel);
+                if (clientChannelInfo == null) {
+                    return -1;
+                }
+                clientId = clientChannelInfo.getClientId();
+            }
+
+            return this.brokerController.getBroadcastOffsetManager()
+                .queryInitOffset(topic, group, queueId, clientId, 
requestHeader.getQueueOffset(), proxyPullBroadcast);
+        }
+        return -1L;
+    }
 }
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/offset/BroadcastOffsetManagerTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/offset/BroadcastOffsetManagerTest.java
new file mode 100644
index 000000000..9dc00f9d6
--- /dev/null
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/offset/BroadcastOffsetManagerTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.offset;
+
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.client.ConsumerManager;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.store.MessageStore;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+import static org.awaitility.Awaitility.await;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class BroadcastOffsetManagerTest {
+
+    private final AtomicLong maxOffset = new AtomicLong(10L);
+    private final AtomicLong commitOffset = new AtomicLong(-1);
+
+    private final ConsumerOffsetManager consumerOffsetManager = 
mock(ConsumerOffsetManager.class);
+    private final ConsumerManager consumerManager = 
mock(ConsumerManager.class);
+    private final BrokerConfig brokerConfig = new BrokerConfig();
+    private final Set<String> onlineClientIdSet = new HashSet<>();
+    private BroadcastOffsetManager broadcastOffsetManager;
+
+    @Before
+    public void before() {
+        brokerConfig.setEnableBroadcastOffsetStore(true);
+        brokerConfig.setBroadcastOffsetExpireSecond(1);
+        brokerConfig.setBroadcastOffsetExpireMaxSecond(5);
+        BrokerController brokerController = mock(BrokerController.class);
+        when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
+
+        
when(brokerController.getConsumerManager()).thenReturn(consumerManager);
+        doAnswer((Answer<ClientChannelInfo>) mock -> {
+            String clientId = mock.getArgument(1);
+            if (onlineClientIdSet.contains(clientId)) {
+                return new ClientChannelInfo(null);
+            }
+            return null;
+        }).when(consumerManager).findChannel(anyString(), anyString());
+
+        doAnswer((Answer<Long>) mock -> commitOffset.get())
+            .when(consumerOffsetManager).queryOffset(anyString(), anyString(), 
anyInt());
+        doAnswer((Answer<Void>) mock -> {
+            commitOffset.set(mock.getArgument(4));
+            return null;
+        }).when(consumerOffsetManager).commitOffset(anyString(), anyString(), 
anyString(), anyInt(), anyLong());
+        
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
+
+        MessageStore messageStore = mock(MessageStore.class);
+        doAnswer((Answer<Long>) mock -> maxOffset.get())
+            .when(messageStore).getMaxOffsetInQueue(anyString(), anyInt(), 
anyBoolean());
+        when(brokerController.getMessageStore()).thenReturn(messageStore);
+
+        broadcastOffsetManager = new BroadcastOffsetManager(brokerController);
+    }
+
+    @Test
+    public void testBroadcastOffsetSwitch() {
+        // client1 connect to broker
+        onlineClientIdSet.add("client1");
+        long offset = broadcastOffsetManager.queryInitOffset("group", "topic", 
0, "client1", 0, false);
+        Assert.assertEquals(-1, offset);
+        broadcastOffsetManager.updateOffset("group", "topic", 0, 10, 
"client1", false);
+        offset = broadcastOffsetManager.queryInitOffset("group", "topic", 0, 
"client1", 11, false);
+        Assert.assertEquals(-1, offset);
+        broadcastOffsetManager.updateOffset("group", "topic", 0, 11, 
"client1", false);
+
+        // client1 connect to proxy
+        offset = broadcastOffsetManager.queryInitOffset("group", "topic", 0, 
"client1", -1, true);
+        Assert.assertEquals(11, offset);
+        broadcastOffsetManager.updateOffset("group", "topic", 0, 11, 
"client1", true);
+        offset = broadcastOffsetManager.queryInitOffset("group", "topic", 0, 
"client1", 11, true);
+        Assert.assertEquals(-1, offset);
+        broadcastOffsetManager.updateOffset("group", "topic", 0, 12, 
"client1", true);
+
+        broadcastOffsetManager.scanOffsetData();
+        Assert.assertEquals(12L, commitOffset.get());
+
+        // client2 connect to proxy
+        onlineClientIdSet.add("client2");
+        offset = broadcastOffsetManager.queryInitOffset("group", "topic", 0, 
"client2", -1, true);
+        Assert.assertEquals(12, offset);
+        broadcastOffsetManager.updateOffset("group", "topic", 0, 12, 
"client2", true);
+        offset = broadcastOffsetManager.queryInitOffset("group", "topic", 0, 
"client2", 11, true);
+        Assert.assertEquals(-1, offset);
+        broadcastOffsetManager.updateOffset("group", "topic", 0, 13, 
"client2", true);
+
+        broadcastOffsetManager.scanOffsetData();
+        Assert.assertEquals(12L, commitOffset.get());
+
+        // client1 connect to broker
+        offset = broadcastOffsetManager.queryInitOffset("group", "topic", 0, 
"client1", 20, false);
+        Assert.assertEquals(12, offset);
+        broadcastOffsetManager.updateOffset("group", "topic", 0, 12, 
"client1", false);
+        offset = broadcastOffsetManager.queryInitOffset("group", "topic", 0, 
"client1", 12, false);
+        Assert.assertEquals(-1, offset);
+
+        onlineClientIdSet.clear();
+
+        maxOffset.set(30L);
+
+        // client3 connect to broker
+        onlineClientIdSet.add("client3");
+        offset = broadcastOffsetManager.queryInitOffset("group", "topic", 0, 
"client3", 30, false);
+        Assert.assertEquals(-1, offset);
+        broadcastOffsetManager.updateOffset("group", "topic", 0, 30, 
"client3", false);
+
+        
await().atMost(Duration.ofSeconds(brokerConfig.getBroadcastOffsetExpireSecond() 
+ 1)).until(() -> {
+            broadcastOffsetManager.scanOffsetData();
+            return commitOffset.get() == 30L;
+        });
+    }
+
+    @Test
+    public void testBroadcastOffsetExpire() {
+        onlineClientIdSet.add("client1");
+        broadcastOffsetManager.updateOffset(
+            "group", "topic", 0, 10, "client1", false);
+        onlineClientIdSet.clear();
+
+        
await().atMost(Duration.ofSeconds(brokerConfig.getBroadcastOffsetExpireSecond() 
+ 1)).until(() -> {
+            broadcastOffsetManager.scanOffsetData();
+            return broadcastOffsetManager.offsetStoreMap.isEmpty();
+        });
+
+        onlineClientIdSet.add("client1");
+        broadcastOffsetManager.updateOffset(
+            "group", "topic", 0, 10, "client1", false);
+        
await().atMost(Duration.ofSeconds(brokerConfig.getBroadcastOffsetExpireMaxSecond()
 + 1)).until(() -> {
+            broadcastOffsetManager.scanOffsetData();
+            return broadcastOffsetManager.offsetStoreMap.isEmpty();
+        });
+    }
+}
\ No newline at end of file
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/offset/BroadcastOffsetStoreTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/offset/BroadcastOffsetStoreTest.java
new file mode 100644
index 000000000..ef830b9e9
--- /dev/null
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/offset/BroadcastOffsetStoreTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.offset;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class BroadcastOffsetStoreTest {
+
+    @Test
+    public void testBasicOffsetStore() {
+        BroadcastOffsetStore offsetStore = new BroadcastOffsetStore();
+        offsetStore.updateOffset(0, 100L, false);
+        offsetStore.updateOffset(1, 200L, false);
+        Assert.assertEquals(100L, offsetStore.readOffset(0));
+    }
+}
\ No newline at end of file
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
index e20acb0cf..2398fee87 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
@@ -18,8 +18,10 @@ package org.apache.rocketmq.broker.processor;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
+import java.lang.reflect.Method;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
 import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
@@ -42,6 +44,7 @@ import org.apache.rocketmq.store.GetMessageResult;
 import org.apache.rocketmq.store.GetMessageStatus;
 import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -206,6 +209,25 @@ public class PullMessageProcessorTest {
         assertThat(response.getCode()).isEqualTo(ResponseCode.NO_PERMISSION);
     }
 
+    @Test
+    public void testIfBroadcast() throws Exception {
+        Class<? extends PullMessageProcessor> clazz = 
pullMessageProcessor.getClass();
+        Method method = clazz.getDeclaredMethod("isBroadcast", boolean.class, 
ConsumerGroupInfo.class);
+        method.setAccessible(true);
+
+        ConsumerGroupInfo consumerGroupInfo = new ConsumerGroupInfo("GID-1",
+            ConsumeType.CONSUME_PASSIVELY, MessageModel.CLUSTERING, 
ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        Assert.assertTrue((Boolean) method.invoke(pullMessageProcessor, true, 
consumerGroupInfo));
+
+        ConsumerGroupInfo consumerGroupInfo2 = new ConsumerGroupInfo("GID-2",
+            ConsumeType.CONSUME_ACTIVELY, MessageModel.BROADCASTING, 
ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        Assert.assertFalse((Boolean) method.invoke(pullMessageProcessor, 
false, consumerGroupInfo2));
+
+        ConsumerGroupInfo consumerGroupInfo3 = new ConsumerGroupInfo("GID-3",
+            ConsumeType.CONSUME_PASSIVELY, MessageModel.BROADCASTING, 
ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        Assert.assertTrue((Boolean) method.invoke(pullMessageProcessor, false, 
consumerGroupInfo3));
+    }
+
     private RemotingCommand createPullMsgCommand(int requestCode) {
         PullMessageRequestHeader requestHeader = new 
PullMessageRequestHeader();
         requestHeader.setCommitOffset(123L);
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java 
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index f39741e26..677b87ff5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -202,6 +202,12 @@ public class BrokerConfig extends BrokerIdentity {
 
     private boolean enableNetWorkFlowControl = false;
 
+    private boolean enableBroadcastOffsetStore = true;
+
+    private long broadcastOffsetExpireSecond = 2 * 60;
+
+    private long broadcastOffsetExpireMaxSecond = 5 * 60;
+
     private int popPollingSize = 1024;
     private int popPollingMapSize = 100000;
     // 20w cost 200M heap memory.
@@ -1421,6 +1427,30 @@ public class BrokerConfig extends BrokerIdentity {
         this.useServerSideResetOffset = useServerSideResetOffset;
     }
 
+    public boolean isEnableBroadcastOffsetStore() {
+        return enableBroadcastOffsetStore;
+    }
+
+    public void setEnableBroadcastOffsetStore(boolean 
enableBroadcastOffsetStore) {
+        this.enableBroadcastOffsetStore = enableBroadcastOffsetStore;
+    }
+
+    public long getBroadcastOffsetExpireSecond() {
+        return broadcastOffsetExpireSecond;
+    }
+
+    public void setBroadcastOffsetExpireSecond(long 
broadcastOffsetExpireSecond) {
+        this.broadcastOffsetExpireSecond = broadcastOffsetExpireSecond;
+    }
+
+    public long getBroadcastOffsetExpireMaxSecond() {
+        return broadcastOffsetExpireMaxSecond;
+    }
+
+    public void setBroadcastOffsetExpireMaxSecond(long 
broadcastOffsetExpireMaxSecond) {
+        this.broadcastOffsetExpireMaxSecond = broadcastOffsetExpireMaxSecond;
+    }
+
     public MetricsExporterType getMetricsExporterType() {
         return metricsExporterType;
     }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestSource.java 
b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestSource.java
new file mode 100644
index 000000000..ebe61c2aa
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestSource.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol;
+
+public enum RequestSource {
+
+    SDK(-1),
+    PROXY_FOR_ORDER(0),
+    PROXY_FOR_BROADCAST(1),
+    PROXY_FOR_STREAM(2);
+
+    public static final String SYSTEM_PROPERTY_KEY = "rocketmq.requestSource";
+    private final int value;
+
+    RequestSource(int value) {
+        this.value = value;
+    }
+
+    public int getValue() {
+        return value;
+    }
+
+    public static boolean isValid(Integer value) {
+        return null != value && value >= -1 && value < 
RequestSource.values().length - 1;
+    }
+}
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
index 317dc5f4e..751cb8ea3 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
@@ -31,6 +31,7 @@ import org.apache.rocketmq.remoting.protocol.FastCodesHeader;
 import io.netty.buffer.ByteBuf;
 
 public class PullMessageRequestHeader extends TopicQueueRequestHeader 
implements FastCodesHeader {
+
     @CFNotNull
     private String consumerGroup;
     @CFNotNull
@@ -56,6 +57,16 @@ public class PullMessageRequestHeader extends 
TopicQueueRequestHeader implements
     @CFNullable
     private Integer maxMsgBytes;
 
+    /**
+     * mark the source of this pull request
+     */
+    private Integer requestSource;
+
+    /**
+     * the real clientId when request from proxy
+     */
+    private String proxyFrowardClientId;
+
     @Override
     public void checkFields() throws RemotingCommandException {
     }
@@ -74,6 +85,8 @@ public class PullMessageRequestHeader extends 
TopicQueueRequestHeader implements
         writeIfNotNull(out, "subVersion", subVersion);
         writeIfNotNull(out, "expressionType", expressionType);
         writeIfNotNull(out, "maxMsgBytes", maxMsgBytes);
+        writeIfNotNull(out, "requestSource", requestSource);
+        writeIfNotNull(out, "proxyFrowardClientId", proxyFrowardClientId);
         writeIfNotNull(out, "lo", lo);
         writeIfNotNull(out, "ns", ns);
         writeIfNotNull(out, "nsd", nsd);
@@ -143,6 +156,16 @@ public class PullMessageRequestHeader extends 
TopicQueueRequestHeader implements
             this.maxMsgBytes = Integer.parseInt(str);
         }
 
+        str = fields.get("requestSource");
+        if (str != null) {
+            this.requestSource = Integer.parseInt(str);
+        }
+
+        str = fields.get("proxyFrowardClientId");
+        if (str != null) {
+            this.proxyFrowardClientId = str;
+        }
+
         str = fields.get("lo");
         if (str != null) {
             this.lo = Boolean.parseBoolean(str);
@@ -269,6 +292,22 @@ public class PullMessageRequestHeader extends 
TopicQueueRequestHeader implements
         this.maxMsgBytes = maxMsgBytes;
     }
 
+    public Integer getRequestSource() {
+        return requestSource;
+    }
+
+    public void setRequestSource(Integer requestSource) {
+        this.requestSource = requestSource;
+    }
+
+    public String getProxyFrowardClientId() {
+        return proxyFrowardClientId;
+    }
+
+    public void setProxyFrowardClientId(String proxyFrowardClientId) {
+        this.proxyFrowardClientId = proxyFrowardClientId;
+    }
+
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(this)
@@ -284,6 +323,8 @@ public class PullMessageRequestHeader extends 
TopicQueueRequestHeader implements
             .add("subscription", subscription)
             .add("subVersion", subVersion)
             .add("expressionType", expressionType)
+            .add("requestSource", requestSource)
+            .add("proxyFrowardClientId", proxyFrowardClientId)
             .toString();
     }
 }


Reply via email to