This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git


The following commit(s) were added to refs/heads/main by this push:
     new fce246b  [RIP-74] Implement dynamic load balancing in Flink-Connector-RocketMQ (#126)
fce246b is described below

commit fce246bda5d2a17edd6d793f3b6cb54deb18872d
Author: hqbfz <125714719+3424672...@users.noreply.github.com>
AuthorDate: Thu Mar 6 17:18:02 2025 +0800

    [RIP-74] Implement dynamic load balancing in Flink-Connector-RocketMQ (#126)
---
 .../rocketmq/common/event/SourceCheckEvent.java}   |  27 +-
 .../rocketmq/common/event/SourceDetectEvent.java}  |  25 +-
 .../common/event/SourceInitAssignEvent.java}       |  25 +-
 .../common/event/SourceReportOffsetEvent.java      |  60 ++++
 .../connector/rocketmq/common/lock/SpinLock.java   |  35 ++
 .../rocketmq/legacy/RocketMQSourceFunction.java    |   3 +-
 .../rocketmq/source/InnerConsumerImpl.java         |   3 +-
 .../connector/rocketmq/source/RocketMQSource.java  |  18 +-
 .../RocketMQSourceEnumStateSerializer.java         |   3 +-
 .../enumerator/RocketMQSourceEnumerator.java       | 355 +++++++++++++++++++--
 .../enumerator/allocate/AllocateStrategy.java      |  13 +
 .../allocate/AllocateStrategyFactory.java          |   3 +
 .../allocate/AverageAllocateStrategy.java          |  43 +++
 .../allocate/BroadcastAllocateStrategy.java        |   6 +
 .../allocate/ConsistentHashAllocateStrategy.java   |   6 +
 .../metrics/RocketMQSourceReaderMetrics.java       |   2 +
 .../reader/RocketMQSourceFetcherManager.java       |   4 +
 .../source/reader/RocketMQSourceReader.java        |  99 +++++-
 .../source/reader/RocketMQSplitReader.java         | 101 ++++--
 .../split/RocketMQPartitionSplitSerializer.java    |   8 +-
 .../rocketmq/source/split/RocketMQSourceSplit.java |  63 +++-
 .../source/split/RocketMQSourceSplitState.java     |   7 +-
 .../sourceFunction/RocketMQSourceFunctionTest.java |   1 -
 .../allocate/AverageAllocateStrategyTest.java      |  97 ++++++
 .../RocketMQPartitionSplitSerializerTest.java      |  52 +++
 25 files changed, 952 insertions(+), 107 deletions(-)

diff --git a/src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java b/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceCheckEvent.java
similarity index 52%
copy from src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java
copy to src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceCheckEvent.java
index f1fa5af..70a7206 100644
--- a/src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceCheckEvent.java
@@ -16,24 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.rocketmq.source.split;
+package org.apache.flink.connector.rocketmq.common.event;
 
-import org.junit.Test;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.java.tuple.Tuple2;
 
-import java.io.IOException;
+import org.apache.rocketmq.common.message.MessageQueue;
 
-import static org.junit.Assert.assertEquals;
+import java.util.Map;
 
-/** Test for {@link RocketMQPartitionSplitSerializer}. */
-public class RocketMQPartitionSplitSerializerTest {
+public class SourceCheckEvent implements SourceEvent {
+    private Map<MessageQueue, Tuple2<Long, Long>> assignedMq;
 
-    @Test
-    public void testSerializePartitionSplit() throws IOException {
-        RocketMQPartitionSplitSerializer serializer = new RocketMQPartitionSplitSerializer();
-        RocketMQSourceSplit expected =
-                new RocketMQSourceSplit("test-split-serialization", "taobaodaily", 256, 100, 300);
-        RocketMQSourceSplit actual =
-                serializer.deserialize(serializer.getVersion(), serializer.serialize(expected));
-        assertEquals(expected, actual);
+    public Map<MessageQueue, Tuple2<Long, Long>> getAssignedMq() {
+        return assignedMq;
+    }
+
+    public void setAssignedMq(Map<MessageQueue, Tuple2<Long, Long>> assignedMq) {
+        this.assignedMq = assignedMq;
     }
 }
diff --git a/src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java b/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceDetectEvent.java
similarity index 52%
copy from src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java
copy to src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceDetectEvent.java
index f1fa5af..af8c4e3 100644
--- a/src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceDetectEvent.java
@@ -16,24 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.rocketmq.source.split;
+package org.apache.flink.connector.rocketmq.common.event;
 
-import org.junit.Test;
+import org.apache.flink.api.connector.source.SourceEvent;
 
-import java.io.IOException;
+public class SourceDetectEvent implements SourceEvent {
+    // Request to resend the initial allocation result
+    private boolean reSendInitAssign = true;
 
-import static org.junit.Assert.assertEquals;
-
-/** Test for {@link RocketMQPartitionSplitSerializer}. */
-public class RocketMQPartitionSplitSerializerTest {
+    public boolean getReSendInitAssign() {
+        return reSendInitAssign;
+    }
 
-    @Test
-    public void testSerializePartitionSplit() throws IOException {
-        RocketMQPartitionSplitSerializer serializer = new RocketMQPartitionSplitSerializer();
-        RocketMQSourceSplit expected =
-                new RocketMQSourceSplit("test-split-serialization", "taobaodaily", 256, 100, 300);
-        RocketMQSourceSplit actual =
-                serializer.deserialize(serializer.getVersion(), serializer.serialize(expected));
-        assertEquals(expected, actual);
+    public void setReSendInitAssign(boolean reSendInitAssign) {
+        this.reSendInitAssign = reSendInitAssign;
     }
 }
diff --git a/src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java b/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceInitAssignEvent.java
similarity index 52%
copy from src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java
copy to src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceInitAssignEvent.java
index f1fa5af..347ec51 100644
--- a/src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceInitAssignEvent.java
@@ -16,24 +16,21 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.rocketmq.source.split;
+package org.apache.flink.connector.rocketmq.common.event;
 
-import org.junit.Test;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplit;
 
-import java.io.IOException;
+import java.util.List;
 
-import static org.junit.Assert.assertEquals;
+public class SourceInitAssignEvent implements SourceEvent {
+    private List<RocketMQSourceSplit> splits;
 
-/** Test for {@link RocketMQPartitionSplitSerializer}. */
-public class RocketMQPartitionSplitSerializerTest {
+    public void setSplits(List<RocketMQSourceSplit> splits) {
+        this.splits = splits;
+    }
 
-    @Test
-    public void testSerializePartitionSplit() throws IOException {
-        RocketMQPartitionSplitSerializer serializer = new RocketMQPartitionSplitSerializer();
-        RocketMQSourceSplit expected =
-                new RocketMQSourceSplit("test-split-serialization", "taobaodaily", 256, 100, 300);
-        RocketMQSourceSplit actual =
-                serializer.deserialize(serializer.getVersion(), serializer.serialize(expected));
-        assertEquals(expected, actual);
+    public List<RocketMQSourceSplit> getSplits() {
+        return splits;
     }
 }
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceReportOffsetEvent.java b/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceReportOffsetEvent.java
new file mode 100644
index 0000000..d850256
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceReportOffsetEvent.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rocketmq.common.event;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+
+public class SourceReportOffsetEvent implements SourceEvent {
+    private String topic;
+    private String broker;
+    private int queueId;
+    private long checkpoint = -1;
+
+    public void setBroker(String broker) {
+        this.broker = broker;
+    }
+
+    public void setCheckpoint(long checkpoint) {
+        this.checkpoint = checkpoint;
+    }
+
+    public void setQueueId(int queueId) {
+        this.queueId = queueId;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public long getCheckpoint() {
+        return checkpoint;
+    }
+
+    public int getQueueId() {
+        return queueId;
+    }
+
+    public String getBroker() {
+        return broker;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+}
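
A minimal reader-side sketch of how this event is meant to be populated and sent; the MessageQueue accessors and SourceReaderContext#sendSourceEventToCoordinator are existing APIs, while messageQueue, currentOffset and readerContext are assumed local variables used only for illustration:

    // Report the latest consumed offset of one queue to the enumerator.
    SourceReportOffsetEvent event = new SourceReportOffsetEvent();
    event.setTopic(messageQueue.getTopic());
    event.setBroker(messageQueue.getBrokerName());
    event.setQueueId(messageQueue.getQueueId());
    // -1 is the default and means "no checkpointed offset yet"
    event.setCheckpoint(currentOffset);
    readerContext.sendSourceEventToCoordinator(event);

The enumerator rebuilds the MessageQueue from the (topic, broker, queueId) triple and records the reported offset, as shown in handleOffsetEvent later in this patch.
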
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/common/lock/SpinLock.java b/src/main/java/org/apache/flink/connector/rocketmq/common/lock/SpinLock.java
new file mode 100644
index 0000000..dbb32c2
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/rocketmq/common/lock/SpinLock.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rocketmq.common.lock;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class SpinLock {
+    private AtomicBoolean lock = new AtomicBoolean(false);
+
+    public void lock() {
+        boolean lock = false;
+        do {
+            lock = this.lock.compareAndSet(false, true);
+        } while (!lock);
+    }
+
+    public void unlock() {
+        this.lock.set(false);
+    }
+}
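
A minimal usage sketch, assuming a caller that guards shared assignment state the same way the enumerator in this patch does (acquire around the critical section, release in a finally block); the field and method names here are illustrative only:

    private final SpinLock lock = new SpinLock();
    private final Map<MessageQueue, Long> offsets = new HashMap<>();

    void updateOffset(MessageQueue mq, long offset) {
        lock.lock();
        try {
            // critical section: mutate shared state
            offsets.put(mq, offset);
        } finally {
            // always release, even if the critical section throws
            lock.unlock();
        }
    }

Note that this is a simple non-reentrant CAS spin lock, so a thread must not call lock() again while it already holds the lock.
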
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java
index bedf97f..91d1d39 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java
@@ -537,7 +537,8 @@ public class RocketMQSourceFunction<OUT> extends RichParallelSourceFunction<OUT>
         }
     }
 
-    public void initOffsetTableFromRestoredOffsets(List<MessageQueue> messageQueues) throws MQClientException {
+    public void initOffsetTableFromRestoredOffsets(List<MessageQueue> messageQueues)
+            throws MQClientException {
         Preconditions.checkNotNull(restoredOffsets, "restoredOffsets can't be null");
         restoredOffsets.forEach(
                 (mq, offset) -> {
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java b/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java
index 317031f..a76a198 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.connector.rocketmq.source;
 
-import com.alibaba.fastjson.JSON;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.rocketmq.source.enumerator.offset.OffsetsSelector;
@@ -26,6 +25,8 @@ import org.apache.flink.connector.rocketmq.source.reader.MessageViewExt;
 import org.apache.flink.connector.rocketmq.source.util.UtilAll;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.StringUtils;
+
+import com.alibaba.fastjson.JSON;
 import org.apache.rocketmq.acl.common.AclClientRPCHook;
 import org.apache.rocketmq.acl.common.SessionCredentials;
 import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/RocketMQSource.java b/src/main/java/org/apache/flink/connector/rocketmq/source/RocketMQSource.java
index 25fd8c7..cfdc0a1 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/source/RocketMQSource.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/source/RocketMQSource.java
@@ -124,13 +124,14 @@ public class RocketMQSource<OUT>
         final RocketMQSourceReaderMetrics rocketMQSourceReaderMetrics =
                 new RocketMQSourceReaderMetrics(readerContext.metricGroup());
 
-        Supplier<SplitReader<MessageView, RocketMQSourceSplit>> splitReaderSupplier =
-                () ->
-                        new RocketMQSplitReader<>(
-                                configuration,
-                                readerContext,
-                                deserializationSchema,
-                                rocketMQSourceReaderMetrics);
+        // Create a single split reader instance shared with the source reader
+        RocketMQSplitReader<OUT> reader =
+                new RocketMQSplitReader<>(
+                        configuration,
+                        readerContext,
+                        deserializationSchema,
+                        rocketMQSourceReaderMetrics);
+        Supplier<SplitReader<MessageView, RocketMQSourceSplit>> splitReaderSupplier = () -> reader;
 
         RocketMQSourceFetcherManager rocketmqSourceFetcherManager =
                 new RocketMQSourceFetcherManager(
@@ -145,7 +146,8 @@ public class RocketMQSource<OUT>
                 recordEmitter,
                 configuration,
                 readerContext,
-                rocketMQSourceReaderMetrics);
+                rocketMQSourceReaderMetrics,
+                splitReaderSupplier);
     }
 
     @Override
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java
index 805df1b..960e622 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java
@@ -18,8 +18,9 @@
 
 package org.apache.flink.connector.rocketmq.source.enumerator;
 
-import com.alibaba.fastjson.JSON;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import com.alibaba.fastjson.JSON;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java
index 77c0c33..c228309 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java
@@ -18,16 +18,21 @@
 
 package org.apache.flink.connector.rocketmq.source.enumerator;
 
-import com.alibaba.fastjson.JSON;
-import com.google.common.collect.Sets;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.rocketmq.common.event.SourceCheckEvent;
+import org.apache.flink.connector.rocketmq.common.event.SourceDetectEvent;
+import org.apache.flink.connector.rocketmq.common.event.SourceInitAssignEvent;
+import org.apache.flink.connector.rocketmq.common.event.SourceReportOffsetEvent;
+import org.apache.flink.connector.rocketmq.common.lock.SpinLock;
 import org.apache.flink.connector.rocketmq.source.InnerConsumer;
 import org.apache.flink.connector.rocketmq.source.InnerConsumerImpl;
 import org.apache.flink.connector.rocketmq.source.RocketMQSourceOptions;
@@ -36,18 +41,26 @@ import org.apache.flink.connector.rocketmq.source.enumerator.allocate.AllocateSt
 import org.apache.flink.connector.rocketmq.source.enumerator.offset.OffsetsSelector;
 import org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplit;
 import org.apache.flink.util.FlinkRuntimeException;
+
+import com.alibaba.fastjson.JSON;
+import com.google.common.collect.Sets;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -69,18 +82,31 @@ public class RocketMQSourceEnumerator
     private final OffsetsSelector startingOffsetsSelector;
     private final OffsetsSelector stoppingOffsetsSelector;
 
+    // Used for dynamic queue allocation
+    private final Map<MessageQueue, Long> checkedOffsets;
+    private boolean[] initTask;
+
     // The internal states of the enumerator.
     // This set is only accessed by the partition discovery callable in the callAsync() method.
     // The current assignment by reader id. Only accessed by the coordinator thread.
     // The discovered and initialized partition splits that are waiting for owner reader to be
     // ready.
-    private final Set<MessageQueue> allocatedSet;
+    private final Map<MessageQueue, Byte> allocatedSet;
     private final Map<Integer, Set<RocketMQSourceSplit>> pendingSplitAssignmentMap;
 
+    // Only maintains the mapping relationship
+    private final Map<Integer, Set<RocketMQSourceSplit>> assignedMap;
+    private final Map<MessageQueue, Integer /* taskId */> reflectedQueueToTaskId;
+
     // Param from configuration
     private final String groupId;
     private final long partitionDiscoveryIntervalMs;
 
+    // Indicates the number of allocated queues
+    private int partitionId;
+    private final SpinLock lock;
+    private ScheduledExecutorService scheduledExecutorService;
+
     public RocketMQSourceEnumerator(
             OffsetsSelector startingOffsetsSelector,
             OffsetsSelector stoppingOffsetsSelector,
@@ -107,13 +133,19 @@ public class RocketMQSourceEnumerator
         this.configuration = configuration;
         this.context = context;
         this.boundedness = boundedness;
+        this.lock = new SpinLock();
 
         // Support allocate splits to reader
+        this.checkedOffsets = new ConcurrentHashMap<>();
+        this.reflectedQueueToTaskId = new ConcurrentHashMap<>();
         this.pendingSplitAssignmentMap = new ConcurrentHashMap<>();
-        this.allocatedSet = new HashSet<>(currentSplitAssignment);
+        this.allocatedSet = new ConcurrentHashMap<>();
+        this.assignedMap = new ConcurrentHashMap<>();
         this.allocateStrategy =
                 AllocateStrategyFactory.getStrategy(
-                        configuration, context, new RocketMQSourceEnumState(allocatedSet));
+                        configuration,
+                        context,
+                        new RocketMQSourceEnumState(currentSplitAssignment));
 
         // For rocketmq setting
         this.groupId = configuration.getString(RocketMQSourceOptions.CONSUMER_GROUP);
@@ -121,12 +153,23 @@ public class RocketMQSourceEnumerator
         this.stoppingOffsetsSelector = stoppingOffsetsSelector;
         this.partitionDiscoveryIntervalMs =
                 configuration.getLong(RocketMQSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS);
+        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+
+        // Initialize the task status
+        log.info(
+                "Starting the RocketMQSourceEnumerator with current split 
assignment: {}",
+                currentSplitAssignment);
+        if (!currentSplitAssignment.isEmpty()) {
+            this.initTask = new boolean[context.currentParallelism()];
+        }
     }
 
     @Override
     public void start() {
         consumer = new InnerConsumerImpl(configuration);
         consumer.start();
+        scheduledExecutorService.scheduleAtFixedRate(
+                this::notifyAssignResult, 30 * 1000, 30 * 1000, TimeUnit.MILLISECONDS);
 
         if (partitionDiscoveryIntervalMs > 0) {
             log.info(
@@ -190,7 +233,7 @@ public class RocketMQSourceEnumerator
 
     @Override
     public RocketMQSourceEnumState snapshotState(long checkpointId) {
-        return new RocketMQSourceEnumState(allocatedSet);
+        return new RocketMQSourceEnumState(allocatedSet.keySet());
     }
 
     @Override
@@ -205,9 +248,32 @@ public class RocketMQSourceEnumerator
         }
     }
 
+    @Override
+    public void handleSourceEvent(int taskId, SourceEvent sourceEvent) {
+        if (sourceEvent instanceof SourceReportOffsetEvent) {
+            handleOffsetEvent(taskId, (SourceReportOffsetEvent) sourceEvent);
+        } else if (sourceEvent instanceof SourceInitAssignEvent) {
+            handleInitAssignEvent(taskId, (SourceInitAssignEvent) sourceEvent);
+        }
+    }
+
     // ----------------- private methods -------------------
 
     private Set<MessageQueue> requestServiceDiscovery() {
+        // Ensure all subtasks have been initialized
+        try {
+            if (initTask != null) {
+                for (int i = 0; i < context.currentParallelism(); i++) {
+                    if (!initTask[i]) {
+                        context.sendEventToSourceReader(i, new SourceDetectEvent());
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("init request resend error, please check task has 
started");
+            return null;
+        }
+
         Set<String> topicSet =
                 Sets.newHashSet(
                         configuration
@@ -235,6 +301,9 @@ public class RocketMQSourceEnumerator
         if (t != null) {
             throw new FlinkRuntimeException("Failed to handle source splits change due to ", t);
         }
+        if (latestSet == null) {
+            return;
+        }
 
         final SourceChangeResult sourceChangeResult = getSourceChangeResult(latestSet);
         if (sourceChangeResult.isEmpty()) {
@@ -248,30 +317,59 @@ public class RocketMQSourceEnumerator
 
     // This method should only be invoked in the coordinator executor thread.
     private SourceSplitChangeResult initializeSourceSplits(SourceChangeResult sourceChangeResult) {
+        lock.lock();
+
         Set<MessageQueue> increaseSet = sourceChangeResult.getIncreaseSet();
+        Set<MessageQueue> decreaseSet = sourceChangeResult.getDecreaseSet();
 
         OffsetsSelector.MessageQueueOffsetsRetriever offsetsRetriever =
                 new InnerConsumerImpl.RemotingOffsetsRetrieverImpl(consumer);
 
-        Map<MessageQueue, Long> startingOffsets =
+        Map<MessageQueue, Long> increaseStartingOffsets =
                 startingOffsetsSelector.getMessageQueueOffsets(increaseSet, offsetsRetriever);
-        Map<MessageQueue, Long> stoppingOffsets =
+        Map<MessageQueue, Long> increaseStoppingOffsets =
                 stoppingOffsetsSelector.getMessageQueueOffsets(increaseSet, offsetsRetriever);
+        Map<MessageQueue, Long> decreaseStoppingOffsets =
+                stoppingOffsetsSelector.getMessageQueueOffsets(decreaseSet, offsetsRetriever);
+        Map<MessageQueue, Long> decreaseStartingOffsets =
+                startingOffsetsSelector.getMessageQueueOffsets(decreaseSet, offsetsRetriever);
 
         Set<RocketMQSourceSplit> increaseSplitSet =
                 increaseSet.stream()
                         .map(
                                 mq -> {
-                                    long startingOffset = startingOffsets.get(mq);
+                                    long startingOffset = increaseStartingOffsets.get(mq);
                                     long stoppingOffset =
-                                            stoppingOffsets.getOrDefault(
+                                            increaseStoppingOffsets.getOrDefault(
                                                     mq, RocketMQSourceSplit.NO_STOPPING_OFFSET);
                                     return new RocketMQSourceSplit(
                                             mq, startingOffset, stoppingOffset);
                                 })
                         .collect(Collectors.toSet());
+        // Update cache
+        increaseSet.forEach(
+                mq ->
+                        checkedOffsets.put(
+                                mq,
+                                increaseStartingOffsets.getOrDefault(
+                                        mq, RocketMQSourceSplit.NO_STOPPING_OFFSET)));
+
+        Set<RocketMQSourceSplit> decreaseSplitSet =
+                decreaseSet.stream()
+                        .map(
+                                mq -> {
+                                    long startingOffset = decreaseStartingOffsets.get(mq);
+                                    long stoppingOffset =
+                                            decreaseStoppingOffsets.getOrDefault(
+                                                    mq, RocketMQSourceSplit.NO_STOPPING_OFFSET);
+                                    allocatedSet.remove(mq);
+                                    checkedOffsets.remove(mq);
+                                    return new RocketMQSourceSplit(
+                                            mq, startingOffset, stoppingOffset, false);
+                                })
+                        .collect(Collectors.toSet());
 
-        return new SourceSplitChangeResult(increaseSplitSet, sourceChangeResult.getDecreaseSet());
+        return new SourceSplitChangeResult(increaseSplitSet, decreaseSplitSet);
     }
 
     /**
@@ -291,15 +389,145 @@ public class RocketMQSourceEnumerator
         if (partitionDiscoveryIntervalMs <= 0) {
             log.info("Split changes, but dynamic partition discovery is 
disabled.");
         }
-        this.calculateSplitAssignment(sourceSplitChangeResult);
-        this.sendSplitChangesToRemote(context.registeredReaders().keySet());
+        try {
+            this.calculateSplitAssignment(sourceSplitChangeResult);
+            this.sendSplitChangesToRemote(context.registeredReaders().keySet());
+        } finally {
+            lock.unlock();
+        }
     }
 
     /** Calculate new split assignment according to the allocate strategy */
     private void calculateSplitAssignment(SourceSplitChangeResult sourceSplitChangeResult) {
-        Map<Integer, Set<RocketMQSourceSplit>> newSourceSplitAllocateMap =
-                this.allocateStrategy.allocate(
-                        sourceSplitChangeResult.getIncreaseSet(), context.currentParallelism());
+        Map<Integer, Set<RocketMQSourceSplit>> newSourceSplitAllocateMap;
+
+        // Preliminary calculation of distribution results
+        {
+            // Allocate valid queues
+            if (sourceSplitChangeResult.decreaseSet != null
+                    && !sourceSplitChangeResult.decreaseSet.isEmpty()) {
+                partitionId = 0;
+
+                // Re-load balancing
+                Set<RocketMQSourceSplit> allMQ = new HashSet<>();
+                OffsetsSelector.MessageQueueOffsetsRetriever offsetsRetriever =
+                        new InnerConsumerImpl.RemotingOffsetsRetrieverImpl(consumer);
+                Map<MessageQueue, Long> stoppingOffsets =
+                        stoppingOffsetsSelector.getMessageQueueOffsets(
+                                allocatedSet.keySet(), offsetsRetriever);
+                Set<MessageQueue> delete =
+                        sourceSplitChangeResult.decreaseSet.stream()
+                                .map(RocketMQSourceSplit::getMessageQueue)
+                                .collect(Collectors.toSet());
+
+                // Calculate all queues
+                allMQ.addAll(sourceSplitChangeResult.increaseSet);
+                allocatedSet
+                        .keySet()
+                        .forEach(
+                                mq -> {
+                                    if (!delete.contains(mq)) {
+                                        allMQ.add(
+                                                new RocketMQSourceSplit(
+                                                        mq,
+                                                        checkedOffsets.get(mq),
+                                                        stoppingOffsets.getOrDefault(
+                                                                mq,
+                                                                RocketMQSourceSplit
+                                                                        .NO_STOPPING_OFFSET)));
+                                    }
+                                });
+                newSourceSplitAllocateMap =
+                        this.allocateStrategy.allocate(
+                                allMQ, context.currentParallelism(), partitionId);
+
+                // Update cache
+                assignedMap.clear();
+                for (Map.Entry entry : newSourceSplitAllocateMap.entrySet()) {
+                    assignedMap.put(
+                            (Integer) entry.getKey(),
+                            ((Set<RocketMQSourceSplit>) entry.getValue())
+                                    .stream()
+                                            .map(RocketMQSourceSplit::clone)
+                                            .collect(Collectors.toSet()));
+                }
+                partitionId = allMQ.size();
+            } else {
+                newSourceSplitAllocateMap =
+                        this.allocateStrategy.allocate(
+                                sourceSplitChangeResult.getIncreaseSet(),
+                                context.currentParallelism(),
+                                partitionId);
+
+                // Update cache
+                newSourceSplitAllocateMap.forEach(
+                        (k, v) ->
+                                v.forEach(
+                                        mq ->
+                                                assignedMap
+                                                        .computeIfAbsent(k, r -> new HashSet<>())
+                                                        .add(mq)));
+                partitionId += sourceSplitChangeResult.getIncreaseSet().size();
+            }
+
+            // Allocate deleted queues
+            if (sourceSplitChangeResult.decreaseSet != null
+                    && !sourceSplitChangeResult.decreaseSet.isEmpty()) {
+                sourceSplitChangeResult.decreaseSet.forEach(
+                        mq -> {
+                            newSourceSplitAllocateMap
+                                    .computeIfAbsent(
+                                            reflectedQueueToTaskId.get(mq.getMessageQueue()),
+                                            k -> new HashSet<>())
+                                    .add(mq);
+                            reflectedQueueToTaskId.remove(mq.getMessageQueue());
+                        });
+            }
+        }
+
+        {
+            // Calculate the result after queue migration
+            if (sourceSplitChangeResult.decreaseSet != null
+                    && !sourceSplitChangeResult.decreaseSet.isEmpty()) {
+                Map<Integer, Set<RocketMQSourceSplit>> migrationQueue = new HashMap<>();
+                Map<Integer, Set<RocketMQSourceSplit>> noMigrationQueue = new HashMap<>();
+                for (Map.Entry entry : newSourceSplitAllocateMap.entrySet()) {
+                    int taskId = (int) entry.getKey();
+                    Set<RocketMQSourceSplit> splits = (Set<RocketMQSourceSplit>) entry.getValue();
+                    for (RocketMQSourceSplit split : splits) {
+                        if (!split.getIsIncrease()) {
+                            continue;
+                        }
+                        if (taskId != reflectedQueueToTaskId.get(split.getMessageQueue())) {
+                            migrationQueue
+                                    .computeIfAbsent(
+                                            reflectedQueueToTaskId.get(split.getMessageQueue()),
+                                            k -> new HashSet<>())
+                                    .add(
+                                            new RocketMQSourceSplit(
+                                                    split.getMessageQueue(),
+                                                    split.getStartingOffset(),
+                                                    split.getStoppingOffset(),
+                                                    false));
+                        } else {
+                            noMigrationQueue
+                                    .computeIfAbsent(taskId, k -> new HashSet<>())
+                                    .add(split);
+                        }
+                    }
+                }
+
+                // final result
+                migrationQueue.forEach(
+                        (taskId, splits) -> {
+                            newSourceSplitAllocateMap.get(taskId).addAll(splits);
+                        });
+                noMigrationQueue.forEach(
+                        (taskId, splits) -> {
+                            newSourceSplitAllocateMap.get(taskId).removeAll(splits);
+                        });
+            }
+        }
 
         for (Map.Entry<Integer, Set<RocketMQSourceSplit>> entry :
                 newSourceSplitAllocateMap.entrySet()) {
@@ -330,7 +558,12 @@ public class RocketMQSourceEnumerator
                         .computeIfAbsent(pendingReader, k -> new ArrayList<>())
                         .addAll(pendingAssignmentForReader);
                 pendingAssignmentForReader.forEach(
-                        split -> this.allocatedSet.add(split.getMessageQueue()));
+                        split -> {
+                            if (split.getIsIncrease()) {
+                                this.allocatedSet.put(split.getMessageQueue(), (byte) 1);
+                                reflectedQueueToTaskId.put(split.getMessageQueue(), pendingReader);
+                            }
+                        });
             }
         }
 
@@ -352,6 +585,86 @@ public class RocketMQSourceEnumerator
         }
     }
 
+    private void handleInitAssignEvent(int taskId, SourceInitAssignEvent initAssignEvent) {
+        if (this.initTask == null || this.initTask[taskId]) {
+            return;
+        }
+        lock.lock();
+        try {
+            // sync assign result
+            if (initAssignEvent.getSplits() != null && !initAssignEvent.getSplits().isEmpty()) {
+                log.info(
+                        "Received SourceInitAssignEvent from reader {} with {} splits.",
+                        taskId,
+                        initAssignEvent.getSplits().toString());
+                initAssignEvent
+                        .getSplits()
+                        .forEach(
+                                split -> {
+                                    this.assignedMap
+                                            .computeIfAbsent(taskId, r -> new HashSet<>())
+                                            .add(split);
+                                    this.checkedOffsets.put(
+                                            split.getMessageQueue(), split.getStoppingOffset());
+                                    this.reflectedQueueToTaskId.put(
+                                            split.getMessageQueue(), taskId);
+                                    this.allocatedSet.put(split.getMessageQueue(), (byte) 1);
+                                });
+            }
+            this.initTask[taskId] = true;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private void handleOffsetEvent(int taskId, SourceReportOffsetEvent sourceReportOffsetEvent) {
+        lock.lock();
+        try {
+            // Update offset of message queue
+            if (sourceReportOffsetEvent != null && sourceReportOffsetEvent.getCheckpoint() != -1) {
+                log.info(
+                        "Received SourceReportOffsetEvent from reader {} with offset {}",
+                        taskId,
+                        sourceReportOffsetEvent.getCheckpoint());
+                MessageQueue mq =
+                        new MessageQueue(
+                                sourceReportOffsetEvent.getTopic(),
+                                sourceReportOffsetEvent.getBroker(),
+                                sourceReportOffsetEvent.getQueueId());
+                this.checkedOffsets.put(mq, sourceReportOffsetEvent.getCheckpoint());
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private void notifyAssignResult() {
+        if (assignedMap.isEmpty()) {
+            return;
+        }
+        lock.lock();
+        try {
+            for (Map.Entry<Integer, Set<RocketMQSourceSplit>> entry : assignedMap.entrySet()) {
+                SourceCheckEvent sourceCheckEvent = new SourceCheckEvent();
+                Map<MessageQueue, Tuple2<Long, Long>> assignedMq = new HashMap<>();
+
+                entry.getValue()
+                        .forEach(
+                                split -> {
+                                    assignedMq.put(
+                                            split.getMessageQueue(),
+                                            new Tuple2<>(
+                                                    split.getStartingOffset(),
+                                                    split.getStoppingOffset()));
+                                });
+                sourceCheckEvent.setAssignedMq(assignedMq);
+                context.sendEventToSourceReader(entry.getKey(), sourceCheckEvent);
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
     /** A container class to hold the newly added partitions and removed partitions. */
     @VisibleForTesting
     private static class SourceChangeResult {
@@ -380,7 +693,7 @@ public class RocketMQSourceEnumerator
     public static class SourceSplitChangeResult {
 
         private final Set<RocketMQSourceSplit> increaseSet;
-        private final Set<MessageQueue> decreaseSet;
+        private final Set<RocketMQSourceSplit> decreaseSet;
 
         private SourceSplitChangeResult(Set<RocketMQSourceSplit> increaseSet) {
             this.increaseSet = Collections.unmodifiableSet(increaseSet);
@@ -388,7 +701,7 @@ public class RocketMQSourceEnumerator
         }
 
         private SourceSplitChangeResult(
-                Set<RocketMQSourceSplit> increaseSet, Set<MessageQueue> decreaseSet) {
+                Set<RocketMQSourceSplit> increaseSet, Set<RocketMQSourceSplit> decreaseSet) {
             this.increaseSet = Collections.unmodifiableSet(increaseSet);
             this.decreaseSet = Collections.unmodifiableSet(decreaseSet);
         }
@@ -397,14 +710,14 @@ public class RocketMQSourceEnumerator
             return increaseSet;
         }
 
-        public Set<MessageQueue> getDecreaseSet() {
+        public Set<RocketMQSourceSplit> getDecreaseSet() {
             return decreaseSet;
         }
     }
 
     @VisibleForTesting
     private SourceChangeResult getSourceChangeResult(Set<MessageQueue> latestSet) {
-        Set<MessageQueue> currentSet = Collections.unmodifiableSet(this.allocatedSet);
+        Set<MessageQueue> currentSet = Collections.unmodifiableSet(this.allocatedSet.keySet());
         Set<MessageQueue> increaseSet = Sets.difference(latestSet, currentSet);
         Set<MessageQueue> decreaseSet = Sets.difference(currentSet, latestSet);
 
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AllocateStrategy.java b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AllocateStrategy.java
index bfd814e..6386b22 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AllocateStrategy.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AllocateStrategy.java
@@ -44,4 +44,17 @@ public interface AllocateStrategy {
      */
     Map<Integer, Set<RocketMQSourceSplit>> allocate(
             final Collection<RocketMQSourceSplit> mqAll, final int parallelism);
+
+    /**
+     * Allocates RocketMQ source splits to Flink tasks based on the selected allocation strategy.
+     *
+     * @param mqAll a collection of all available RocketMQ source splits
+     * @param parallelism the desired parallelism for the Flink tasks
+     * @param globalAssignedNumber number of allocated queues
+     * @return a map of task indices to sets of corresponding RocketMQ source splits
+     */
+    Map<Integer, Set<RocketMQSourceSplit>> allocate(
+            final Collection<RocketMQSourceSplit> mqAll,
+            final int parallelism,
+            int globalAssignedNumber);
 }
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AllocateStrategyFactory.java b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AllocateStrategyFactory.java
index 6c9d723..442afb9 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AllocateStrategyFactory.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AllocateStrategyFactory.java
@@ -27,6 +27,7 @@ public class AllocateStrategyFactory {
 
     public static final String STRATEGY_NAME_BROADCAST = "broadcast";
     public static final String STRATEGY_NAME_CONSISTENT_HASH = "hash";
+    public static final String STRATEGY_NAME_AVERAGE = "average";
 
     private AllocateStrategyFactory() {
         // No public constructor.
@@ -46,6 +47,8 @@ public class AllocateStrategyFactory {
                 return new ConsistentHashAllocateStrategy();
             case STRATEGY_NAME_BROADCAST:
                 return new BroadcastAllocateStrategy();
+            case STRATEGY_NAME_AVERAGE:
+                return new AverageAllocateStrategy();
             default:
                 throw new IllegalArgumentException(
                         "We don't support this allocate strategy: " + 
allocateStrategyName);
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AverageAllocateStrategy.java b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AverageAllocateStrategy.java
new file mode 100644
index 0000000..f808133
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AverageAllocateStrategy.java
@@ -0,0 +1,43 @@
+package org.apache.flink.connector.rocketmq.source.enumerator.allocate;
+
+import org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplit;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class AverageAllocateStrategy implements AllocateStrategy {
+    @Override
+    public String getStrategyName() {
+        return AllocateStrategyFactory.STRATEGY_NAME_AVERAGE;
+    }
+
+    @Override
+    public Map<Integer, Set<RocketMQSourceSplit>> allocate(
+            Collection<RocketMQSourceSplit> mqAll, int parallelism) {
+        return null;
+    }
+
+    @Override
+    public Map<Integer, Set<RocketMQSourceSplit>> allocate(
+            Collection<RocketMQSourceSplit> mqAll, int parallelism, int globalAssignedNumber) {
+        Map<Integer, Set<RocketMQSourceSplit>> result = new HashMap<>();
+        for (RocketMQSourceSplit mq : mqAll) {
+            int readerIndex =
+                    this.getSplitOwner(mq.getTopic(), globalAssignedNumber++, parallelism);
+            result.computeIfAbsent(readerIndex, k -> new HashSet<>()).add(mq);
+        }
+        return result;
+    }
+
+    private int getSplitOwner(String topic, int partition, int numReaders) {
+        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numReaders;
+
+        // here, the assumption is that the ids of RocketMQ partitions are always ascending
+        // starting from 0, and therefore can be used directly as the offset clockwise from the
+        // start index
+        return (startIndex + partition) % numReaders;
+    }
+}
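
A rough usage sketch of the new three-argument allocate. The five-argument RocketMQSourceSplit constructor (topic, broker, queueId, startingOffset, stoppingOffset) is assumed from the serializer test this patch copies from, and the topic/broker names are placeholders; java.util imports are omitted:

    AllocateStrategy strategy = new AverageAllocateStrategy();
    List<RocketMQSourceSplit> splits =
            Arrays.asList(
                    new RocketMQSourceSplit("topic-a", "broker-a", 0, 0, RocketMQSourceSplit.NO_STOPPING_OFFSET),
                    new RocketMQSourceSplit("topic-a", "broker-a", 1, 0, RocketMQSourceSplit.NO_STOPPING_OFFSET),
                    new RocketMQSourceSplit("topic-a", "broker-a", 2, 0, RocketMQSourceSplit.NO_STOPPING_OFFSET));

    // parallelism = 2, no queues assigned so far (globalAssignedNumber = 0):
    // consecutive queues land on alternating readers, starting from a topic-derived index.
    Map<Integer, Set<RocketMQSourceSplit>> assignment = strategy.allocate(splits, 2, 0);

Passing the enumerator's running partitionId as globalAssignedNumber lets later discoveries continue the round-robin rotation instead of restarting from the same reader index.
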
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/BroadcastAllocateStrategy.java b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/BroadcastAllocateStrategy.java
index 2e46419..f37cf04 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/BroadcastAllocateStrategy.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/BroadcastAllocateStrategy.java
@@ -40,4 +40,10 @@ public class BroadcastAllocateStrategy implements AllocateStrategy {
         }
         return result;
     }
+
+    @Override
+    public Map<Integer, Set<RocketMQSourceSplit>> allocate(
+            Collection<RocketMQSourceSplit> mqAll, int parallelism, int globalAssignedNumber) {
+        return allocate(mqAll, parallelism);
+    }
 }
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/ConsistentHashAllocateStrategy.java b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/ConsistentHashAllocateStrategy.java
index 6a3ad2c..7cd10c4 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/ConsistentHashAllocateStrategy.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/ConsistentHashAllocateStrategy.java
@@ -48,4 +48,10 @@ public class ConsistentHashAllocateStrategy implements AllocateStrategy {
         }
         return result;
     }
+
+    @Override
+    public Map<Integer, Set<RocketMQSourceSplit>> allocate(
+            Collection<RocketMQSourceSplit> mqAll, int parallelism, int globalAssignedNumber) {
+        return allocate(mqAll, parallelism);
+    }
 }
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/metrics/RocketMQSourceReaderMetrics.java b/src/main/java/org/apache/flink/connector/rocketmq/source/metrics/RocketMQSourceReaderMetrics.java
index 09c1049..57c8a63 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/source/metrics/RocketMQSourceReaderMetrics.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/source/metrics/RocketMQSourceReaderMetrics.java
@@ -45,4 +45,6 @@ public class RocketMQSourceReaderMetrics {
     public RocketMQSourceReaderMetrics(SourceReaderMetricGroup sourceReaderMetricGroup) {}
 
     public void registerNewMessageQueue(MessageQueue messageQueue) {}
+
+    public void unregisterMessageQueue(MessageQueue messageQueue) {}
 }
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceFetcherManager.java b/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceFetcherManager.java
index a635b17..242e19e 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceFetcherManager.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceFetcherManager.java
@@ -95,4 +95,8 @@ public class RocketMQSourceFetcherManager
                     public void wakeUp() {}
                 });
     }
+
+    public RocketMQSplitReader getSplitReader() {
+        return (RocketMQSplitReader) fetchers.get(0).getSplitReader();
+    }
 }
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceReader.java b/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceReader.java
index 9303f35..042c12a 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceReader.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceReader.java
@@ -19,30 +19,43 @@
 package org.apache.flink.connector.rocketmq.source.reader;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.rocketmq.common.event.SourceCheckEvent;
+import org.apache.flink.connector.rocketmq.common.event.SourceDetectEvent;
+import org.apache.flink.connector.rocketmq.common.event.SourceInitAssignEvent;
+import org.apache.flink.connector.rocketmq.common.event.SourceReportOffsetEvent;
 import org.apache.flink.connector.rocketmq.source.RocketMQSourceOptions;
 import org.apache.flink.connector.rocketmq.source.metrics.RocketMQSourceReaderMetrics;
 import org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplit;
 import org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplitState;
 import org.apache.flink.connector.rocketmq.source.util.UtilAll;
 
+import com.google.common.collect.Sets;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 /** The source reader for RocketMQ partitions. */
 public class RocketMQSourceReader<T>
@@ -57,6 +70,7 @@ public class RocketMQSourceReader<T>
     private final SortedMap<Long, Map<MessageQueue, Long>> offsetsToCommit;
     private final ConcurrentMap<MessageQueue, Long> offsetsOfFinishedSplits;
     private final RocketMQSourceReaderMetrics rocketmqSourceReaderMetrics;
+    private final RocketMQSplitReader reader;
 
     public RocketMQSourceReader(
             FutureCompletingBlockingQueue<RecordsWithSplitIds<MessageView>> elementsQueue,
@@ -64,7 +78,8 @@ public class RocketMQSourceReader<T>
             RecordEmitter<MessageView, T, RocketMQSourceSplitState> recordEmitter,
             Configuration config,
             SourceReaderContext context,
-            RocketMQSourceReaderMetrics rocketMQSourceReaderMetrics) {
+            RocketMQSourceReaderMetrics rocketMQSourceReaderMetrics,
+            Supplier<SplitReader<MessageView, RocketMQSourceSplit>> readerSupplier) {
 
         super(elementsQueue, rocketmqSourceFetcherManager, recordEmitter, config, context);
         this.offsetsToCommit = Collections.synchronizedSortedMap(new TreeMap<>());
@@ -72,6 +87,7 @@ public class RocketMQSourceReader<T>
         this.commitOffsetsOnCheckpoint =
                 config.get(RocketMQSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT);
         this.rocketmqSourceReaderMetrics = rocketMQSourceReaderMetrics;
+        this.reader = (RocketMQSplitReader) readerSupplier.get();
     }
 
     @Override
@@ -136,10 +152,91 @@ public class RocketMQSourceReader<T>
 
     @Override
     protected RocketMQSourceSplit toSplitType(String splitId, RocketMQSourceSplitState splitState) {
+        // Report checkpoint progress.
+        SourceReportOffsetEvent sourceEvent = new SourceReportOffsetEvent();
+        sourceEvent.setBroker(splitState.getBrokerName());
+        sourceEvent.setTopic(splitState.getTopic());
+        sourceEvent.setQueueId(splitState.getQueueId());
+        sourceEvent.setCheckpoint(splitState.getCurrentOffset());
+        context.sendSourceEventToCoordinator(sourceEvent);
+        LOG.info("Report checkpoint progress: {}", sourceEvent);
         return splitState.getSourceSplit();
     }
 
+    @Override
+    public void handleSourceEvents(SourceEvent sourceEvent) {
+        if (sourceEvent instanceof SourceDetectEvent) {
+            handleSourceDetectEvent();
+        } else if (sourceEvent instanceof SourceCheckEvent) {
+            handleSourceCheckEvent((SourceCheckEvent) sourceEvent);
+        }
+    }
+
     // ------------------------
+    private void handleSourceDetectEvent() {
+        SourceInitAssignEvent sourceEvent1 = new SourceInitAssignEvent();
+        List<RocketMQSourceSplit> splits = new LinkedList<>();
+        ConcurrentMap<MessageQueue, Tuple2<Long, Long>> currentOffsetTable =
+                reader.getCurrentOffsetTable();
+
+        if (!currentOffsetTable.isEmpty()) {
+            for (Map.Entry<MessageQueue, Tuple2<Long, Long>> entry :
+                    currentOffsetTable.entrySet()) {
+                MessageQueue messageQueue = entry.getKey();
+                Long startOffset = entry.getValue().f0;
+                Long stopOffset = entry.getValue().f1;
+                RocketMQSourceSplit split =
+                        new RocketMQSourceSplit(messageQueue, startOffset, stopOffset);
+                splits.add(split);
+            }
+        }
+        sourceEvent1.setSplits(splits);
+        context.sendSourceEventToCoordinator(sourceEvent1);
+        reader.setInitFinish(true);
+    }
+
+    private void handleSourceCheckEvent(SourceCheckEvent sourceEvent) {
+        Map<MessageQueue, Tuple2<Long, Long>> checkMap = sourceEvent.getAssignedMq();
+        Set<MessageQueue> assignedMq = checkMap.keySet();
+        Set<MessageQueue> currentMq = reader.getCurrentOffsetTable().keySet();
+        Set<MessageQueue> increaseSet = Sets.difference(assignedMq, currentMq);
+        Set<MessageQueue> decreaseSet = Sets.difference(currentMq, assignedMq);
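+        // increaseSet: queues newly assigned to this reader; decreaseSet: queues this reader must release.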
+
+        if (increaseSet.isEmpty() && decreaseSet.isEmpty()) {
+            LOG.info("Assignment unchanged, current assigned message queues are the same as before.");
+        }
+
+        if (!increaseSet.isEmpty()) {
+            SplitsAddition<RocketMQSourceSplit> increase;
+            increase =
+                    new SplitsAddition<>(
+                            increaseSet.stream()
+                                    .map(
+                                            mq ->
+                                                    new RocketMQSourceSplit(
+                                                            mq,
+                                                            checkMap.get(mq).f0,
+                                                            checkMap.get(mq).f1,
+                                                            true))
+                                    .collect(Collectors.toList()));
+            reader.handleSplitsChanges(increase);
+        }
+        if (!decreaseSet.isEmpty()) {
+            SplitsAddition<RocketMQSourceSplit> decrease;
+            decrease =
+                    new SplitsAddition<>(
+                            decreaseSet.stream()
+                                    .map(
+                                            mq ->
+                                                    new RocketMQSourceSplit(
+                                                            mq,
+                                                            checkMap.get(mq).f0,
+                                                            checkMap.get(mq).f1,
+                                                            false))
+                                    .collect(Collectors.toList()));
+            reader.handleSplitsChanges(decrease);
+        }
+    }
 
     @VisibleForTesting
     SortedMap<Long, Map<MessageQueue, Long>> getOffsetsToCommit() {
diff --git 
a/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSplitReader.java
 
b/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSplitReader.java
index 3a07966..9a0833c 100644
--- 
a/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSplitReader.java
+++ 
b/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSplitReader.java
@@ -27,6 +27,8 @@ import 
org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import 
org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import org.apache.flink.connector.rocketmq.common.config.RocketMQOptions;
+import org.apache.flink.connector.rocketmq.common.event.SourceInitAssignEvent;
+import org.apache.flink.connector.rocketmq.common.lock.SpinLock;
 import org.apache.flink.connector.rocketmq.source.InnerConsumer;
 import org.apache.flink.connector.rocketmq.source.InnerConsumerImpl;
 import org.apache.flink.connector.rocketmq.source.RocketMQSourceOptions;
@@ -36,11 +38,13 @@ import 
org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplit;
 import org.apache.flink.connector.rocketmq.source.util.UtilAll;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
+
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -81,6 +85,11 @@ public class RocketMQSplitReader<T> implements 
SplitReader<MessageView, RocketMQ
     private final ConcurrentMap<MessageQueue, Tuple2<Long, Long>> currentOffsetTable;
     private final RocketMQSourceReaderMetrics rocketmqSourceReaderMetrics;
 
+    // Initialization status: true - initialization finished; false - not yet finished
+    private boolean initFinish;
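+    // Buffers finished-split notifications produced by split changes; guarded by the spin lock shared with fetch().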
+    private final RocketMQRecordsWithSplitIds<MessageView> recordsWithSplitIds;
+    private final SpinLock lock;
+
     public RocketMQSplitReader(
             Configuration configuration,
             SourceReaderContext sourceReaderContext,
@@ -92,6 +101,9 @@ public class RocketMQSplitReader<T> implements 
SplitReader<MessageView, RocketMQ
         this.deserializationSchema = deserializationSchema;
         this.offsetsToCommit = new TreeMap<>();
         this.currentOffsetTable = new ConcurrentHashMap<>();
+        this.recordsWithSplitIds = new RocketMQRecordsWithSplitIds<>(rocketmqSourceReaderMetrics);
+        this.initFinish = false;
+        this.lock = new SpinLock();
 
         this.consumer = new InnerConsumerImpl(configuration);
         this.consumer.start();
@@ -103,9 +115,17 @@ public class RocketMQSplitReader<T> implements 
SplitReader<MessageView, RocketMQ
 
     @Override
     public RecordsWithSplitIds<MessageView> fetch() throws IOException {
+        lock.lock();
         wakeup = false;
         RocketMQRecordsWithSplitIds<MessageView> recordsWithSplitIds =
                 new RocketMQRecordsWithSplitIds<>(rocketmqSourceReaderMetrics);
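+        // Drain finished-split notifications collected by handleSplitsChanges into this fetch result while holding the spin lock.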
+        try {
+            this.recordsWithSplitIds.finishedSplits.forEach(
+                    splitId -> recordsWithSplitIds.addFinishedSplit(splitId));
+            this.recordsWithSplitIds.finishedSplits.clear();
+        } finally {
+            lock.unlock();
+        }
         try {
             Duration duration =
                     Duration.ofMillis(this.configuration.getLong(RocketMQOptions.POLL_TIMEOUT));
@@ -129,6 +149,7 @@ public class RocketMQSplitReader<T> implements 
SplitReader<MessageView, RocketMQ
         } catch (Exception e) {
             LOG.error("Reader fetch split error", e);
         }
+
         return recordsWithSplitIds;
     }
 
@@ -142,38 +163,53 @@ public class RocketMQSplitReader<T> implements 
SplitReader<MessageView, RocketMQ
                             splitsChange.getClass()));
         }
 
+        if (!initFinish) {
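+            // On the first split change, report the initial assignment back to the enumerator.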
+            LOG.info("Start to init reader");
+            SourceInitAssignEvent sourceEvent = new SourceInitAssignEvent();
+            sourceEvent.setSplits(splitsChange.splits());
+            sourceReaderContext.sendSourceEventToCoordinator(sourceEvent);
+            initFinish = true;
+        }
+        lock.lock();
+
+        LOG.info("Receive split change: " + splitsChange.splits().toString());
         // Assignment.
         ConcurrentMap<MessageQueue, Tuple2<Long, Long>> newOffsetTable = new ConcurrentHashMap<>();
 
-        // Set up the stopping timestamps.
-        splitsChange
-                .splits()
-                .forEach(
-                        split -> {
-                            MessageQueue messageQueue =
-                                    new MessageQueue(
-                                            split.getTopic(),
-                                            split.getBrokerName(),
-                                            split.getQueueId());
-                            newOffsetTable.put(
-                                    messageQueue,
-                                    new Tuple2<>(
-                                            split.getStartingOffset(), split.getStoppingOffset()));
-                            rocketmqSourceReaderMetrics.registerNewMessageQueue(messageQueue);
-                        });
-
-        // todo: log message queue change
-
+        try {
+            // Apply the split change: register newly added splits, finish splits that were removed.
+            splitsChange
+                    .splits()
+                    .forEach(
+                            split -> {
+                                if (!split.getIsIncrease()) {
+                                    finishSplitAtRecord(
+                                            split.getMessageQueue(),
+                                            split.getStoppingOffset(),
+                                            recordsWithSplitIds);
+                                } else {
+                                    if (!currentOffsetTable.containsKey(split.getMessageQueue())) {
+                                        registerSplits(split);
+                                        newOffsetTable.put(
+                                                split.getMessageQueue(),
+                                                new Tuple2<>(
+                                                        split.getStartingOffset(),
+                                                        split.getStoppingOffset()));
+                                    }
+                                }
+                            });
+        } finally {
+            lock.unlock();
+        }
         // It will replace the previous assignment
-        Set<MessageQueue> incrementalSplits = newOffsetTable.keySet();
-        consumer.assign(incrementalSplits);
+        consumer.assign(currentOffsetTable.keySet());
 
         // set offset to consumer
         for (Map.Entry<MessageQueue, Tuple2<Long, Long>> entry : newOffsetTable.entrySet()) {
             MessageQueue messageQueue = entry.getKey();
             Long startingOffset = entry.getValue().f0;
             try {
-                consumer.seek(messageQueue, startingOffset);
+                consumer.seek(messageQueue, startingOffset == -1L ? 0L : startingOffset);
             } catch (Exception e) {
                 String info =
                         String.format(
@@ -207,14 +243,31 @@ public class RocketMQSplitReader<T> implements 
SplitReader<MessageView, RocketMQ
         }
     }
 
+    public ConcurrentMap<MessageQueue, Tuple2<Long, Long>> getCurrentOffsetTable() {
+        return currentOffsetTable;
+    }
+
+    private void registerSplits(RocketMQSourceSplit split) {
+        LOG.info("Register split {}", split.splitId());
+        this.currentOffsetTable.put(
+                split.getMessageQueue(),
+                new Tuple2<>(split.getStartingOffset(), split.getStoppingOffset()));
+        this.rocketmqSourceReaderMetrics.registerNewMessageQueue(split.getMessageQueue());
+    }
+
+    public void setInitFinish(boolean initFinish) {
+        this.initFinish = initFinish;
+    }
+
     private void finishSplitAtRecord(
             MessageQueue messageQueue,
             long currentOffset,
             RocketMQRecordsWithSplitIds<MessageView> recordsBySplits) {
-
         LOG.info("message queue {} has reached stopping offset {}", 
messageQueue, currentOffset);
-        // recordsBySplits.addFinishedSplit(getSplitId(messageQueue));
+
         this.currentOffsetTable.remove(messageQueue);
+        this.rocketmqSourceReaderMetrics.unregisterMessageQueue(messageQueue);
+        recordsBySplits.addFinishedSplit(RocketMQSourceSplit.toSplitId(messageQueue));
     }
 
     // ---------------- private helper class ------------------------
diff --git 
a/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializer.java
 
b/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializer.java
index 36c7a0f..eb37e19 100644
--- 
a/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializer.java
+++ 
b/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializer.java
@@ -47,6 +47,7 @@ public class RocketMQPartitionSplitSerializer
             out.writeInt(split.getQueueId());
             out.writeLong(split.getStartingOffset());
             out.writeLong(split.getStoppingOffset());
+            out.writeBoolean(split.getIsIncrease());
             out.flush();
             return byteArrayOutputStream.toByteArray();
         }
@@ -61,8 +62,13 @@ public class RocketMQPartitionSplitSerializer
             int partition = in.readInt();
             long startingOffset = in.readLong();
             long stoppingOffset = in.readLong();
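+            // Splits serialized with the legacy snapshot version carry no isIncrease flag; it defaults to true.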
+            if (version == SNAPSHOT_VERSION) {
+                return new RocketMQSourceSplit(
+                        topic, broker, partition, startingOffset, stoppingOffset);
+            }
+            boolean isIncrease = in.readBoolean();
             return new RocketMQSourceSplit(
-                    topic, broker, partition, startingOffset, stoppingOffset);
+                    topic, broker, partition, startingOffset, stoppingOffset, isIncrease);
         }
     }
 }
diff --git 
a/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQSourceSplit.java
 
b/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQSourceSplit.java
index 7124086..a94044d 100644
--- 
a/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQSourceSplit.java
+++ 
b/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQSourceSplit.java
@@ -37,6 +37,8 @@ public class RocketMQSourceSplit implements SourceSplit {
     private final int queueId;
     private final long startingOffset;
     private final long stoppingOffset;
+    // Whether this split is being added to (true) or removed from (false) the reader's assignment.
+    private final boolean isIncrease;
 
     public RocketMQSourceSplit(
             MessageQueue messageQueue, long startingOffset, long stoppingOffset) {
@@ -48,6 +50,20 @@ public class RocketMQSourceSplit implements SourceSplit {
                 stoppingOffset);
     }
 
+    public RocketMQSourceSplit(
+            MessageQueue messageQueue,
+            long startingOffset,
+            long stoppingOffset,
+            boolean isIncrease) {
+        this(
+                messageQueue.getTopic(),
+                messageQueue.getBrokerName(),
+                messageQueue.getQueueId(),
+                startingOffset,
+                stoppingOffset,
+                isIncrease);
+    }
+
     public RocketMQSourceSplit(
             String topic,
             String brokerName,
@@ -59,6 +75,22 @@ public class RocketMQSourceSplit implements SourceSplit {
         this.queueId = queueId;
         this.startingOffset = startingOffset;
         this.stoppingOffset = stoppingOffset;
+        this.isIncrease = true;
+    }
+
+    public RocketMQSourceSplit(
+            String topic,
+            String brokerName,
+            int queueId,
+            long startingOffset,
+            long stoppingOffset,
+            boolean isIncrease) {
+        this.topic = topic;
+        this.brokerName = brokerName;
+        this.queueId = queueId;
+        this.startingOffset = startingOffset;
+        this.stoppingOffset = stoppingOffset;
+        this.isIncrease = isIncrease;
     }
 
     public String getTopic() {
@@ -85,6 +117,28 @@ public class RocketMQSourceSplit implements SourceSplit {
         return new MessageQueue(topic, brokerName, queueId);
     }
 
+    public boolean getIsIncrease() {
+        return isIncrease;
+    }
+
+    public static String toSplitId(MessageQueue messageQueue) {
+        return messageQueue.getTopic()
+                + SEPARATOR
+                + messageQueue.getBrokerName()
+                + SEPARATOR
+                + messageQueue.getQueueId();
+    }
+
+    public static RocketMQSourceSplit clone(RocketMQSourceSplit split) {
+        return new RocketMQSourceSplit(
+                split.topic,
+                split.brokerName,
+                split.queueId,
+                split.startingOffset,
+                split.stoppingOffset,
+                split.isIncrease);
+    }
+
     @Override
     public String splitId() {
         return topic + SEPARATOR + brokerName + SEPARATOR + queueId;
@@ -93,13 +147,13 @@ public class RocketMQSourceSplit implements SourceSplit {
     @Override
     public String toString() {
         return String.format(
-                "(Topic: %s, BrokerName: %s, QueueId: %d, MinOffset: %d, 
MaxOffset: %d)",
-                topic, brokerName, queueId, startingOffset, stoppingOffset);
+                "(Topic: %s, BrokerName: %s, QueueId: %d, MinOffset: %d, 
MaxOffset: %d, status: %s)",
+                topic, brokerName, queueId, startingOffset, stoppingOffset, 
isIncrease);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(topic, brokerName, queueId, startingOffset, stoppingOffset);
+        return Objects.hash(topic, brokerName, queueId, startingOffset, stoppingOffset, isIncrease);
     }
 
     @Override
@@ -112,6 +166,7 @@ public class RocketMQSourceSplit implements SourceSplit {
                 && brokerName.equals(other.brokerName)
                 && queueId == other.queueId
                 && startingOffset == other.startingOffset
-                && stoppingOffset == other.stoppingOffset;
+                && stoppingOffset == other.stoppingOffset
+                && isIncrease == other.isIncrease;
     }
 }
diff --git 
a/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQSourceSplitState.java
 
b/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQSourceSplitState.java
index ca74d58..5f16a80 100644
--- 
a/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQSourceSplitState.java
+++ 
b/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQSourceSplitState.java
@@ -48,6 +48,11 @@ public class RocketMQSourceSplitState extends 
RocketMQSourceSplit {
      */
     public RocketMQSourceSplit getSourceSplit() {
         return new RocketMQSourceSplit(
-                getTopic(), getBrokerName(), getQueueId(), getCurrentOffset(), getStoppingOffset());
+                getTopic(),
+                getBrokerName(),
+                getQueueId(),
+                getCurrentOffset(),
+                getStoppingOffset(),
+                getIsIncrease());
     }
 }
diff --git 
a/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java
 
b/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java
index 08371b3..78ffd17 100644
--- 
a/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java
+++ 
b/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java
@@ -26,7 +26,6 @@ import 
org.apache.flink.connector.rocketmq.legacy.common.serialization.SimpleStr
 import org.apache.flink.connector.rocketmq.legacy.common.util.TestUtils;
 
 import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
-import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.junit.Assert;
 import org.junit.Test;
diff --git 
a/src/test/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AverageAllocateStrategyTest.java
 
b/src/test/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AverageAllocateStrategyTest.java
new file mode 100644
index 0000000..f63c8d1
--- /dev/null
+++ 
b/src/test/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AverageAllocateStrategyTest.java
@@ -0,0 +1,97 @@
+package org.apache.flink.connector.rocketmq.source.enumerator.allocate;
+
+import org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplit;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+public class AverageAllocateStrategyTest {
+
+    private static final String BROKER_NAME = "brokerName";
+    private static final String PREFIX_TOPIC = "test-topic-";
+    private static final int NUM_SPLITS = 900;
+    private static final int[] SPLIT_SIZE = {1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000};
+
+    @Test
+    public void averageAllocateStrategyTest() {
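+        // 900 splits spread across 3 subtasks should allocate exactly 300 splits to each.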
+        AllocateStrategy allocateStrategy = new AverageAllocateStrategy();
+        Collection<RocketMQSourceSplit> mqAll = new ArrayList<>();
+        for (int i = 0; i < NUM_SPLITS; i++) {
+            mqAll.add(
+                    new RocketMQSourceSplit(
+                            PREFIX_TOPIC + (i + 1),
+                            BROKER_NAME,
+                            i,
+                            0,
+                            SPLIT_SIZE[i % SPLIT_SIZE.length]));
+        }
+        int parallelism = 3;
+        Map<Integer, Set<RocketMQSourceSplit>> result =
+                allocateStrategy.allocate(mqAll, parallelism, 0);
+        assertEquals(NUM_SPLITS / parallelism, result.get(0).size());
+        assertEquals(NUM_SPLITS / parallelism, result.get(1).size());
+        assertEquals(NUM_SPLITS / parallelism, result.get(2).size());
+    }
+
+    @Test
+    public void averageAllocateStrategyIncrementalTest() {
+        AllocateStrategy allocateStrategy = new AverageAllocateStrategy();
+        Collection<RocketMQSourceSplit> mqAll = new ArrayList<>();
+        for (int i = 0; i < NUM_SPLITS; i++) {
+            mqAll.add(
+                    new RocketMQSourceSplit(
+                            PREFIX_TOPIC + (i + 1),
+                            BROKER_NAME,
+                            i,
+                            0,
+                            SPLIT_SIZE[i % SPLIT_SIZE.length]));
+        }
+        int parallelism = 3;
+        Map<Integer, Set<RocketMQSourceSplit>> result =
+                allocateStrategy.allocate(mqAll, parallelism, 0);
+        assertEquals(NUM_SPLITS / parallelism, result.get(0).size());
+        assertEquals(NUM_SPLITS / parallelism, result.get(1).size());
+        assertEquals(NUM_SPLITS / parallelism, result.get(2).size());
+
+        mqAll.clear();
+        for (int i = NUM_SPLITS; i < 8 + NUM_SPLITS; i++) {
+            mqAll.add(
+                    new RocketMQSourceSplit(
+                            PREFIX_TOPIC + (i + 1),
+                            BROKER_NAME,
+                            i,
+                            0,
+                            SPLIT_SIZE[i % SPLIT_SIZE.length]));
+        }
+        Map<Integer, Set<RocketMQSourceSplit>> result1 =
+                allocateStrategy.allocate(mqAll, parallelism, NUM_SPLITS);
+
+        mqAll.clear();
+        for (int i = 8 + NUM_SPLITS; i < 8 + 7 + NUM_SPLITS; i++) {
+            mqAll.add(
+                    new RocketMQSourceSplit(
+                            PREFIX_TOPIC + (i + 1),
+                            BROKER_NAME,
+                            i,
+                            0,
+                            SPLIT_SIZE[i % SPLIT_SIZE.length]));
+        }
+        Map<Integer, Set<RocketMQSourceSplit>> result2 =
+                allocateStrategy.allocate(mqAll, parallelism, NUM_SPLITS + 8);
+
+        result1.forEach((k, v) -> result.computeIfAbsent(k, r -> new HashSet<>()).addAll(v));
+        result2.forEach((k, v) -> result.computeIfAbsent(k, r -> new HashSet<>()).addAll(v));
+
+        // No matter how many rounds of assignment happen, the splits stay evenly distributed across subtasks.
+        assertEquals((NUM_SPLITS + 8 + 7) / parallelism, result.get(0).size());
+        assertEquals((NUM_SPLITS + 8 + 7) / parallelism, result.get(1).size());
+        assertEquals((NUM_SPLITS + 8 + 7) / parallelism, result.get(2).size());
+    }
+}
diff --git 
a/src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java
 
b/src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java
index f1fa5af..432994e 100644
--- 
a/src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java
+++ 
b/src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java
@@ -36,4 +36,56 @@ public class RocketMQPartitionSplitSerializerTest {
                 serializer.deserialize(serializer.getVersion(), serializer.serialize(expected));
         assertEquals(expected, actual);
     }
+
+    @Test
+    public void testSerializeAndDeserialize() throws IOException {
+        RocketMQPartitionSplitSerializer serializer = new RocketMQPartitionSplitSerializer();
+        RocketMQSourceSplit originalSplit =
+                new RocketMQSourceSplit("testTopic", "testBroker", 0, 100L, 200L, false);
+
+        byte[] serialized = serializer.serialize(originalSplit);
+        RocketMQSourceSplit deserializedSplit =
+                serializer.deserialize(serializer.getVersion(), serialized);
+
+        assertEquals(originalSplit.getTopic(), deserializedSplit.getTopic());
+        assertEquals(originalSplit.getBrokerName(), deserializedSplit.getBrokerName());
+        assertEquals(originalSplit.getQueueId(), deserializedSplit.getQueueId());
+        assertEquals(originalSplit.getStartingOffset(), deserializedSplit.getStartingOffset());
+        assertEquals(originalSplit.getStoppingOffset(), deserializedSplit.getStoppingOffset());
+        assertEquals(originalSplit.getIsIncrease(), deserializedSplit.getIsIncrease());
+    }
+
+    @Test
+    public void testDeserializeWithCurrentVersion() throws IOException {
+        RocketMQPartitionSplitSerializer serializer = new RocketMQPartitionSplitSerializer();
+        RocketMQSourceSplit originalSplit =
+                new RocketMQSourceSplit("testTopic", "testBroker", 0, 100L, 200L, false);
+
+        byte[] serialized = serializer.serialize(originalSplit);
+        RocketMQSourceSplit deserializedSplit = serializer.deserialize(1, serialized);
+
+        assertEquals(originalSplit.getTopic(), deserializedSplit.getTopic());
+        assertEquals(originalSplit.getBrokerName(), deserializedSplit.getBrokerName());
+        assertEquals(originalSplit.getQueueId(), deserializedSplit.getQueueId());
+        assertEquals(originalSplit.getStartingOffset(), deserializedSplit.getStartingOffset());
+        assertEquals(originalSplit.getStoppingOffset(), deserializedSplit.getStoppingOffset());
+        assertEquals(originalSplit.getIsIncrease(), deserializedSplit.getIsIncrease());
+    }
+
+    @Test
+    public void testDeserializeWithLegacyVersion() throws IOException {
+        RocketMQPartitionSplitSerializer serializer = new RocketMQPartitionSplitSerializer();
+        RocketMQSourceSplit originalSplit =
+                new RocketMQSourceSplit("testTopic", "testBroker", 0, 100L, 200L, false);
+
+        byte[] serialized = serializer.serialize(originalSplit);
+        RocketMQSourceSplit deserializedSplit = serializer.deserialize(0, serialized);
+
+        assertEquals(originalSplit.getTopic(), deserializedSplit.getTopic());
+        assertEquals(originalSplit.getBrokerName(), deserializedSplit.getBrokerName());
+        assertEquals(originalSplit.getQueueId(), deserializedSplit.getQueueId());
+        assertEquals(originalSplit.getStartingOffset(), deserializedSplit.getStartingOffset());
+        assertEquals(originalSplit.getStoppingOffset(), deserializedSplit.getStoppingOffset());
+        assertEquals(true, deserializedSplit.getIsIncrease());
+    }
 }
