This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
The following commit(s) were added to refs/heads/main by this push: new fce246b [RIP-74] Implement dynamic load balancing in Flink-Connector-RocketMQ (#126) fce246b is described below commit fce246bda5d2a17edd6d793f3b6cb54deb18872d Author: hqbfz <125714719+3424672...@users.noreply.github.com> AuthorDate: Thu Mar 6 17:18:02 2025 +0800 [RIP-74] Implement dynamic load balancing in Flink-Connector-RocketMQ (#126) --- .../rocketmq/common/event/SourceCheckEvent.java} | 27 +- .../rocketmq/common/event/SourceDetectEvent.java} | 25 +- .../common/event/SourceInitAssignEvent.java} | 25 +- .../common/event/SourceReportOffsetEvent.java | 60 ++++ .../connector/rocketmq/common/lock/SpinLock.java | 35 ++ .../rocketmq/legacy/RocketMQSourceFunction.java | 3 +- .../rocketmq/source/InnerConsumerImpl.java | 3 +- .../connector/rocketmq/source/RocketMQSource.java | 18 +- .../RocketMQSourceEnumStateSerializer.java | 3 +- .../enumerator/RocketMQSourceEnumerator.java | 355 +++++++++++++++++++-- .../enumerator/allocate/AllocateStrategy.java | 13 + .../allocate/AllocateStrategyFactory.java | 3 + .../allocate/AverageAllocateStrategy.java | 43 +++ .../allocate/BroadcastAllocateStrategy.java | 6 + .../allocate/ConsistentHashAllocateStrategy.java | 6 + .../metrics/RocketMQSourceReaderMetrics.java | 2 + .../reader/RocketMQSourceFetcherManager.java | 4 + .../source/reader/RocketMQSourceReader.java | 99 +++++- .../source/reader/RocketMQSplitReader.java | 101 ++++-- .../split/RocketMQPartitionSplitSerializer.java | 8 +- .../rocketmq/source/split/RocketMQSourceSplit.java | 63 +++- .../source/split/RocketMQSourceSplitState.java | 7 +- .../sourceFunction/RocketMQSourceFunctionTest.java | 1 - .../allocate/AverageAllocateStrategyTest.java | 97 ++++++ .../RocketMQPartitionSplitSerializerTest.java | 52 +++ 25 files changed, 952 insertions(+), 107 deletions(-) diff --git a/src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java b/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceCheckEvent.java similarity index 52% copy from src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java copy to src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceCheckEvent.java index f1fa5af..70a7206 100644 --- a/src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceCheckEvent.java @@ -16,24 +16,23 @@ * limitations under the License. */ -package org.apache.flink.connector.rocketmq.source.split; +package org.apache.flink.connector.rocketmq.common.event; -import org.junit.Test; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.java.tuple.Tuple2; -import java.io.IOException; +import org.apache.rocketmq.common.message.MessageQueue; -import static org.junit.Assert.assertEquals; +import java.util.Map; -/** Test for {@link RocketMQPartitionSplitSerializer}. */ -public class RocketMQPartitionSplitSerializerTest { +public class SourceCheckEvent implements SourceEvent { + private Map<MessageQueue, Tuple2<Long, Long>> assignedMq; - @Test - public void testSerializePartitionSplit() throws IOException { - RocketMQPartitionSplitSerializer serializer = new RocketMQPartitionSplitSerializer(); - RocketMQSourceSplit expected = - new RocketMQSourceSplit("test-split-serialization", "taobaodaily", 256, 100, 300); - RocketMQSourceSplit actual = - serializer.deserialize(serializer.getVersion(), serializer.serialize(expected)); - assertEquals(expected, actual); + public Map<MessageQueue, Tuple2<Long, Long>> getAssignedMq() { + return assignedMq; + } + + public void setAssignedMq(Map<MessageQueue, Tuple2<Long, Long>> assignedMq) { + this.assignedMq = assignedMq; } } diff --git a/src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java b/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceDetectEvent.java similarity index 52% copy from src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java copy to src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceDetectEvent.java index f1fa5af..af8c4e3 100644 --- a/src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceDetectEvent.java @@ -16,24 +16,19 @@ * limitations under the License. */ -package org.apache.flink.connector.rocketmq.source.split; +package org.apache.flink.connector.rocketmq.common.event; -import org.junit.Test; +import org.apache.flink.api.connector.source.SourceEvent; -import java.io.IOException; +public class SourceDetectEvent implements SourceEvent { + // Request to resend the initial allocation result + private boolean reSendInitAssign = true; -import static org.junit.Assert.assertEquals; - -/** Test for {@link RocketMQPartitionSplitSerializer}. */ -public class RocketMQPartitionSplitSerializerTest { + public boolean getReSendInitAssign() { + return reSendInitAssign; + } - @Test - public void testSerializePartitionSplit() throws IOException { - RocketMQPartitionSplitSerializer serializer = new RocketMQPartitionSplitSerializer(); - RocketMQSourceSplit expected = - new RocketMQSourceSplit("test-split-serialization", "taobaodaily", 256, 100, 300); - RocketMQSourceSplit actual = - serializer.deserialize(serializer.getVersion(), serializer.serialize(expected)); - assertEquals(expected, actual); + public void setReSendInitAssign(boolean reSendInitAssign) { + this.reSendInitAssign = reSendInitAssign; } } diff --git a/src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java b/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceInitAssignEvent.java similarity index 52% copy from src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java copy to src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceInitAssignEvent.java index f1fa5af..347ec51 100644 --- a/src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceInitAssignEvent.java @@ -16,24 +16,21 @@ * limitations under the License. */ -package org.apache.flink.connector.rocketmq.source.split; +package org.apache.flink.connector.rocketmq.common.event; -import org.junit.Test; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplit; -import java.io.IOException; +import java.util.List; -import static org.junit.Assert.assertEquals; +public class SourceInitAssignEvent implements SourceEvent { + private List<RocketMQSourceSplit> splits; -/** Test for {@link RocketMQPartitionSplitSerializer}. */ -public class RocketMQPartitionSplitSerializerTest { + public void setSplits(List<RocketMQSourceSplit> splits) { + this.splits = splits; + } - @Test - public void testSerializePartitionSplit() throws IOException { - RocketMQPartitionSplitSerializer serializer = new RocketMQPartitionSplitSerializer(); - RocketMQSourceSplit expected = - new RocketMQSourceSplit("test-split-serialization", "taobaodaily", 256, 100, 300); - RocketMQSourceSplit actual = - serializer.deserialize(serializer.getVersion(), serializer.serialize(expected)); - assertEquals(expected, actual); + public List<RocketMQSourceSplit> getSplits() { + return splits; } } diff --git a/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceReportOffsetEvent.java b/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceReportOffsetEvent.java new file mode 100644 index 0000000..d850256 --- /dev/null +++ b/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceReportOffsetEvent.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rocketmq.common.event; + +import org.apache.flink.api.connector.source.SourceEvent; + +public class SourceReportOffsetEvent implements SourceEvent { + private String topic; + private String broker; + private int queueId; + private long checkpoint = -1; + + public void setBroker(String broker) { + this.broker = broker; + } + + public void setCheckpoint(long checkpoint) { + this.checkpoint = checkpoint; + } + + public void setQueueId(int queueId) { + this.queueId = queueId; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public long getCheckpoint() { + return checkpoint; + } + + public int getQueueId() { + return queueId; + } + + public String getBroker() { + return broker; + } + + public String getTopic() { + return topic; + } +} diff --git a/src/main/java/org/apache/flink/connector/rocketmq/common/lock/SpinLock.java b/src/main/java/org/apache/flink/connector/rocketmq/common/lock/SpinLock.java new file mode 100644 index 0000000..dbb32c2 --- /dev/null +++ b/src/main/java/org/apache/flink/connector/rocketmq/common/lock/SpinLock.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rocketmq.common.lock; + +import java.util.concurrent.atomic.AtomicBoolean; + +public class SpinLock { + private AtomicBoolean lock = new AtomicBoolean(false); + + public void lock() { + boolean lock = false; + do { + lock = this.lock.compareAndSet(false, true); + } while (!lock); + } + + public void unlock() { + this.lock.set(false); + } +} diff --git a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java index bedf97f..91d1d39 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java @@ -537,7 +537,8 @@ public class RocketMQSourceFunction<OUT> extends RichParallelSourceFunction<OUT> } } - public void initOffsetTableFromRestoredOffsets(List<MessageQueue> messageQueues) throws MQClientException { + public void initOffsetTableFromRestoredOffsets(List<MessageQueue> messageQueues) + throws MQClientException { Preconditions.checkNotNull(restoredOffsets, "restoredOffsets can't be null"); restoredOffsets.forEach( (mq, offset) -> { diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java b/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java index 317031f..a76a198 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java @@ -17,7 +17,6 @@ package org.apache.flink.connector.rocketmq.source; -import com.alibaba.fastjson.JSON; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.rocketmq.source.enumerator.offset.OffsetsSelector; @@ -26,6 +25,8 @@ import org.apache.flink.connector.rocketmq.source.reader.MessageViewExt; import org.apache.flink.connector.rocketmq.source.util.UtilAll; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.StringUtils; + +import com.alibaba.fastjson.JSON; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/RocketMQSource.java b/src/main/java/org/apache/flink/connector/rocketmq/source/RocketMQSource.java index 25fd8c7..cfdc0a1 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/source/RocketMQSource.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/source/RocketMQSource.java @@ -124,13 +124,14 @@ public class RocketMQSource<OUT> final RocketMQSourceReaderMetrics rocketMQSourceReaderMetrics = new RocketMQSourceReaderMetrics(readerContext.metricGroup()); - Supplier<SplitReader<MessageView, RocketMQSourceSplit>> splitReaderSupplier = - () -> - new RocketMQSplitReader<>( - configuration, - readerContext, - deserializationSchema, - rocketMQSourceReaderMetrics); + // unique reader + RocketMQSplitReader<OUT> reader = + new RocketMQSplitReader<>( + configuration, + readerContext, + deserializationSchema, + rocketMQSourceReaderMetrics); + Supplier<SplitReader<MessageView, RocketMQSourceSplit>> splitReaderSupplier = () -> reader; RocketMQSourceFetcherManager rocketmqSourceFetcherManager = new RocketMQSourceFetcherManager( @@ -145,7 +146,8 @@ public class RocketMQSource<OUT> recordEmitter, configuration, readerContext, - rocketMQSourceReaderMetrics); + rocketMQSourceReaderMetrics, + splitReaderSupplier); } @Override diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java index 805df1b..960e622 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java @@ -18,8 +18,9 @@ package org.apache.flink.connector.rocketmq.source.enumerator; -import com.alibaba.fastjson.JSON; import org.apache.flink.core.io.SimpleVersionedSerializer; + +import com.alibaba.fastjson.JSON; import org.apache.rocketmq.common.message.MessageQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java index 77c0c33..c228309 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java @@ -18,16 +18,21 @@ package org.apache.flink.connector.rocketmq.source.enumerator; -import com.alibaba.fastjson.JSON; -import com.google.common.collect.Sets; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.rocketmq.common.event.SourceCheckEvent; +import org.apache.flink.connector.rocketmq.common.event.SourceDetectEvent; +import org.apache.flink.connector.rocketmq.common.event.SourceInitAssignEvent; +import org.apache.flink.connector.rocketmq.common.event.SourceReportOffsetEvent; +import org.apache.flink.connector.rocketmq.common.lock.SpinLock; import org.apache.flink.connector.rocketmq.source.InnerConsumer; import org.apache.flink.connector.rocketmq.source.InnerConsumerImpl; import org.apache.flink.connector.rocketmq.source.RocketMQSourceOptions; @@ -36,18 +41,26 @@ import org.apache.flink.connector.rocketmq.source.enumerator.allocate.AllocateSt import org.apache.flink.connector.rocketmq.source.enumerator.offset.OffsetsSelector; import org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplit; import org.apache.flink.util.FlinkRuntimeException; + +import com.alibaba.fastjson.JSON; +import com.google.common.collect.Sets; import org.apache.rocketmq.common.message.MessageQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; + import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -69,18 +82,31 @@ public class RocketMQSourceEnumerator private final OffsetsSelector startingOffsetsSelector; private final OffsetsSelector stoppingOffsetsSelector; + // Used for queue dynamic allocation + private final Map<MessageQueue, Long> checkedOffsets; + private boolean[] initTask; + // The internal states of the enumerator. // This set is only accessed by the partition discovery callable in the callAsync() method. // The current assignment by reader id. Only accessed by the coordinator thread. // The discovered and initialized partition splits that are waiting for owner reader to be // ready. - private final Set<MessageQueue> allocatedSet; + private final Map<MessageQueue, Byte> allocatedSet; private final Map<Integer, Set<RocketMQSourceSplit>> pendingSplitAssignmentMap; + // Only Maintaining mapping relationship + private final Map<Integer, Set<RocketMQSourceSplit>> assignedMap; + private final Map<MessageQueue, Integer /* taskId */> reflectedQueueToTaskId; + // Param from configuration private final String groupId; private final long partitionDiscoveryIntervalMs; + // Indicates the number of allocated queues + private int partitionId; + private final SpinLock lock; + private ScheduledExecutorService scheduledExecutorService; + public RocketMQSourceEnumerator( OffsetsSelector startingOffsetsSelector, OffsetsSelector stoppingOffsetsSelector, @@ -107,13 +133,19 @@ public class RocketMQSourceEnumerator this.configuration = configuration; this.context = context; this.boundedness = boundedness; + this.lock = new SpinLock(); // Support allocate splits to reader + this.checkedOffsets = new ConcurrentHashMap<>(); + this.reflectedQueueToTaskId = new ConcurrentHashMap<>(); this.pendingSplitAssignmentMap = new ConcurrentHashMap<>(); - this.allocatedSet = new HashSet<>(currentSplitAssignment); + this.allocatedSet = new ConcurrentHashMap<>(); + this.assignedMap = new ConcurrentHashMap<>(); this.allocateStrategy = AllocateStrategyFactory.getStrategy( - configuration, context, new RocketMQSourceEnumState(allocatedSet)); + configuration, + context, + new RocketMQSourceEnumState(currentSplitAssignment)); // For rocketmq setting this.groupId = configuration.getString(RocketMQSourceOptions.CONSUMER_GROUP); @@ -121,12 +153,23 @@ public class RocketMQSourceEnumerator this.stoppingOffsetsSelector = stoppingOffsetsSelector; this.partitionDiscoveryIntervalMs = configuration.getLong(RocketMQSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS); + this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + + // Initialize the task status + log.info( + "Starting the RocketMQSourceEnumerator with current split assignment: {}", + currentSplitAssignment); + if (!currentSplitAssignment.isEmpty()) { + this.initTask = new boolean[context.currentParallelism()]; + } } @Override public void start() { consumer = new InnerConsumerImpl(configuration); consumer.start(); + scheduledExecutorService.scheduleAtFixedRate( + this::notifyAssignResult, 30 * 1000, 30 * 1000, TimeUnit.MILLISECONDS); if (partitionDiscoveryIntervalMs > 0) { log.info( @@ -190,7 +233,7 @@ public class RocketMQSourceEnumerator @Override public RocketMQSourceEnumState snapshotState(long checkpointId) { - return new RocketMQSourceEnumState(allocatedSet); + return new RocketMQSourceEnumState(allocatedSet.keySet()); } @Override @@ -205,9 +248,32 @@ public class RocketMQSourceEnumerator } } + @Override + public void handleSourceEvent(int taskId, SourceEvent sourceEvent) { + if (sourceEvent instanceof SourceReportOffsetEvent) { + handleOffsetEvent(taskId, (SourceReportOffsetEvent) sourceEvent); + } else if (sourceEvent instanceof SourceInitAssignEvent) { + handleInitAssignEvent(taskId, (SourceInitAssignEvent) sourceEvent); + } + } + // ----------------- private methods ------------------- private Set<MessageQueue> requestServiceDiscovery() { + // Ensure all subtasks have been initialized + try { + if (initTask != null) { + for (int i = 0; i < context.currentParallelism(); i++) { + if (!initTask[i]) { + context.sendEventToSourceReader(i, new SourceDetectEvent()); + } + } + } + } catch (Exception e) { + log.error("init request resend error, please check task has started"); + return null; + } + Set<String> topicSet = Sets.newHashSet( configuration @@ -235,6 +301,9 @@ public class RocketMQSourceEnumerator if (t != null) { throw new FlinkRuntimeException("Failed to handle source splits change due to ", t); } + if (latestSet == null) { + return; + } final SourceChangeResult sourceChangeResult = getSourceChangeResult(latestSet); if (sourceChangeResult.isEmpty()) { @@ -248,30 +317,59 @@ public class RocketMQSourceEnumerator // This method should only be invoked in the coordinator executor thread. private SourceSplitChangeResult initializeSourceSplits(SourceChangeResult sourceChangeResult) { + lock.lock(); + Set<MessageQueue> increaseSet = sourceChangeResult.getIncreaseSet(); + Set<MessageQueue> decreaseSet = sourceChangeResult.getDecreaseSet(); OffsetsSelector.MessageQueueOffsetsRetriever offsetsRetriever = new InnerConsumerImpl.RemotingOffsetsRetrieverImpl(consumer); - Map<MessageQueue, Long> startingOffsets = + Map<MessageQueue, Long> increaseStartingOffsets = startingOffsetsSelector.getMessageQueueOffsets(increaseSet, offsetsRetriever); - Map<MessageQueue, Long> stoppingOffsets = + Map<MessageQueue, Long> increaseStoppingOffsets = stoppingOffsetsSelector.getMessageQueueOffsets(increaseSet, offsetsRetriever); + Map<MessageQueue, Long> decreaseStoppingOffsets = + stoppingOffsetsSelector.getMessageQueueOffsets(decreaseSet, offsetsRetriever); + Map<MessageQueue, Long> decreaseStartingOffsets = + startingOffsetsSelector.getMessageQueueOffsets(decreaseSet, offsetsRetriever); Set<RocketMQSourceSplit> increaseSplitSet = increaseSet.stream() .map( mq -> { - long startingOffset = startingOffsets.get(mq); + long startingOffset = increaseStartingOffsets.get(mq); long stoppingOffset = - stoppingOffsets.getOrDefault( + increaseStoppingOffsets.getOrDefault( mq, RocketMQSourceSplit.NO_STOPPING_OFFSET); return new RocketMQSourceSplit( mq, startingOffset, stoppingOffset); }) .collect(Collectors.toSet()); + // Update cache + increaseSet.forEach( + mq -> + checkedOffsets.put( + mq, + increaseStartingOffsets.getOrDefault( + mq, RocketMQSourceSplit.NO_STOPPING_OFFSET))); + + Set<RocketMQSourceSplit> decreaseSplitSet = + decreaseSet.stream() + .map( + mq -> { + long startingOffset = decreaseStartingOffsets.get(mq); + long stoppingOffset = + decreaseStoppingOffsets.getOrDefault( + mq, RocketMQSourceSplit.NO_STOPPING_OFFSET); + allocatedSet.remove(mq); + checkedOffsets.remove(mq); + return new RocketMQSourceSplit( + mq, startingOffset, stoppingOffset, false); + }) + .collect(Collectors.toSet()); - return new SourceSplitChangeResult(increaseSplitSet, sourceChangeResult.getDecreaseSet()); + return new SourceSplitChangeResult(increaseSplitSet, decreaseSplitSet); } /** @@ -291,15 +389,145 @@ public class RocketMQSourceEnumerator if (partitionDiscoveryIntervalMs <= 0) { log.info("Split changes, but dynamic partition discovery is disabled."); } - this.calculateSplitAssignment(sourceSplitChangeResult); - this.sendSplitChangesToRemote(context.registeredReaders().keySet()); + try { + this.calculateSplitAssignment(sourceSplitChangeResult); + this.sendSplitChangesToRemote(context.registeredReaders().keySet()); + } finally { + lock.unlock(); + } } /** Calculate new split assignment according allocate strategy */ private void calculateSplitAssignment(SourceSplitChangeResult sourceSplitChangeResult) { - Map<Integer, Set<RocketMQSourceSplit>> newSourceSplitAllocateMap = - this.allocateStrategy.allocate( - sourceSplitChangeResult.getIncreaseSet(), context.currentParallelism()); + Map<Integer, Set<RocketMQSourceSplit>> newSourceSplitAllocateMap; + + // Preliminary calculation of distribution results + { + // Allocate valid queues + if (sourceSplitChangeResult.decreaseSet != null + && !sourceSplitChangeResult.decreaseSet.isEmpty()) { + partitionId = 0; + + // Re-load balancing + Set<RocketMQSourceSplit> allMQ = new HashSet<>(); + OffsetsSelector.MessageQueueOffsetsRetriever offsetsRetriever = + new InnerConsumerImpl.RemotingOffsetsRetrieverImpl(consumer); + Map<MessageQueue, Long> stoppingOffsets = + stoppingOffsetsSelector.getMessageQueueOffsets( + allocatedSet.keySet(), offsetsRetriever); + Set<MessageQueue> delete = + sourceSplitChangeResult.decreaseSet.stream() + .map(RocketMQSourceSplit::getMessageQueue) + .collect(Collectors.toSet()); + + // Calculate all queue + allMQ.addAll(sourceSplitChangeResult.increaseSet); + allocatedSet + .keySet() + .forEach( + mq -> { + if (!delete.contains(mq)) { + allMQ.add( + new RocketMQSourceSplit( + mq, + checkedOffsets.get(mq), + stoppingOffsets.getOrDefault( + mq, + RocketMQSourceSplit + .NO_STOPPING_OFFSET))); + } + }); + newSourceSplitAllocateMap = + this.allocateStrategy.allocate( + allMQ, context.currentParallelism(), partitionId); + + // Update cache + assignedMap.clear(); + for (Map.Entry entry : newSourceSplitAllocateMap.entrySet()) { + assignedMap.put( + (Integer) entry.getKey(), + ((Set<RocketMQSourceSplit>) entry.getValue()) + .stream() + .map(RocketMQSourceSplit::clone) + .collect(Collectors.toSet())); + } + partitionId = allMQ.size(); + } else { + newSourceSplitAllocateMap = + this.allocateStrategy.allocate( + sourceSplitChangeResult.getIncreaseSet(), + context.currentParallelism(), + partitionId); + + // Update cache + newSourceSplitAllocateMap.forEach( + (k, v) -> + v.forEach( + mq -> + assignedMap + .computeIfAbsent(k, r -> new HashSet<>()) + .add(mq))); + partitionId += sourceSplitChangeResult.getIncreaseSet().size(); + } + + // Allocate deleted queues + if (sourceSplitChangeResult.decreaseSet != null + && !sourceSplitChangeResult.decreaseSet.isEmpty()) { + sourceSplitChangeResult.decreaseSet.forEach( + mq -> { + newSourceSplitAllocateMap + .computeIfAbsent( + reflectedQueueToTaskId.get(mq.getMessageQueue()), + k -> new HashSet<>()) + .add(mq); + reflectedQueueToTaskId.remove(mq.getMessageQueue()); + }); + } + } + + { + // Calculate the result after queue migration + if (sourceSplitChangeResult.decreaseSet != null + && !sourceSplitChangeResult.decreaseSet.isEmpty()) { + Map<Integer, Set<RocketMQSourceSplit>> migrationQueue = new HashMap<>(); + Map<Integer, Set<RocketMQSourceSplit>> noMigrationQueue = new HashMap<>(); + for (Map.Entry entry : newSourceSplitAllocateMap.entrySet()) { + int taskId = (int) entry.getKey(); + Set<RocketMQSourceSplit> splits = (Set<RocketMQSourceSplit>) entry.getValue(); + for (RocketMQSourceSplit split : splits) { + if (!split.getIsIncrease()) { + continue; + } + if (taskId != reflectedQueueToTaskId.get(split.getMessageQueue())) { + migrationQueue + .computeIfAbsent( + reflectedQueueToTaskId.get(split.getMessageQueue()), + k -> new HashSet<>()) + .add( + new RocketMQSourceSplit( + split.getMessageQueue(), + split.getStartingOffset(), + split.getStoppingOffset(), + false)); + } else { + noMigrationQueue + .computeIfAbsent(taskId, k -> new HashSet<>()) + .add(split); + } + } + } + + // finally result + migrationQueue.forEach( + (taskId, splits) -> { + newSourceSplitAllocateMap.get(taskId).addAll(splits); + }); + noMigrationQueue.forEach( + (taskId, splits) -> { + newSourceSplitAllocateMap.get(taskId).removeAll(splits); + }); + } + } for (Map.Entry<Integer, Set<RocketMQSourceSplit>> entry : newSourceSplitAllocateMap.entrySet()) { @@ -330,7 +558,12 @@ public class RocketMQSourceEnumerator .computeIfAbsent(pendingReader, k -> new ArrayList<>()) .addAll(pendingAssignmentForReader); pendingAssignmentForReader.forEach( - split -> this.allocatedSet.add(split.getMessageQueue())); + split -> { + if (split.getIsIncrease()) { + this.allocatedSet.put(split.getMessageQueue(), (byte) 1); + reflectedQueueToTaskId.put(split.getMessageQueue(), pendingReader); + } + }); } } @@ -352,6 +585,86 @@ public class RocketMQSourceEnumerator } } + private void handleInitAssignEvent(int taskId, SourceInitAssignEvent initAssignEvent) { + if (this.initTask == null || this.initTask[taskId]) { + return; + } + lock.lock(); + try { + // sync assign result + if (initAssignEvent.getSplits() != null && !initAssignEvent.getSplits().isEmpty()) { + log.info( + "Received SourceInitAssignEvent from reader {} with {} splits.", + taskId, + initAssignEvent.getSplits().toString()); + initAssignEvent + .getSplits() + .forEach( + split -> { + this.assignedMap + .computeIfAbsent(taskId, r -> new HashSet<>()) + .add(split); + this.checkedOffsets.put( + split.getMessageQueue(), split.getStoppingOffset()); + this.reflectedQueueToTaskId.put( + split.getMessageQueue(), taskId); + this.allocatedSet.put(split.getMessageQueue(), (byte) 1); + }); + } + this.initTask[taskId] = true; + } finally { + lock.unlock(); + } + } + + private void handleOffsetEvent(int taskId, SourceReportOffsetEvent sourceReportOffsetEvent) { + lock.lock(); + try { + // Update offset of message queue + if (sourceReportOffsetEvent != null && sourceReportOffsetEvent.getCheckpoint() != -1) { + log.info( + "Received SourceReportOffsetEvent from reader {} with offset {}", + taskId, + sourceReportOffsetEvent.getCheckpoint()); + MessageQueue mq = + new MessageQueue( + sourceReportOffsetEvent.getTopic(), + sourceReportOffsetEvent.getBroker(), + sourceReportOffsetEvent.getQueueId()); + this.checkedOffsets.put(mq, sourceReportOffsetEvent.getCheckpoint()); + } + } finally { + lock.unlock(); + } + } + + private void notifyAssignResult() { + if (assignedMap.isEmpty()) { + return; + } + lock.lock(); + try { + for (Map.Entry<Integer, Set<RocketMQSourceSplit>> entry : assignedMap.entrySet()) { + SourceCheckEvent sourceCheckEvent = new SourceCheckEvent(); + Map<MessageQueue, Tuple2<Long, Long>> assignedMq = new HashMap<>(); + + entry.getValue() + .forEach( + split -> { + assignedMq.put( + split.getMessageQueue(), + new Tuple2<>( + split.getStartingOffset(), + split.getStoppingOffset())); + }); + sourceCheckEvent.setAssignedMq(assignedMq); + context.sendEventToSourceReader(entry.getKey(), sourceCheckEvent); + } + } finally { + lock.unlock(); + } + } + /** A container class to hold the newly added partitions and removed partitions. */ @VisibleForTesting private static class SourceChangeResult { @@ -380,7 +693,7 @@ public class RocketMQSourceEnumerator public static class SourceSplitChangeResult { private final Set<RocketMQSourceSplit> increaseSet; - private final Set<MessageQueue> decreaseSet; + private final Set<RocketMQSourceSplit> decreaseSet; private SourceSplitChangeResult(Set<RocketMQSourceSplit> increaseSet) { this.increaseSet = Collections.unmodifiableSet(increaseSet); @@ -388,7 +701,7 @@ public class RocketMQSourceEnumerator } private SourceSplitChangeResult( - Set<RocketMQSourceSplit> increaseSet, Set<MessageQueue> decreaseSet) { + Set<RocketMQSourceSplit> increaseSet, Set<RocketMQSourceSplit> decreaseSet) { this.increaseSet = Collections.unmodifiableSet(increaseSet); this.decreaseSet = Collections.unmodifiableSet(decreaseSet); } @@ -397,14 +710,14 @@ public class RocketMQSourceEnumerator return increaseSet; } - public Set<MessageQueue> getDecreaseSet() { + public Set<RocketMQSourceSplit> getDecreaseSet() { return decreaseSet; } } @VisibleForTesting private SourceChangeResult getSourceChangeResult(Set<MessageQueue> latestSet) { - Set<MessageQueue> currentSet = Collections.unmodifiableSet(this.allocatedSet); + Set<MessageQueue> currentSet = Collections.unmodifiableSet(this.allocatedSet.keySet()); Set<MessageQueue> increaseSet = Sets.difference(latestSet, currentSet); Set<MessageQueue> decreaseSet = Sets.difference(currentSet, latestSet); diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AllocateStrategy.java b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AllocateStrategy.java index bfd814e..6386b22 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AllocateStrategy.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AllocateStrategy.java @@ -44,4 +44,17 @@ public interface AllocateStrategy { */ Map<Integer, Set<RocketMQSourceSplit>> allocate( final Collection<RocketMQSourceSplit> mqAll, final int parallelism); + + /** + * Allocates RocketMQ source splits to Flink tasks based on the selected allocation strategy. + * + * @param mqAll a collection of all available RocketMQ source splits + * @param parallelism the desired parallelism for the Flink tasks + * @param globalAssignedNumber number of allocated queues + * @return a map of task indices to sets of corresponding RocketMQ source splits + */ + Map<Integer, Set<RocketMQSourceSplit>> allocate( + final Collection<RocketMQSourceSplit> mqAll, + final int parallelism, + int globalAssignedNumber); } diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AllocateStrategyFactory.java b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AllocateStrategyFactory.java index 6c9d723..442afb9 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AllocateStrategyFactory.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AllocateStrategyFactory.java @@ -27,6 +27,7 @@ public class AllocateStrategyFactory { public static final String STRATEGY_NAME_BROADCAST = "broadcast"; public static final String STRATEGY_NAME_CONSISTENT_HASH = "hash"; + public static final String STRATEGY_NAME_AVERAGE = "average"; private AllocateStrategyFactory() { // No public constructor. @@ -46,6 +47,8 @@ public class AllocateStrategyFactory { return new ConsistentHashAllocateStrategy(); case STRATEGY_NAME_BROADCAST: return new BroadcastAllocateStrategy(); + case STRATEGY_NAME_AVERAGE: + return new AverageAllocateStrategy(); default: throw new IllegalArgumentException( "We don't support this allocate strategy: " + allocateStrategyName); diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AverageAllocateStrategy.java b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AverageAllocateStrategy.java new file mode 100644 index 0000000..f808133 --- /dev/null +++ b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AverageAllocateStrategy.java @@ -0,0 +1,43 @@ +package org.apache.flink.connector.rocketmq.source.enumerator.allocate; + +import org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplit; + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class AverageAllocateStrategy implements AllocateStrategy { + @Override + public String getStrategyName() { + return AllocateStrategyFactory.STRATEGY_NAME_AVERAGE; + } + + @Override + public Map<Integer, Set<RocketMQSourceSplit>> allocate( + Collection<RocketMQSourceSplit> mqAll, int parallelism) { + return null; + } + + @Override + public Map<Integer, Set<RocketMQSourceSplit>> allocate( + Collection<RocketMQSourceSplit> mqAll, int parallelism, int globalAssignedNumber) { + Map<Integer, Set<RocketMQSourceSplit>> result = new HashMap<>(); + for (RocketMQSourceSplit mq : mqAll) { + int readerIndex = + this.getSplitOwner(mq.getTopic(), globalAssignedNumber++, parallelism); + result.computeIfAbsent(readerIndex, k -> new HashSet<>()).add(mq); + } + return result; + } + + private int getSplitOwner(String topic, int partition, int numReaders) { + int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numReaders; + + // here, the assumption is that the id of RocketMQ partitions are always ascending + // starting from 0, and therefore can be used directly as the offset clockwise from the + // start index + return (startIndex + partition) % numReaders; + } +} diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/BroadcastAllocateStrategy.java b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/BroadcastAllocateStrategy.java index 2e46419..f37cf04 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/BroadcastAllocateStrategy.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/BroadcastAllocateStrategy.java @@ -40,4 +40,10 @@ public class BroadcastAllocateStrategy implements AllocateStrategy { } return result; } + + @Override + public Map<Integer, Set<RocketMQSourceSplit>> allocate( + Collection<RocketMQSourceSplit> mqAll, int parallelism, int globalAssignedNumber) { + return allocate(mqAll, parallelism); + } } diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/ConsistentHashAllocateStrategy.java b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/ConsistentHashAllocateStrategy.java index 6a3ad2c..7cd10c4 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/ConsistentHashAllocateStrategy.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/ConsistentHashAllocateStrategy.java @@ -48,4 +48,10 @@ public class ConsistentHashAllocateStrategy implements AllocateStrategy { } return result; } + + @Override + public Map<Integer, Set<RocketMQSourceSplit>> allocate( + Collection<RocketMQSourceSplit> mqAll, int parallelism, int globalAssignedNumber) { + return allocate(mqAll, parallelism); + } } diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/metrics/RocketMQSourceReaderMetrics.java b/src/main/java/org/apache/flink/connector/rocketmq/source/metrics/RocketMQSourceReaderMetrics.java index 09c1049..57c8a63 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/source/metrics/RocketMQSourceReaderMetrics.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/source/metrics/RocketMQSourceReaderMetrics.java @@ -45,4 +45,6 @@ public class RocketMQSourceReaderMetrics { public RocketMQSourceReaderMetrics(SourceReaderMetricGroup sourceReaderMetricGroup) {} public void registerNewMessageQueue(MessageQueue messageQueue) {} + + public void unregisterMessageQueue(MessageQueue messageQueue) {} } diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceFetcherManager.java b/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceFetcherManager.java index a635b17..242e19e 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceFetcherManager.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceFetcherManager.java @@ -95,4 +95,8 @@ public class RocketMQSourceFetcherManager public void wakeUp() {} }); } + + public RocketMQSplitReader getSplitReader() { + return (RocketMQSplitReader) fetchers.get(0).getSplitReader(); + } } diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceReader.java b/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceReader.java index 9303f35..042c12a 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceReader.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceReader.java @@ -19,30 +19,43 @@ package org.apache.flink.connector.rocketmq.source.reader; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.RecordEmitter; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.rocketmq.common.event.SourceCheckEvent; +import org.apache.flink.connector.rocketmq.common.event.SourceDetectEvent; +import org.apache.flink.connector.rocketmq.common.event.SourceInitAssignEvent; +import org.apache.flink.connector.rocketmq.common.event.SourceReportOffsetEvent; import org.apache.flink.connector.rocketmq.source.RocketMQSourceOptions; import org.apache.flink.connector.rocketmq.source.metrics.RocketMQSourceReaderMetrics; import org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplit; import org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplitState; import org.apache.flink.connector.rocketmq.source.util.UtilAll; +import com.google.common.collect.Sets; import org.apache.rocketmq.common.message.MessageQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.function.Supplier; +import java.util.stream.Collectors; /** The source reader for RocketMQ partitions. */ public class RocketMQSourceReader<T> @@ -57,6 +70,7 @@ public class RocketMQSourceReader<T> private final SortedMap<Long, Map<MessageQueue, Long>> offsetsToCommit; private final ConcurrentMap<MessageQueue, Long> offsetsOfFinishedSplits; private final RocketMQSourceReaderMetrics rocketmqSourceReaderMetrics; + private final RocketMQSplitReader reader; public RocketMQSourceReader( FutureCompletingBlockingQueue<RecordsWithSplitIds<MessageView>> elementsQueue, @@ -64,7 +78,8 @@ public class RocketMQSourceReader<T> RecordEmitter<MessageView, T, RocketMQSourceSplitState> recordEmitter, Configuration config, SourceReaderContext context, - RocketMQSourceReaderMetrics rocketMQSourceReaderMetrics) { + RocketMQSourceReaderMetrics rocketMQSourceReaderMetrics, + Supplier<SplitReader<MessageView, RocketMQSourceSplit>> readerSupplier) { super(elementsQueue, rocketmqSourceFetcherManager, recordEmitter, config, context); this.offsetsToCommit = Collections.synchronizedSortedMap(new TreeMap<>()); @@ -72,6 +87,7 @@ public class RocketMQSourceReader<T> this.commitOffsetsOnCheckpoint = config.get(RocketMQSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT); this.rocketmqSourceReaderMetrics = rocketMQSourceReaderMetrics; + this.reader = (RocketMQSplitReader) readerSupplier.get(); } @Override @@ -136,10 +152,91 @@ public class RocketMQSourceReader<T> @Override protected RocketMQSourceSplit toSplitType(String splitId, RocketMQSourceSplitState splitState) { + // Report checkpoint progress. + SourceReportOffsetEvent sourceEvent = new SourceReportOffsetEvent(); + sourceEvent.setBroker(splitState.getBrokerName()); + sourceEvent.setTopic(splitState.getTopic()); + sourceEvent.setQueueId(splitState.getQueueId()); + sourceEvent.setCheckpoint(splitState.getCurrentOffset()); + context.sendSourceEventToCoordinator(sourceEvent); + LOG.info("Report checkpoint progress: {}", sourceEvent); return splitState.getSourceSplit(); } + @Override + public void handleSourceEvents(SourceEvent sourceEvent) { + if (sourceEvent instanceof SourceDetectEvent) { + handleSourceDetectEvent(); + } else if (sourceEvent instanceof SourceCheckEvent) { + handleSourceCheckEvent((SourceCheckEvent) sourceEvent); + } + } + // ------------------------ + private void handleSourceDetectEvent() { + SourceInitAssignEvent sourceEvent1 = new SourceInitAssignEvent(); + List<RocketMQSourceSplit> splits = new LinkedList<>(); + ConcurrentMap<MessageQueue, Tuple2<Long, Long>> currentOffsetTable = + reader.getCurrentOffsetTable(); + + if (!currentOffsetTable.isEmpty()) { + for (Map.Entry<MessageQueue, Tuple2<Long, Long>> entry : + currentOffsetTable.entrySet()) { + MessageQueue messageQueue = entry.getKey(); + Long startOffset = entry.getValue().f0; + Long stopOffset = entry.getValue().f1; + RocketMQSourceSplit split = + new RocketMQSourceSplit(messageQueue, startOffset, stopOffset); + splits.add(split); + } + } + sourceEvent1.setSplits(splits); + context.sendSourceEventToCoordinator(sourceEvent1); + reader.setInitFinish(true); + } + + private void handleSourceCheckEvent(SourceCheckEvent sourceEvent) { + Map<MessageQueue, Tuple2<Long, Long>> checkMap = sourceEvent.getAssignedMq(); + Set<MessageQueue> assignedMq = checkMap.keySet(); + Set<MessageQueue> currentMq = reader.getCurrentOffsetTable().keySet(); + Set<MessageQueue> increaseSet = Sets.difference(assignedMq, currentMq); + Set<MessageQueue> decreaseSet = Sets.difference(currentMq, assignedMq); + + if (increaseSet.isEmpty() && decreaseSet.isEmpty()) { + LOG.info("No need to checkpoint, current assigned mq is same as before."); + } + + if (!increaseSet.isEmpty()) { + SplitsAddition<RocketMQSourceSplit> increase; + increase = + new SplitsAddition<>( + increaseSet.stream() + .map( + mq -> + new RocketMQSourceSplit( + mq, + checkMap.get(mq).f0, + checkMap.get(mq).f1, + true)) + .collect(Collectors.toList())); + reader.handleSplitsChanges(increase); + } + if (!decreaseSet.isEmpty()) { + SplitsAddition<RocketMQSourceSplit> decrease; + decrease = + new SplitsAddition<>( + decreaseSet.stream() + .map( + mq -> + new RocketMQSourceSplit( + mq, + checkMap.get(mq).f0, + checkMap.get(mq).f1, + false)) + .collect(Collectors.toList())); + reader.handleSplitsChanges(decrease); + } + } @VisibleForTesting SortedMap<Long, Map<MessageQueue, Long>> getOffsetsToCommit() { diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSplitReader.java b/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSplitReader.java index 3a07966..9a0833c 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSplitReader.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSplitReader.java @@ -27,6 +27,8 @@ import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; import org.apache.flink.connector.rocketmq.common.config.RocketMQOptions; +import org.apache.flink.connector.rocketmq.common.event.SourceInitAssignEvent; +import org.apache.flink.connector.rocketmq.common.lock.SpinLock; import org.apache.flink.connector.rocketmq.source.InnerConsumer; import org.apache.flink.connector.rocketmq.source.InnerConsumerImpl; import org.apache.flink.connector.rocketmq.source.RocketMQSourceOptions; @@ -36,11 +38,13 @@ import org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplit; import org.apache.flink.connector.rocketmq.source.util.UtilAll; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; + import org.apache.rocketmq.common.message.MessageQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; + import java.io.IOException; import java.time.Duration; import java.util.ArrayList; @@ -81,6 +85,11 @@ public class RocketMQSplitReader<T> implements SplitReader<MessageView, RocketMQ private final ConcurrentMap<MessageQueue, Tuple2<Long, Long>> currentOffsetTable; private final RocketMQSourceReaderMetrics rocketmqSourceReaderMetrics; + // Init status : true-finish init; false-not finish init + private boolean initFinish; + private final RocketMQRecordsWithSplitIds<MessageView> recordsWithSplitIds; + private final SpinLock lock; + public RocketMQSplitReader( Configuration configuration, SourceReaderContext sourceReaderContext, @@ -92,6 +101,9 @@ public class RocketMQSplitReader<T> implements SplitReader<MessageView, RocketMQ this.deserializationSchema = deserializationSchema; this.offsetsToCommit = new TreeMap<>(); this.currentOffsetTable = new ConcurrentHashMap<>(); + this.recordsWithSplitIds = new RocketMQRecordsWithSplitIds<>(rocketmqSourceReaderMetrics); + this.initFinish = false; + this.lock = new SpinLock(); this.consumer = new InnerConsumerImpl(configuration); this.consumer.start(); @@ -103,9 +115,17 @@ public class RocketMQSplitReader<T> implements SplitReader<MessageView, RocketMQ @Override public RecordsWithSplitIds<MessageView> fetch() throws IOException { + lock.lock(); wakeup = false; RocketMQRecordsWithSplitIds<MessageView> recordsWithSplitIds = new RocketMQRecordsWithSplitIds<>(rocketmqSourceReaderMetrics); + try { + this.recordsWithSplitIds.finishedSplits.forEach( + splitId -> recordsWithSplitIds.addFinishedSplit(splitId)); + this.recordsWithSplitIds.finishedSplits.clear(); + } finally { + lock.unlock(); + } try { Duration duration = Duration.ofMillis(this.configuration.getLong(RocketMQOptions.POLL_TIMEOUT)); @@ -129,6 +149,7 @@ public class RocketMQSplitReader<T> implements SplitReader<MessageView, RocketMQ } catch (Exception e) { LOG.error("Reader fetch split error", e); } + return recordsWithSplitIds; } @@ -142,38 +163,53 @@ public class RocketMQSplitReader<T> implements SplitReader<MessageView, RocketMQ splitsChange.getClass())); } + if (!initFinish) { + LOG.info("Start to init reader"); + SourceInitAssignEvent sourceEvent = new SourceInitAssignEvent(); + sourceEvent.setSplits(splitsChange.splits()); + sourceReaderContext.sendSourceEventToCoordinator(sourceEvent); + initFinish = true; + } + lock.lock(); + + LOG.info("Receive split change: " + splitsChange.splits().toString()); // Assignment. ConcurrentMap<MessageQueue, Tuple2<Long, Long>> newOffsetTable = new ConcurrentHashMap<>(); - // Set up the stopping timestamps. - splitsChange - .splits() - .forEach( - split -> { - MessageQueue messageQueue = - new MessageQueue( - split.getTopic(), - split.getBrokerName(), - split.getQueueId()); - newOffsetTable.put( - messageQueue, - new Tuple2<>( - split.getStartingOffset(), split.getStoppingOffset())); - rocketmqSourceReaderMetrics.registerNewMessageQueue(messageQueue); - }); - - // todo: log message queue change - + try { + // Set up the stopping timestamps. + splitsChange + .splits() + .forEach( + split -> { + if (!split.getIsIncrease()) { + finishSplitAtRecord( + split.getMessageQueue(), + split.getStoppingOffset(), + recordsWithSplitIds); + } else { + if (!currentOffsetTable.containsKey(split.getMessageQueue())) { + registerSplits(split); + newOffsetTable.put( + split.getMessageQueue(), + new Tuple2<>( + split.getStartingOffset(), + split.getStoppingOffset())); + } + } + }); + } finally { + lock.unlock(); + } // It will replace the previous assignment - Set<MessageQueue> incrementalSplits = newOffsetTable.keySet(); - consumer.assign(incrementalSplits); + consumer.assign(currentOffsetTable.keySet()); // set offset to consumer for (Map.Entry<MessageQueue, Tuple2<Long, Long>> entry : newOffsetTable.entrySet()) { MessageQueue messageQueue = entry.getKey(); Long startingOffset = entry.getValue().f0; try { - consumer.seek(messageQueue, startingOffset); + consumer.seek(messageQueue, startingOffset == -1L ? 0L : startingOffset); } catch (Exception e) { String info = String.format( @@ -207,14 +243,31 @@ public class RocketMQSplitReader<T> implements SplitReader<MessageView, RocketMQ } } + public ConcurrentMap<MessageQueue, Tuple2<Long, Long>> getCurrentOffsetTable() { + return currentOffsetTable; + } + + private void registerSplits(RocketMQSourceSplit split) { + LOG.info("Register split {}", split.splitId()); + this.currentOffsetTable.put( + split.getMessageQueue(), + new Tuple2<>(split.getStartingOffset(), split.getStoppingOffset())); + this.rocketmqSourceReaderMetrics.registerNewMessageQueue(split.getMessageQueue()); + } + + public void setInitFinish(boolean initFinish) { + this.initFinish = initFinish; + } + private void finishSplitAtRecord( MessageQueue messageQueue, long currentOffset, RocketMQRecordsWithSplitIds<MessageView> recordsBySplits) { - LOG.info("message queue {} has reached stopping offset {}", messageQueue, currentOffset); - // recordsBySplits.addFinishedSplit(getSplitId(messageQueue)); + this.currentOffsetTable.remove(messageQueue); + this.rocketmqSourceReaderMetrics.unregisterMessageQueue(messageQueue); + recordsBySplits.addFinishedSplit(RocketMQSourceSplit.toSplitId(messageQueue)); } // ---------------- private helper class ------------------------ diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializer.java b/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializer.java index 36c7a0f..eb37e19 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializer.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializer.java @@ -47,6 +47,7 @@ public class RocketMQPartitionSplitSerializer out.writeInt(split.getQueueId()); out.writeLong(split.getStartingOffset()); out.writeLong(split.getStoppingOffset()); + out.writeBoolean(split.getIsIncrease()); out.flush(); return byteArrayOutputStream.toByteArray(); } @@ -61,8 +62,13 @@ public class RocketMQPartitionSplitSerializer int partition = in.readInt(); long startingOffset = in.readLong(); long stoppingOffset = in.readLong(); + if (version == SNAPSHOT_VERSION) { + return new RocketMQSourceSplit( + topic, broker, partition, startingOffset, stoppingOffset); + } + boolean isIncrease = in.readBoolean(); return new RocketMQSourceSplit( - topic, broker, partition, startingOffset, stoppingOffset); + topic, broker, partition, startingOffset, stoppingOffset, isIncrease); } } } diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQSourceSplit.java b/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQSourceSplit.java index 7124086..a94044d 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQSourceSplit.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQSourceSplit.java @@ -37,6 +37,8 @@ public class RocketMQSourceSplit implements SourceSplit { private final int queueId; private final long startingOffset; private final long stoppingOffset; + // whether the split is increase or decrease: true-increase, false-decrease + private final boolean isIncrease; public RocketMQSourceSplit( MessageQueue messageQueue, long startingOffset, long stoppingOffset) { @@ -48,6 +50,20 @@ public class RocketMQSourceSplit implements SourceSplit { stoppingOffset); } + public RocketMQSourceSplit( + MessageQueue messageQueue, + long startingOffset, + long stoppingOffset, + boolean isIncrease) { + this( + messageQueue.getTopic(), + messageQueue.getBrokerName(), + messageQueue.getQueueId(), + startingOffset, + stoppingOffset, + isIncrease); + } + public RocketMQSourceSplit( String topic, String brokerName, @@ -59,6 +75,22 @@ public class RocketMQSourceSplit implements SourceSplit { this.queueId = queueId; this.startingOffset = startingOffset; this.stoppingOffset = stoppingOffset; + this.isIncrease = true; + } + + public RocketMQSourceSplit( + String topic, + String brokerName, + int queueId, + long startingOffset, + long stoppingOffset, + boolean isIncrease) { + this.topic = topic; + this.brokerName = brokerName; + this.queueId = queueId; + this.startingOffset = startingOffset; + this.stoppingOffset = stoppingOffset; + this.isIncrease = isIncrease; } public String getTopic() { @@ -85,6 +117,28 @@ public class RocketMQSourceSplit implements SourceSplit { return new MessageQueue(topic, brokerName, queueId); } + public boolean getIsIncrease() { + return isIncrease; + } + + public static String toSplitId(MessageQueue messageQueue) { + return messageQueue.getTopic() + + SEPARATOR + + messageQueue.getBrokerName() + + SEPARATOR + + messageQueue.getQueueId(); + } + + public static RocketMQSourceSplit clone(RocketMQSourceSplit split) { + return new RocketMQSourceSplit( + split.topic, + split.brokerName, + split.queueId, + split.startingOffset, + split.stoppingOffset, + split.isIncrease); + } + @Override public String splitId() { return topic + SEPARATOR + brokerName + SEPARATOR + queueId; @@ -93,13 +147,13 @@ public class RocketMQSourceSplit implements SourceSplit { @Override public String toString() { return String.format( - "(Topic: %s, BrokerName: %s, QueueId: %d, MinOffset: %d, MaxOffset: %d)", - topic, brokerName, queueId, startingOffset, stoppingOffset); + "(Topic: %s, BrokerName: %s, QueueId: %d, MinOffset: %d, MaxOffset: %d, status: %s)", + topic, brokerName, queueId, startingOffset, stoppingOffset, isIncrease); } @Override public int hashCode() { - return Objects.hash(topic, brokerName, queueId, startingOffset, stoppingOffset); + return Objects.hash(topic, brokerName, queueId, startingOffset, stoppingOffset, isIncrease); } @Override @@ -112,6 +166,7 @@ public class RocketMQSourceSplit implements SourceSplit { && brokerName.equals(other.brokerName) && queueId == other.queueId && startingOffset == other.startingOffset - && stoppingOffset == other.stoppingOffset; + && stoppingOffset == other.stoppingOffset + && isIncrease == other.isIncrease; } } diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQSourceSplitState.java b/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQSourceSplitState.java index ca74d58..5f16a80 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQSourceSplitState.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQSourceSplitState.java @@ -48,6 +48,11 @@ public class RocketMQSourceSplitState extends RocketMQSourceSplit { */ public RocketMQSourceSplit getSourceSplit() { return new RocketMQSourceSplit( - getTopic(), getBrokerName(), getQueueId(), getCurrentOffset(), getStoppingOffset()); + getTopic(), + getBrokerName(), + getQueueId(), + getCurrentOffset(), + getStoppingOffset(), + getIsIncrease()); } } diff --git a/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java b/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java index 08371b3..78ffd17 100644 --- a/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java +++ b/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java @@ -26,7 +26,6 @@ import org.apache.flink.connector.rocketmq.legacy.common.serialization.SimpleStr import org.apache.flink.connector.rocketmq.legacy.common.util.TestUtils; import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; -import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.common.message.MessageQueue; import org.junit.Assert; import org.junit.Test; diff --git a/src/test/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AverageAllocateStrategyTest.java b/src/test/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AverageAllocateStrategyTest.java new file mode 100644 index 0000000..f63c8d1 --- /dev/null +++ b/src/test/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AverageAllocateStrategyTest.java @@ -0,0 +1,97 @@ +package org.apache.flink.connector.rocketmq.source.enumerator.allocate; + +import org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplit; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +public class AverageAllocateStrategyTest { + + private static final String BROKER_NAME = "brokerName"; + private static final String PREFIX_TOPIC = "test-topic-"; + private static final int NUM_SPLITS = 900; + private static final int[] SPLIT_SIZE = {1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000}; + + @Test + public void averageAllocateStrategyTest() { + AllocateStrategy allocateStrategy = new AverageAllocateStrategy(); + Collection<RocketMQSourceSplit> mqAll = new ArrayList<>(); + for (int i = 0; i < NUM_SPLITS; i++) { + mqAll.add( + new RocketMQSourceSplit( + PREFIX_TOPIC + (i + 1), + BROKER_NAME, + i, + 0, + SPLIT_SIZE[i % SPLIT_SIZE.length])); + } + int parallelism = 3; + Map<Integer, Set<RocketMQSourceSplit>> result = + allocateStrategy.allocate(mqAll, parallelism, 0); + assertEquals(NUM_SPLITS / parallelism, result.get(0).size()); + assertEquals(NUM_SPLITS / parallelism, result.get(1).size()); + assertEquals(NUM_SPLITS / parallelism, result.get(2).size()); + } + + @Test + public void averagesAllocateStrategyTest() { + AllocateStrategy allocateStrategy = new AverageAllocateStrategy(); + Collection<RocketMQSourceSplit> mqAll = new ArrayList<>(); + for (int i = 0; i < NUM_SPLITS; i++) { + mqAll.add( + new RocketMQSourceSplit( + PREFIX_TOPIC + (i + 1), + BROKER_NAME, + i, + 0, + SPLIT_SIZE[i % SPLIT_SIZE.length])); + } + int parallelism = 3; + Map<Integer, Set<RocketMQSourceSplit>> result = + allocateStrategy.allocate(mqAll, parallelism, 0); + assertEquals(NUM_SPLITS / parallelism, result.get(0).size()); + assertEquals(NUM_SPLITS / parallelism, result.get(1).size()); + assertEquals(NUM_SPLITS / parallelism, result.get(2).size()); + + mqAll.clear(); + for (int i = NUM_SPLITS; i < 8 + NUM_SPLITS; i++) { + mqAll.add( + new RocketMQSourceSplit( + PREFIX_TOPIC + (i + 1), + BROKER_NAME, + i, + 0, + SPLIT_SIZE[i % SPLIT_SIZE.length])); + } + Map<Integer, Set<RocketMQSourceSplit>> result1 = + allocateStrategy.allocate(mqAll, parallelism, NUM_SPLITS); + + mqAll.clear(); + for (int i = 8 + NUM_SPLITS; i < 8 + 7 + NUM_SPLITS; i++) { + mqAll.add( + new RocketMQSourceSplit( + PREFIX_TOPIC + (i + 1), + BROKER_NAME, + i, + 0, + SPLIT_SIZE[i % SPLIT_SIZE.length])); + } + Map<Integer, Set<RocketMQSourceSplit>> result2 = + allocateStrategy.allocate(mqAll, parallelism, NUM_SPLITS + 8); + + result1.forEach((k, v) -> result.computeIfAbsent(k, r -> new HashSet<>()).addAll(v)); + result2.forEach((k, v) -> result.computeIfAbsent(k, r -> new HashSet<>()).addAll(v)); + + // No matter how many times it's assigned, it's always equal + assertEquals((NUM_SPLITS + 8 + 7) / parallelism, result.get(0).size()); + assertEquals((NUM_SPLITS + 8 + 7) / parallelism, result.get(1).size()); + assertEquals((NUM_SPLITS + 8 + 7) / parallelism, result.get(2).size()); + } +} diff --git a/src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java b/src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java index f1fa5af..432994e 100644 --- a/src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java +++ b/src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java @@ -36,4 +36,56 @@ public class RocketMQPartitionSplitSerializerTest { serializer.deserialize(serializer.getVersion(), serializer.serialize(expected)); assertEquals(expected, actual); } + + @Test + public void testSerializeAndDeserialize() throws IOException { + RocketMQPartitionSplitSerializer serializer = new RocketMQPartitionSplitSerializer(); + RocketMQSourceSplit originalSplit = + new RocketMQSourceSplit("testTopic", "testBroker", 0, 100L, 200L, false); + + byte[] serialized = serializer.serialize(originalSplit); + RocketMQSourceSplit deserializedSplit = + serializer.deserialize(serializer.getVersion(), serialized); + + assertEquals(originalSplit.getTopic(), deserializedSplit.getTopic()); + assertEquals(originalSplit.getBrokerName(), deserializedSplit.getBrokerName()); + assertEquals(originalSplit.getQueueId(), deserializedSplit.getQueueId()); + assertEquals(originalSplit.getStartingOffset(), deserializedSplit.getStartingOffset()); + assertEquals(originalSplit.getStoppingOffset(), deserializedSplit.getStoppingOffset()); + assertEquals(originalSplit.getIsIncrease(), deserializedSplit.getIsIncrease()); + } + + @Test + public void testDeserializeWithOldVersion() throws IOException { + RocketMQPartitionSplitSerializer serializer = new RocketMQPartitionSplitSerializer(); + RocketMQSourceSplit originalSplit = + new RocketMQSourceSplit("testTopic", "testBroker", 0, 100L, 200L, false); + + byte[] serialized = serializer.serialize(originalSplit); + RocketMQSourceSplit deserializedSplit = serializer.deserialize(1, serialized); + + assertEquals(originalSplit.getTopic(), deserializedSplit.getTopic()); + assertEquals(originalSplit.getBrokerName(), deserializedSplit.getBrokerName()); + assertEquals(originalSplit.getQueueId(), deserializedSplit.getQueueId()); + assertEquals(originalSplit.getStartingOffset(), deserializedSplit.getStartingOffset()); + assertEquals(originalSplit.getStoppingOffset(), deserializedSplit.getStoppingOffset()); + assertEquals(originalSplit.getIsIncrease(), deserializedSplit.getIsIncrease()); + } + + @Test + public void testDeserializeWithOldVersion1() throws IOException { + RocketMQPartitionSplitSerializer serializer = new RocketMQPartitionSplitSerializer(); + RocketMQSourceSplit originalSplit = + new RocketMQSourceSplit("testTopic", "testBroker", 0, 100L, 200L, false); + + byte[] serialized = serializer.serialize(originalSplit); + RocketMQSourceSplit deserializedSplit = serializer.deserialize(0, serialized); + + assertEquals(originalSplit.getTopic(), deserializedSplit.getTopic()); + assertEquals(originalSplit.getBrokerName(), deserializedSplit.getBrokerName()); + assertEquals(originalSplit.getQueueId(), deserializedSplit.getQueueId()); + assertEquals(originalSplit.getStartingOffset(), deserializedSplit.getStartingOffset()); + assertEquals(originalSplit.getStoppingOffset(), deserializedSplit.getStoppingOffset()); + assertEquals(true, deserializedSplit.getIsIncrease()); + } }