This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 7dc3fa8a13 [Fix][Connector kafka]Fix Kafka consumer stop fetching after TM node restarted (#7233) 7dc3fa8a13 is described below commit 7dc3fa8a13e77914798b133332a72831426cc935 Author: litiliu <38579068+liti...@users.noreply.github.com> AuthorDate: Mon Jul 22 11:41:33 2024 +0800 [Fix][Connector kafka]Fix Kafka consumer stop fetching after TM node restarted (#7233) --- .../kafka/source/KafkaSourceSplitEnumerator.java | 20 +++++- .../source/KafkaSourceSplitEnumeratorTest.java | 74 ++++++++++++++++++++++ 2 files changed, 93 insertions(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java index a7471ae086..f868eaed20 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java @@ -30,6 +30,7 @@ import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.TopicPartition; +import com.google.common.annotations.VisibleForTesting; import lombok.extern.slf4j.Slf4j; import java.io.IOException; @@ -82,6 +83,20 @@ public class KafkaSourceSplitEnumerator this.discoveryIntervalMillis = kafkaSourceConfig.getDiscoveryIntervalMillis(); } + @VisibleForTesting + protected KafkaSourceSplitEnumerator( + AdminClient adminClient, + Map<TopicPartition, KafkaSourceSplit> pendingSplit, + Map<TopicPartition, KafkaSourceSplit> assignedSplit) { + this.tablePathMetadataMap = new HashMap<>(); + this.context = null; + this.discoveryIntervalMillis = -1; + this.adminClient = adminClient; + this.kafkaSourceConfig = null; + this.pendingSplit = pendingSplit; + this.assignedSplit = assignedSplit; + } + @Override public void open() { if (discoveryIntervalMillis > 0) { @@ -180,7 +195,10 @@ public class KafkaSourceSplitEnumerator @Override public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) { if (!splits.isEmpty()) { - pendingSplit.putAll(convertToNextSplit(splits)); + Map<TopicPartition, ? extends KafkaSourceSplit> nextSplit = convertToNextSplit(splits); + // remove them from the assignedSplit, so we can reassign them + nextSplit.keySet().forEach(assignedSplit::remove); + pendingSplit.putAll(nextSplit); } } diff --git a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumeratorTest.java b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumeratorTest.java new file mode 100644 index 0000000000..6a8de812d3 --- /dev/null +++ b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumeratorTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.kafka.source; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.KafkaAdminClient; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +class KafkaSourceSplitEnumeratorTest { + + @Test + void addSplitsBack() { + // prepare + TopicPartition partition = new TopicPartition("test", 0); + + AdminClient adminClient = Mockito.mock(KafkaAdminClient.class); + Mockito.when(adminClient.listOffsets(Mockito.any(java.util.Map.class))) + .thenReturn( + new ListOffsetsResult( + new HashMap< + TopicPartition, + KafkaFuture<ListOffsetsResult.ListOffsetsResultInfo>>() { + { + put( + partition, + KafkaFuture.completedFuture( + new ListOffsetsResult.ListOffsetsResultInfo( + 0, 0, Optional.of(0)))); + } + })); + + // test + Map<TopicPartition, KafkaSourceSplit> assignedSplit = + new HashMap<TopicPartition, KafkaSourceSplit>() { + { + put(partition, new KafkaSourceSplit(null, partition)); + } + }; + Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>(); + List<KafkaSourceSplit> splits = Arrays.asList(new KafkaSourceSplit(null, partition)); + KafkaSourceSplitEnumerator enumerator = + new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit); + enumerator.addSplitsBack(splits, 1); + Assertions.assertTrue(pendingSplit.size() == splits.size()); + Assertions.assertNull(assignedSplit.get(partition)); + } +}