This is an automated email from the ASF dual-hosted git repository. nicholasjiang pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
The following commit(s) were added to refs/heads/main by this push: new b686501 [ISSUE #95] Fix source can not consume new queue's messages when topic queue expansion (#96) b686501 is described below commit b6865013b7a54883727a2b4cd712087d6e8a6d4d Author: Humkum <1109939...@qq.com> AuthorDate: Wed Jan 10 17:29:24 2024 +0800 [ISSUE #95] Fix source can not consume new queue's messages when topic queue expansion (#96) --- .../connector/rocketmq/legacy/RocketMQSourceFunction.java | 14 +++++++++++++- .../legacy/sourceFunction/RocketMQSourceFunctionTest.java | 14 +++++++++++++- 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java index e445643..bedf97f 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java @@ -62,6 +62,7 @@ import org.slf4j.LoggerFactory; import java.lang.management.ManagementFactory; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -536,7 +537,7 @@ public class RocketMQSourceFunction<OUT> extends RichParallelSourceFunction<OUT> } } - public void initOffsetTableFromRestoredOffsets(List<MessageQueue> messageQueues) { + public void initOffsetTableFromRestoredOffsets(List<MessageQueue> messageQueues) throws MQClientException { Preconditions.checkNotNull(restoredOffsets, "restoredOffsets can't be null"); restoredOffsets.forEach( (mq, offset) -> { @@ -544,6 +545,17 @@ public class RocketMQSourceFunction<OUT> extends RichParallelSourceFunction<OUT> offsetTable.put(mq, offset); } }); + + List<MessageQueue> extMessageQueue = new ArrayList<>(); + for (MessageQueue messageQueue : messageQueues) { + if (!offsetTable.containsKey(messageQueue)) { + extMessageQueue.add(messageQueue); + } + } + if (extMessageQueue.size() != 0) { + log.info("no restoredOffsets for {}, so init offset for these queues", extMessageQueue); + initOffsets(extMessageQueue); + } log.info("init offset table [{}] from restoredOffsets successful.", offsetTable); } diff --git a/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java b/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java index cd514cd..08371b3 100644 --- a/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java +++ b/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java @@ -25,12 +25,16 @@ import org.apache.flink.connector.rocketmq.legacy.common.config.StartupMode; import org.apache.flink.connector.rocketmq.legacy.common.serialization.SimpleStringDeserializationSchema; import org.apache.flink.connector.rocketmq.legacy.common.util.TestUtils; +import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.common.message.MessageQueue; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; @@ -69,6 +73,8 @@ public class RocketMQSourceFunctionTest { @Test public void testRestartFromCheckpoint() throws Exception { + DefaultLitePullConsumer consumer = Mockito.mock(DefaultLitePullConsumer.class); + Mockito.when(consumer.committed(Mockito.any())).thenReturn(40L); Properties properties = new Properties(); properties.setProperty(RocketMQConfig.CONSUMER_GROUP, "${ConsumerGroup}"); properties.setProperty(RocketMQConfig.CONSUMER_TOPIC, "${SourceTopic}"); @@ -82,13 +88,19 @@ public class RocketMQSourceFunctionTest { map.put(new MessageQueue("tpc", "broker-0", 1), 21L); map.put(new MessageQueue("tpc", "broker-1", 0), 30L); map.put(new MessageQueue("tpc", "broker-1", 1), 31L); + List<MessageQueue> allocateMessageQueues = new ArrayList<>(map.keySet()); + MessageQueue newMessageQueue = new MessageQueue("tpc", "broker-2", 0); + allocateMessageQueues.add(newMessageQueue); + TestUtils.setFieldValue(source, "messageQueues", allocateMessageQueues); + TestUtils.setFieldValue(source, "consumer", consumer); TestUtils.setFieldValue(source, "restoredOffsets", map); TestUtils.setFieldValue(source, "offsetTable", new ConcurrentHashMap<>()); - source.initOffsetTableFromRestoredOffsets(new ArrayList<>(map.keySet())); + source.initOffsetTableFromRestoredOffsets(allocateMessageQueues); Map<MessageQueue, Long> offsetTable = (Map) TestUtils.getFieldValue(source, "offsetTable"); for (Map.Entry<MessageQueue, Long> entry : map.entrySet()) { assertEquals(offsetTable.containsKey(entry.getKey()), true); assertEquals(offsetTable.containsValue(entry.getValue()), true); } + assertEquals(offsetTable.containsKey(newMessageQueue), true); } }