This is an automated email from the ASF dual-hosted git repository. fanjia pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new a0eeeb9b62 [Fix][Kafka] Fix in kafka streaming mode can not read incremental data (#7871) a0eeeb9b62 is described below commit a0eeeb9b6234ce842f25395e6f5524eef53fb1f5 Author: Carl-Zhou-CN <1058249...@qq.com> AuthorDate: Sat Nov 16 11:17:37 2024 +0800 [Fix][Kafka] Fix in kafka streaming mode can not read incremental data (#7871) --- docs/en/connector-v2/source/kafka.md | 1 + .../seatunnel/kafka/source/KafkaSource.java | 11 +- .../kafka/source/KafkaSourceSplitEnumerator.java | 34 ++++- .../admin/KafkaSourceSplitEnumeratorTest.java | 156 +++++++++++++++++++++ .../source/KafkaSourceSplitEnumeratorTest.java | 74 ---------- 5 files changed, 194 insertions(+), 82 deletions(-) diff --git a/docs/en/connector-v2/source/kafka.md b/docs/en/connector-v2/source/kafka.md index 90c183c2c1..c0ed66186b 100644 --- a/docs/en/connector-v2/source/kafka.md +++ b/docs/en/connector-v2/source/kafka.md @@ -59,6 +59,7 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor ### Simple > This example reads the data of kafka's topic_1, topic_2, topic_3 and prints > it to the client.And if you have not yet installed and deployed SeaTunnel, > you need to follow the instructions in Install SeaTunnel to install and > deploy SeaTunnel. And if you have not yet installed and deployed SeaTunnel, > you need to follow the instructions in [Install > SeaTunnel](../../start-v2/locally/deployment.md) to install and deploy > SeaTunnel. And then follow the instructions in [Quick Start With SeaTunn > [...] +> In batch mode, during the enumerator sharding process, it will fetch the latest offset for each partition and use it as the stopping point. ```hocon # Defining the runtime environment diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java index 5688fde5b6..0ff99807f2 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java @@ -104,7 +104,11 @@ public class KafkaSource @Override public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> createEnumerator( SourceSplitEnumerator.Context<KafkaSourceSplit> enumeratorContext) { - return new KafkaSourceSplitEnumerator(kafkaSourceConfig, enumeratorContext, null); + return new KafkaSourceSplitEnumerator( + kafkaSourceConfig, + enumeratorContext, + null, + getBoundedness() == Boundedness.UNBOUNDED); } @Override @@ -112,7 +116,10 @@ public class KafkaSource SourceSplitEnumerator.Context<KafkaSourceSplit> enumeratorContext, KafkaSourceState checkpointState) { return new KafkaSourceSplitEnumerator( - kafkaSourceConfig, enumeratorContext, checkpointState); + kafkaSourceConfig, + enumeratorContext, + checkpointState, + getBoundedness() == Boundedness.UNBOUNDED); } @Override diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java index 06ce4565c3..6d6c1ca96f 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java @@ -70,10 +70,13 @@ public class KafkaSourceSplitEnumerator private final Map<String, TablePath> topicMappingTablePathMap = new HashMap<>(); + private boolean isStreamingMode; + KafkaSourceSplitEnumerator( KafkaSourceConfig kafkaSourceConfig, Context<KafkaSourceSplit> context, - KafkaSourceState sourceState) { + KafkaSourceState sourceState, + boolean isStreamingMode) { this.kafkaSourceConfig = kafkaSourceConfig; this.tablePathMetadataMap = kafkaSourceConfig.getMapMetadata(); this.context = context; @@ -81,10 +84,11 @@ public class KafkaSourceSplitEnumerator this.pendingSplit = new HashMap<>(); this.adminClient = initAdminClient(this.kafkaSourceConfig.getProperties()); this.discoveryIntervalMillis = kafkaSourceConfig.getDiscoveryIntervalMillis(); + this.isStreamingMode = isStreamingMode; } @VisibleForTesting - protected KafkaSourceSplitEnumerator( + public KafkaSourceSplitEnumerator( AdminClient adminClient, Map<TopicPartition, KafkaSourceSplit> pendingSplit, Map<TopicPartition, KafkaSourceSplit> assignedSplit) { @@ -97,6 +101,16 @@ public class KafkaSourceSplitEnumerator this.assignedSplit = assignedSplit; } + @VisibleForTesting + public KafkaSourceSplitEnumerator( + AdminClient adminClient, + Map<TopicPartition, KafkaSourceSplit> pendingSplit, + Map<TopicPartition, KafkaSourceSplit> assignedSplit, + boolean isStreamingMode) { + this(adminClient, pendingSplit, assignedSplit); + this.isStreamingMode = isStreamingMode; + } + @Override public void open() { if (discoveryIntervalMillis > 0) { @@ -204,7 +218,7 @@ public class KafkaSourceSplitEnumerator private Map<TopicPartition, ? extends KafkaSourceSplit> convertToNextSplit( List<KafkaSourceSplit> splits) { try { - Map<TopicPartition, Long> listOffsets = + Map<TopicPartition, Long> latestOffsets = listOffsets( splits.stream() .map(KafkaSourceSplit::getTopicPartition) @@ -214,7 +228,10 @@ public class KafkaSourceSplitEnumerator splits.forEach( split -> { split.setStartOffset(split.getEndOffset() + 1); - split.setEndOffset(listOffsets.get(split.getTopicPartition())); + split.setEndOffset( + isStreamingMode + ? Long.MAX_VALUE + : latestOffsets.get(split.getTopicPartition())); }); return splits.stream() .collect(Collectors.toMap(KafkaSourceSplit::getTopicPartition, split -> split)); @@ -305,7 +322,10 @@ public class KafkaSourceSplitEnumerator // Obtain the corresponding topic TablePath from kafka topic TablePath tablePath = topicMappingTablePathMap.get(partition.topic()); KafkaSourceSplit split = new KafkaSourceSplit(tablePath, partition); - split.setEndOffset(latestOffsets.get(split.getTopicPartition())); + split.setEndOffset( + isStreamingMode + ? Long.MAX_VALUE + : latestOffsets.get(partition)); return split; }) .collect(Collectors.toSet()); @@ -344,6 +364,7 @@ public class KafkaSourceSplitEnumerator private Map<TopicPartition, Long> listOffsets( Collection<TopicPartition> partitions, OffsetSpec offsetSpec) throws ExecutionException, InterruptedException { + Map<TopicPartition, OffsetSpec> topicPartitionOffsets = partitions.stream() .collect(Collectors.toMap(partition -> partition, __ -> offsetSpec)); @@ -391,7 +412,8 @@ public class KafkaSourceSplitEnumerator assignSplit(); } - private void fetchPendingPartitionSplit() throws ExecutionException, InterruptedException { + @VisibleForTesting + public void fetchPendingPartitionSplit() throws ExecutionException, InterruptedException { getTopicInfo() .forEach( split -> { diff --git a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/kafka/clients/admin/KafkaSourceSplitEnumeratorTest.java b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/kafka/clients/admin/KafkaSourceSplitEnumeratorTest.java new file mode 100644 index 0000000000..00e059ecfe --- /dev/null +++ b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/kafka/clients/admin/KafkaSourceSplitEnumeratorTest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplit; +import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplitEnumerator; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutionException; + +class KafkaSourceSplitEnumeratorTest { + + AdminClient adminClient = Mockito.mock(KafkaAdminClient.class); + // prepare + TopicPartition partition = new TopicPartition("test", 0); + + @BeforeEach + void init() { + + Mockito.when(adminClient.listOffsets(Mockito.any(java.util.Map.class))) + .thenReturn( + new ListOffsetsResult( + new HashMap< + TopicPartition, + KafkaFuture<ListOffsetsResult.ListOffsetsResultInfo>>() { + { + put( + partition, + KafkaFuture.completedFuture( + new ListOffsetsResult.ListOffsetsResultInfo( + 0, 0, Optional.of(0)))); + } + })); + Mockito.when(adminClient.describeTopics(Mockito.any(java.util.Collection.class))) + .thenReturn( + DescribeTopicsResult.ofTopicNames( + new HashMap<String, KafkaFuture<TopicDescription>>() { + { + put( + partition.topic(), + KafkaFuture.completedFuture( + new TopicDescription( + partition.topic(), + false, + Collections.singletonList( + new TopicPartitionInfo( + 0, + null, + Collections + .emptyList(), + Collections + .emptyList()))))); + } + })); + } + + @Test + void addSplitsBack() { + // test + Map<TopicPartition, KafkaSourceSplit> assignedSplit = + new HashMap<TopicPartition, KafkaSourceSplit>() { + { + put(partition, new KafkaSourceSplit(null, partition)); + } + }; + Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>(); + List<KafkaSourceSplit> splits = Arrays.asList(new KafkaSourceSplit(null, partition)); + KafkaSourceSplitEnumerator enumerator = + new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit); + enumerator.addSplitsBack(splits, 1); + Assertions.assertTrue(pendingSplit.size() == splits.size()); + Assertions.assertNull(assignedSplit.get(partition)); + Assertions.assertTrue(pendingSplit.get(partition).getEndOffset() == 0); + } + + @Test + void addStreamingSplitsBack() { + // test + Map<TopicPartition, KafkaSourceSplit> assignedSplit = + new HashMap<TopicPartition, KafkaSourceSplit>() { + { + put(partition, new KafkaSourceSplit(null, partition)); + } + }; + Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>(); + List<KafkaSourceSplit> splits = + Collections.singletonList(new KafkaSourceSplit(null, partition)); + KafkaSourceSplitEnumerator enumerator = + new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit, true); + enumerator.addSplitsBack(splits, 1); + Assertions.assertEquals(pendingSplit.size(), splits.size()); + Assertions.assertNull(assignedSplit.get(partition)); + Assertions.assertTrue(pendingSplit.get(partition).getEndOffset() == Long.MAX_VALUE); + } + + @Test + void addStreamingSplits() throws ExecutionException, InterruptedException { + // test + Map<TopicPartition, KafkaSourceSplit> assignedSplit = + new HashMap<TopicPartition, KafkaSourceSplit>(); + Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>(); + List<KafkaSourceSplit> splits = + Collections.singletonList(new KafkaSourceSplit(null, partition)); + KafkaSourceSplitEnumerator enumerator = + new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit, true); + enumerator.fetchPendingPartitionSplit(); + Assertions.assertEquals(pendingSplit.size(), splits.size()); + Assertions.assertNotNull(pendingSplit.get(partition)); + Assertions.assertTrue(pendingSplit.get(partition).getEndOffset() == Long.MAX_VALUE); + } + + @Test + void addplits() throws ExecutionException, InterruptedException { + // test + Map<TopicPartition, KafkaSourceSplit> assignedSplit = + new HashMap<TopicPartition, KafkaSourceSplit>(); + Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>(); + List<KafkaSourceSplit> splits = + Collections.singletonList(new KafkaSourceSplit(null, partition)); + KafkaSourceSplitEnumerator enumerator = + new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit, false); + enumerator.fetchPendingPartitionSplit(); + Assertions.assertEquals(pendingSplit.size(), splits.size()); + Assertions.assertNotNull(pendingSplit.get(partition)); + Assertions.assertTrue(pendingSplit.get(partition).getEndOffset() == 0); + } +} diff --git a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumeratorTest.java b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumeratorTest.java deleted file mode 100644 index 6a8de812d3..0000000000 --- a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumeratorTest.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.seatunnel.kafka.source; - -import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.KafkaAdminClient; -import org.apache.kafka.clients.admin.ListOffsetsResult; -import org.apache.kafka.common.KafkaFuture; -import org.apache.kafka.common.TopicPartition; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.mockito.Mockito; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; - -class KafkaSourceSplitEnumeratorTest { - - @Test - void addSplitsBack() { - // prepare - TopicPartition partition = new TopicPartition("test", 0); - - AdminClient adminClient = Mockito.mock(KafkaAdminClient.class); - Mockito.when(adminClient.listOffsets(Mockito.any(java.util.Map.class))) - .thenReturn( - new ListOffsetsResult( - new HashMap< - TopicPartition, - KafkaFuture<ListOffsetsResult.ListOffsetsResultInfo>>() { - { - put( - partition, - KafkaFuture.completedFuture( - new ListOffsetsResult.ListOffsetsResultInfo( - 0, 0, Optional.of(0)))); - } - })); - - // test - Map<TopicPartition, KafkaSourceSplit> assignedSplit = - new HashMap<TopicPartition, KafkaSourceSplit>() { - { - put(partition, new KafkaSourceSplit(null, partition)); - } - }; - Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>(); - List<KafkaSourceSplit> splits = Arrays.asList(new KafkaSourceSplit(null, partition)); - KafkaSourceSplitEnumerator enumerator = - new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit); - enumerator.addSplitsBack(splits, 1); - Assertions.assertTrue(pendingSplit.size() == splits.size()); - Assertions.assertNull(assignedSplit.get(partition)); - } -}