This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git


The following commit(s) were added to refs/heads/main by this push:
     new b686501  [ISSUE #95] Fix source can not consume new queue's messages 
when topic queue expansion (#96)
b686501 is described below

commit b6865013b7a54883727a2b4cd712087d6e8a6d4d
Author: Humkum <1109939...@qq.com>
AuthorDate: Wed Jan 10 17:29:24 2024 +0800

    [ISSUE #95] Fix source can not consume new queue's messages when topic 
queue expansion (#96)
---
 .../connector/rocketmq/legacy/RocketMQSourceFunction.java  | 14 +++++++++++++-
 .../legacy/sourceFunction/RocketMQSourceFunctionTest.java  | 14 +++++++++++++-
 2 files changed, 26 insertions(+), 2 deletions(-)

diff --git 
a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java
 
b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java
index e445643..bedf97f 100644
--- 
a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java
+++ 
b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java
@@ -62,6 +62,7 @@ import org.slf4j.LoggerFactory;
 
 import java.lang.management.ManagementFactory;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -536,7 +537,7 @@ public class RocketMQSourceFunction<OUT> extends 
RichParallelSourceFunction<OUT>
         }
     }
 
-    public void initOffsetTableFromRestoredOffsets(List<MessageQueue> 
messageQueues) {
+    public void initOffsetTableFromRestoredOffsets(List<MessageQueue> 
messageQueues) throws MQClientException {
         Preconditions.checkNotNull(restoredOffsets, "restoredOffsets can't be 
null");
         restoredOffsets.forEach(
                 (mq, offset) -> {
@@ -544,6 +545,17 @@ public class RocketMQSourceFunction<OUT> extends 
RichParallelSourceFunction<OUT>
                         offsetTable.put(mq, offset);
                     }
                 });
+
+        List<MessageQueue> extMessageQueue = new ArrayList<>();
+        for (MessageQueue messageQueue : messageQueues) {
+            if (!offsetTable.containsKey(messageQueue)) {
+                extMessageQueue.add(messageQueue);
+            }
+        }
+        if (extMessageQueue.size() != 0) {
+            log.info("no restoredOffsets for {}, so init offset for these 
queues", extMessageQueue);
+            initOffsets(extMessageQueue);
+        }
         log.info("init offset table [{}] from restoredOffsets successful.", 
offsetTable);
     }
 
diff --git 
a/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java
 
b/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java
index cd514cd..08371b3 100644
--- 
a/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java
+++ 
b/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java
@@ -25,12 +25,16 @@ import 
org.apache.flink.connector.rocketmq.legacy.common.config.StartupMode;
 import 
org.apache.flink.connector.rocketmq.legacy.common.serialization.SimpleStringDeserializationSchema;
 import org.apache.flink.connector.rocketmq.legacy.common.util.TestUtils;
 
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
@@ -69,6 +73,8 @@ public class RocketMQSourceFunctionTest {
 
     @Test
     public void testRestartFromCheckpoint() throws Exception {
+        DefaultLitePullConsumer consumer = 
Mockito.mock(DefaultLitePullConsumer.class);
+        Mockito.when(consumer.committed(Mockito.any())).thenReturn(40L);
         Properties properties = new Properties();
         properties.setProperty(RocketMQConfig.CONSUMER_GROUP, 
"${ConsumerGroup}");
         properties.setProperty(RocketMQConfig.CONSUMER_TOPIC, 
"${SourceTopic}");
@@ -82,13 +88,19 @@ public class RocketMQSourceFunctionTest {
         map.put(new MessageQueue("tpc", "broker-0", 1), 21L);
         map.put(new MessageQueue("tpc", "broker-1", 0), 30L);
         map.put(new MessageQueue("tpc", "broker-1", 1), 31L);
+        List<MessageQueue> allocateMessageQueues = new 
ArrayList<>(map.keySet());
+        MessageQueue newMessageQueue = new MessageQueue("tpc", "broker-2", 0);
+        allocateMessageQueues.add(newMessageQueue);
+        TestUtils.setFieldValue(source, "messageQueues", 
allocateMessageQueues);
+        TestUtils.setFieldValue(source, "consumer", consumer);
         TestUtils.setFieldValue(source, "restoredOffsets", map);
         TestUtils.setFieldValue(source, "offsetTable", new 
ConcurrentHashMap<>());
-        source.initOffsetTableFromRestoredOffsets(new 
ArrayList<>(map.keySet()));
+        source.initOffsetTableFromRestoredOffsets(allocateMessageQueues);
         Map<MessageQueue, Long> offsetTable = (Map) 
TestUtils.getFieldValue(source, "offsetTable");
         for (Map.Entry<MessageQueue, Long> entry : map.entrySet()) {
             assertEquals(offsetTable.containsKey(entry.getKey()), true);
             assertEquals(offsetTable.containsValue(entry.getValue()), true);
         }
+        assertEquals(offsetTable.containsKey(newMessageQueue), true);
     }
 }

Reply via email to