This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 1784c01a35 [Fix][Connector-V2] Fix kafka batch mode can not read all message (#7135) 1784c01a35 is described below commit 1784c01a35e6fa23cad9a9f514a6b840835ea51a Author: Jia Fan <fanjiaemi...@qq.com> AuthorDate: Tue Jul 9 21:50:39 2024 +0800 [Fix][Connector-V2] Fix kafka batch mode can not read all message (#7135) --- .github/workflows/backend.yml | 6 +- docs/en/connector-v2/formats/avro.md | 2 +- docs/zh/connector-v2/formats/avro.md | 2 +- .../seatunnel/kafka/source/KafkaSourceReader.java | 31 ++++++---- .../seatunnel/e2e/connector/kafka/KafkaIT.java | 70 +++++++++++++++------- .../test/resources/avro/kafka_avro_to_assert.conf | 2 +- .../jsonFormatIT/kafka_source_json_to_console.conf | 2 +- ..._source_to_assert_with_max_poll_records_1.conf} | 34 +++++------ ...ce_format_error_handle_way_fail_to_console.conf | 1 - ...ce_format_error_handle_way_skip_to_console.conf | 1 - .../textFormatIT/kafka_source_text_to_console.conf | 2 +- ...ource_text_to_console_assert_catalog_table.conf | 2 +- 12 files changed, 91 insertions(+), 64 deletions(-) diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml index 9975d477da..b6094aff4d 100644 --- a/.github/workflows/backend.yml +++ b/.github/workflows/backend.yml @@ -1052,7 +1052,7 @@ jobs: kafka-connector-it: needs: [ changes, sanity-check ] - if: needs.changes.outputs.api == 'true' + if: needs.changes.outputs.api == 'true' || contains(needs.changes.outputs.it-modules, 'connector-kafka-e2e') runs-on: ${{ matrix.os }} strategy: matrix: @@ -1068,7 +1068,6 @@ jobs: distribution: 'temurin' cache: 'maven' - name: run kafka connector integration test - if: needs.changes.outputs.api == 'true' run: | ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-kafka-e2e -am -Pci env: @@ -1076,7 +1075,7 @@ jobs: rocketmq-connector-it: needs: [ changes, sanity-check ] - if: needs.changes.outputs.api == 'true' + if: needs.changes.outputs.api == 'true' || contains(needs.changes.outputs.it-modules, 'connector-rocketmq-e2e') runs-on: ${{ matrix.os }} strategy: matrix: @@ -1092,7 +1091,6 @@ jobs: distribution: 'temurin' cache: 'maven' - name: run rocket connector integration test - if: needs.changes.outputs.api == 'true' run: | ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-rocketmq-e2e -am -Pci env: diff --git a/docs/en/connector-v2/formats/avro.md b/docs/en/connector-v2/formats/avro.md index 638657b345..8fef411fb5 100644 --- a/docs/en/connector-v2/formats/avro.md +++ b/docs/en/connector-v2/formats/avro.md @@ -77,7 +77,7 @@ source { bootstrap.servers = "kafkaCluster:9092" topic = "test_avro_topic" result_table_name = "kafka_table" - kafka.auto.offset.reset = "earliest" + start_mode = "earliest" format = avro format_error_handle_way = skip schema = { diff --git a/docs/zh/connector-v2/formats/avro.md b/docs/zh/connector-v2/formats/avro.md index 4e19ea4b98..7176f4e507 100644 --- a/docs/zh/connector-v2/formats/avro.md +++ b/docs/zh/connector-v2/formats/avro.md @@ -77,7 +77,7 @@ source { bootstrap.servers = "kafkaCluster:9092" topic = "test_avro_topic" result_table_name = "kafka_table" - kafka.auto.offset.reset = "earliest" + start_mode = "earliest" format = avro format_error_handle_way = skip schema = { diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java index d136fabc40..02c2a9007e 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java @@ -45,6 +45,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; @@ -103,7 +104,7 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource return; } - while (pendingPartitionsQueue.size() != 0) { + while (!pendingPartitionsQueue.isEmpty()) { sourceSplits.add(pendingPartitionsQueue.poll()); } sourceSplits.forEach( @@ -120,9 +121,10 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource executorService.submit(thread); return thread; })); + List<KafkaSourceSplit> finishedSplits = new CopyOnWriteArrayList<>(); sourceSplits.forEach( sourceSplit -> { - CompletableFuture<Void> completableFuture = new CompletableFuture<>(); + CompletableFuture<Boolean> completableFuture = new CompletableFuture<>(); TablePath tablePath = sourceSplit.getTablePath(); DeserializationSchema<SeaTunnelRow> deserializationSchema = tablePathMetadataMap.get(tablePath).getDeserializationSchema(); @@ -148,9 +150,14 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource for (TopicPartition partition : partitions) { List<ConsumerRecord<byte[], byte[]>> recordList = records.records(partition); + if (Boundedness.BOUNDED.equals( + context.getBoundedness()) + && recordList.isEmpty()) { + completableFuture.complete(true); + return; + } for (ConsumerRecord<byte[], byte[]> record : recordList) { - try { if (deserializationSchema instanceof @@ -180,7 +187,8 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource && record.offset() >= sourceSplit .getEndOffset()) { - break; + completableFuture.complete(true); + return; } } long lastOffset = -1; @@ -199,18 +207,21 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource } catch (Exception e) { completableFuture.completeExceptionally(e); } - completableFuture.complete(null); + completableFuture.complete(false); }); - } catch (InterruptedException e) { + if (completableFuture.get()) { + finishedSplits.add(sourceSplit); + } + } catch (Exception e) { throw new KafkaConnectorException( KafkaConnectorErrorCode.CONSUME_DATA_FAILED, e); } - completableFuture.join(); }); - if (Boundedness.BOUNDED.equals(context.getBoundedness())) { - // signal to the source that we have reached the end of the data. - context.signalNoMoreElement(); + finishedSplits.forEach(sourceSplits::remove); + if (sourceSplits.isEmpty()) { + context.signalNoMoreElement(); + } } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java index 2f1c92048e..d4629851e7 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java @@ -212,6 +212,27 @@ public class KafkaIT extends TestSuiteBase implements TestResource { Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); } + @TestTemplate + public void testSourceKafkaToAssertWithMaxPollRecords1(TestContainer container) + throws IOException, InterruptedException { + TextSerializationSchema serializer = + TextSerializationSchema.builder() + .seaTunnelRowType(SEATUNNEL_ROW_TYPE) + .delimiter(",") + .build(); + generateTestData( + row -> + new ProducerRecord<>( + "test_topic_text_max_poll_records_1", + null, + serializer.serialize(row)), + 0, + 100); + Container.ExecResult execResult = + container.executeJob("/kafka/kafka_source_to_assert_with_max_poll_records_1.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + @TestTemplate public void testSourceKafkaTextToConsoleAssertCatalogTable(TestContainer container) throws IOException, InterruptedException { @@ -538,29 +559,34 @@ public class KafkaIT extends TestSuiteBase implements TestResource { } private void generateTestData(ProducerRecordConverter converter, int start, int end) { - for (int i = start; i < end; i++) { - SeaTunnelRow row = - new SeaTunnelRow( - new Object[] { - Long.valueOf(i), - Collections.singletonMap("key", Short.parseShort("1")), - new Byte[] {Byte.parseByte("1")}, - "string", - Boolean.FALSE, - Byte.parseByte("1"), - Short.parseShort("1"), - Integer.parseInt("1"), - Long.parseLong("1"), - Float.parseFloat("1.1"), - Double.parseDouble("1.1"), - BigDecimal.valueOf(11, 1), - "test".getBytes(), - LocalDate.of(2024, 1, 1), - LocalDateTime.of(2024, 1, 1, 12, 59, 23) - }); - ProducerRecord<byte[], byte[]> producerRecord = converter.convert(row); - producer.send(producerRecord); + try { + for (int i = start; i < end; i++) { + SeaTunnelRow row = + new SeaTunnelRow( + new Object[] { + Long.valueOf(i), + Collections.singletonMap("key", Short.parseShort("1")), + new Byte[] {Byte.parseByte("1")}, + "string", + Boolean.FALSE, + Byte.parseByte("1"), + Short.parseShort("1"), + Integer.parseInt("1"), + Long.parseLong("1"), + Float.parseFloat("1.1"), + Double.parseDouble("1.1"), + BigDecimal.valueOf(11, 1), + "test".getBytes(), + LocalDate.of(2024, 1, 1), + LocalDateTime.of(2024, 1, 1, 12, 59, 23) + }); + ProducerRecord<byte[], byte[]> producerRecord = converter.convert(row); + producer.send(producerRecord).get(); + } + } catch (Exception e) { + throw new RuntimeException(e); } + producer.flush(); } private static final SeaTunnelRowType SEATUNNEL_ROW_TYPE = diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf index 31fe77a3e2..755a9a2b8d 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf @@ -32,7 +32,7 @@ source { bootstrap.servers = "kafkaCluster:9092" topic = "test_avro_topic" result_table_name = "kafka_table" - kafka.auto.offset.reset = "earliest" + start_mode = "earliest" format = avro format_error_handle_way = skip schema = { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf index f9a41e7987..3657390602 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf @@ -32,7 +32,7 @@ source { bootstrap.servers = "kafkaCluster:9092" topic = "test_topic_json" result_table_name = "kafka_table" - kafka.auto.offset.reset = "earliest" + start_mode = "earliest" format_error_handle_way = skip schema = { fields { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_source_to_assert_with_max_poll_records_1.conf similarity index 85% copy from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf copy to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_source_to_assert_with_max_poll_records_1.conf index d7f875272b..787858e229 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_source_to_assert_with_max_poll_records_1.conf @@ -30,10 +30,13 @@ env { source { Kafka { bootstrap.servers = "kafkaCluster:9092" - topic = "test_topic_text" + topic = "test_topic_text_max_poll_records_1" result_table_name = "kafka_table" - kafka.auto.offset.reset = "earliest" + start_mode = "earliest" format_error_handle_way = fail + kafka.config = { + max.poll.records = 1 + } schema = { columns = [ { @@ -120,6 +123,9 @@ source { } sink { + console { + source_table_name = "kafka_table" + } Assert { source_table_name = "kafka_table" rules = @@ -143,24 +149,12 @@ sink { ] } ] - catalog_table_rule = { - primary_key_rule = { - primary_key_name = "primary key" - primary_key_columns = ["id"] - } - constraint_key_rule = [ - { - constraint_key_name = "unique_c_string" - constraint_key_type = UNIQUE_KEY - constraint_key_columns = [ - { - constraint_key_column_name = "c_string" - constraint_key_sort_type = ASC - } - ] - } - ] - } + row_rules = [ + { + rule_type = MIN_ROW + rule_value = 100 + } + ] } } } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf index b6db50989a..d2a0f05354 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf @@ -37,7 +37,6 @@ source { result_table_name = "kafka_table" start_mode = "earliest" format_error_handle_way = fail - # kafka.auto.offset.reset = "earliest" schema = { fields { id = bigint diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf index 45b29d1915..88b6098b5e 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf @@ -37,7 +37,6 @@ source { result_table_name = "kafka_table" start_mode = "earliest" format_error_handle_way = skip - # kafka.auto.offset.reset = "earliest" schema = { fields { id = bigint diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console.conf index 36f01c0337..3ce077bd58 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console.conf @@ -32,7 +32,7 @@ source { bootstrap.servers = "kafkaCluster:9092" topic = "test_topic_text" result_table_name = "kafka_table" - kafka.auto.offset.reset = "earliest" + start_mode = "earliest" format_error_handle_way = fail schema = { fields { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf index d7f875272b..132829e324 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf @@ -32,7 +32,7 @@ source { bootstrap.servers = "kafkaCluster:9092" topic = "test_topic_text" result_table_name = "kafka_table" - kafka.auto.offset.reset = "earliest" + start_mode = "earliest" format_error_handle_way = fail schema = { columns = [