This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c92e14168f [Fix][E2E] Fixed e2e test cases of Kafka did not work as
expected (#9359)
c92e14168f is described below
commit c92e14168ff2650a98d6ddb200ad355fa4462098
Author: WenDing-Y <[email protected]>
AuthorDate: Wed May 28 10:42:54 2025 +0800
[Fix][E2E] Fixed e2e test cases of Kafka did not work as expected (#9359)
---
.../seatunnel/e2e/connector/kafka/KafkaIT.java | 70 ++++++++++++++++++----
.../kafka/kafkasource_earliest_to_console.conf | 12 +++-
.../kafka/kafkasource_endTimestamp_to_console.conf | 12 +++-
...ce_format_error_handle_way_skip_to_console.conf | 7 +++
.../kafka/kafkasource_group_offset_to_console.conf | 13 +++-
.../kafkasource_specific_offsets_to_console.conf | 12 +++-
.../kafka/kafkasource_timestamp_to_console.conf | 12 +++-
7 files changed, 123 insertions(+), 15 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index 4a4352e3fb..728c5ee8e0 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -51,7 +51,9 @@ import
org.apache.seatunnel.format.protobuf.ProtobufDeserializationSchema;
import org.apache.seatunnel.format.text.TextSerializationSchema;
import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -143,6 +145,27 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
.atMost(180, TimeUnit.SECONDS)
.untilAsserted(this::initKafkaProducer);
+ Properties adminProps = new Properties();
+ adminProps.put(
+ AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaContainer.getBootstrapServers());
+ // Set retention.ms to -1 (no time limit) so that records with timestamps older than the 7-day retention window are not deleted before the tests read them.
+ try (AdminClient adminClient = AdminClient.create(adminProps)) {
+ NewTopic testTopicSource = new NewTopic("test_topic_source", 1,
(short) 1);
+ testTopicSource.configs(Collections.singletonMap("retention.ms",
"-1"));
+
+ NewTopic testTopicNativeSource = new
NewTopic("test_topic_native_source", 1, (short) 1);
+
testTopicNativeSource.configs(Collections.singletonMap("retention.ms", "-1"));
+
+ NewTopic testTopicSourceWithTimestamp =
+ new NewTopic("test_topic_source_timestamp", 1, (short) 1);
+
testTopicSourceWithTimestamp.configs(Collections.singletonMap("retention.ms",
"-1"));
+
+ List<NewTopic> topics =
+ Arrays.asList(
+ testTopicSource, testTopicNativeSource,
testTopicSourceWithTimestamp);
+ adminClient.createTopics(topics);
+ }
+
log.info("Write 100 records to topic test_topic_source");
DefaultSeaTunnelRowSerializer serializer =
DefaultSeaTunnelRowSerializer.create(
@@ -152,6 +175,18 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
DEFAULT_FIELD_DELIMITER,
null);
generateTestData(serializer::serializeRow, 0, 100);
+
+ DefaultSeaTunnelRowSerializer rowSerializer =
+ DefaultSeaTunnelRowSerializer.create(
+ "test_topic_source_timestamp",
+ DEFAULT_FORMAT,
+ new SeaTunnelRowType(
+ new String[] {"id", "timestamp"},
+ new SeaTunnelDataType[] {BasicType.LONG_TYPE,
BasicType.LONG_TYPE}),
+ "",
+ null);
+ generateWithTimestampTestData(rowSerializer::serializeRow, 0, 100,
1738395840000L);
+
String topicName = "test_topic_native_source";
generateNativeTestData("test_topic_native_source", 0, 100);
nativeData = getKafkaRecordData(topicName);
@@ -412,16 +447,7 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
@TestTemplate
public void testSourceKafkaWithEndTimestamp(TestContainer container)
throws IOException, InterruptedException {
- DefaultSeaTunnelRowSerializer serializer =
- DefaultSeaTunnelRowSerializer.create(
- "test_topic_source",
- DEFAULT_FORMAT,
- new SeaTunnelRowType(
- new String[] {"id", "timestamp"},
- new SeaTunnelDataType[] {BasicType.LONG_TYPE,
BasicType.LONG_TYPE}),
- "",
- null);
- generateWithTimestampTestData(serializer::serializeRow, 0, 100,
1738395840000L);
+
testKafkaWithEndTimestampToConsole(container);
}
@@ -435,10 +461,34 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
DEFAULT_FORMAT,
DEFAULT_FIELD_DELIMITER,
null);
+ generateTestData(row -> serializer.serializeRow(row), 0, 10);
+ commitOffset("test_topic_group", "SeaTunnel-Consumer-Group-Offset");
generateTestData(row -> serializer.serializeRow(row), 100, 150);
testKafkaGroupOffsetsToConsole(container);
}
+ public void commitOffset(String topic, String groupId) {
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaContainer.getBootstrapServers());
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ props.put(
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ ByteArrayDeserializer.class.getName());
+ props.put(
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ ByteArrayDeserializer.class.getName());
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+ consumer.subscribe(Collections.singletonList(topic));
+ try {
+ consumer.poll(Duration.ofSeconds(60));
+ consumer.commitSync();
+ } finally {
+ consumer.close();
+ }
+ }
+
@DisabledOnContainer(
value = {},
type = {EngineType.SPARK, EngineType.FLINK},
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
index fb3fae77fa..bd9fb2b3e4 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
@@ -69,6 +69,16 @@ sink {
]
}
]
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 100
+ },
+ {
+ rule_type = MAX_ROW
+ rule_value = 100
+ }
+ ]
}
}
-}
\ No newline at end of file
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_endTimestamp_to_console.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_endTimestamp_to_console.conf
index 8f2edca236..91646ea353 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_endTimestamp_to_console.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_endTimestamp_to_console.conf
@@ -26,7 +26,7 @@ env {
source {
Kafka {
bootstrap.servers = "kafkaCluster:9092"
- topic = "test_topic_source"
+ topic = "test_topic_source_timestamp"
plugin_output = "kafka_table"
# The default format is json, which is optional
format = json
@@ -69,6 +69,16 @@ sink {
]
}
]
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 60
+ },
+ {
+ rule_type = MAX_ROW
+ rule_value = 60
+ }
+ ]
}
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
index 510dc27242..fc55de5733 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
@@ -90,6 +90,13 @@ sink {
]
}
]
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 0
+ }
+ ]
+
}
}
}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
index 7934900039..653838c0f8 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
@@ -29,6 +29,7 @@ source {
Kafka {
bootstrap.servers = "kafkaCluster:9092"
topic = "test_topic_group"
+ consumer.group = "SeaTunnel-Consumer-Group-Offset"
plugin_output = "kafka_table"
# The default format is json, which is optional
format = json
@@ -69,6 +70,16 @@ sink {
]
}
]
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 50
+ },
+ {
+ rule_type = MAX_ROW
+ rule_value = 50
+ }
+ ]
}
}
-}
\ No newline at end of file
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
index 2e18fe6451..afbcd71436 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
@@ -71,6 +71,16 @@ sink {
]
}
]
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 50
+ },
+ {
+ rule_type = MAX_ROW
+ rule_value = 50
+ }
+ ]
}
}
-}
\ No newline at end of file
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
index 6356b13f0a..126a7e2a89 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
@@ -68,6 +68,16 @@ sink {
]
}
]
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 100
+ },
+ {
+ rule_type = MAX_ROW
+ rule_value = 100
+ }
+ ]
}
}
-}
\ No newline at end of file
+}