This is an automated email from the ASF dual-hosted git repository. corgy pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 68b0504da9 [Feature][Connectors-V2] Add end_timestamp for timstamp
start mode (#9318)
68b0504da9 is described below
commit 68b0504da92230db5a9aac3ea046b4cffb2d539e
Author: WenDing-Y <[email protected]>
AuthorDate: Tue May 20 09:37:28 2025 +0800
[Feature][Connectors-V2] Add end_timestamp for timstamp start mode (#9318)
---
docs/en/connector-v2/source/Kafka.md | 1 +
docs/zh/connector-v2/source/Kafka.md | 49 +++++++-------
.../seatunnel/kafka/config/KafkaSourceOptions.java | 7 ++
.../serialize/DefaultSeaTunnelRowSerializer.java | 15 +++++
.../seatunnel/kafka/source/ConsumerMetadata.java | 1 +
.../seatunnel/kafka/source/KafkaSourceConfig.java | 14 ++++
.../kafka/source/KafkaSourceSplitEnumerator.java | 16 +++++
.../seatunnel/e2e/connector/kafka/KafkaIT.java | 38 +++++++++++
.../kafka/kafkasource_endTimestamp_to_console.conf | 74 ++++++++++++++++++++++
9 files changed, 191 insertions(+), 24 deletions(-)
diff --git a/docs/en/connector-v2/source/Kafka.md
b/docs/en/connector-v2/source/Kafka.md
index 58a56ac4d1..15b25d60d4 100644
--- a/docs/en/connector-v2/source/Kafka.md
+++ b/docs/en/connector-v2/source/Kafka.md
@@ -52,6 +52,7 @@ They can be downloaded via install-plugin.sh or from the
Maven central repositor
| start_mode |
StartMode[earliest],[group_offsets],[latest],[specific_offsets],[timestamp] |
No | group_offsets | The initial consumption pattern of
consumers.
[...]
| start_mode.offsets | Config
| No | - | The
offset required for consumption mode to be specific_offsets.
[...]
| start_mode.timestamp | Long
| No | - | The
time required for consumption mode to be "timestamp".
[...]
+| start_mode.end_timestamp | Long
| No | - |
The end time required for consumption mode to be "timestamp" in batch mode
| partition-discovery.interval-millis | Long
| No | -1 | The
interval for dynamically discovering topics and partitions.
[...]
| common-options |
| No | - |
Source plugin common parameters, please refer to [Source Common
Options](../source-common-options.md) for details
[...]
| protobuf_message_name | String
| No | - |
Effective when the format is set to protobuf, specifies the Message name
[...]
diff --git a/docs/zh/connector-v2/source/Kafka.md
b/docs/zh/connector-v2/source/Kafka.md
index fe4a9d0d1c..6243898bc1 100644
--- a/docs/zh/connector-v2/source/Kafka.md
+++ b/docs/zh/connector-v2/source/Kafka.md
@@ -34,30 +34,31 @@ import ChangeLog from '../changelog/connector-kafka.md';
## 源选项
-| 名称 | 类型 |
是否必填 | 默认值 | 描述
|
-|-------------------------------------|-------------------------------------|------|--------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| topic | String |
是 | - | 使用表作为数据源时要读取数据的主题名称。它也支持通过逗号分隔的多个主题列表,例如
'topic-1,topic-2'。
|
-| table_list | Map |
否 | - | 主题列表配置,你可以同时配置一个 `table_list` 和一个 `topic`。
|
-| bootstrap.servers | String |
是 | - | 逗号分隔的 Kafka brokers 列表。
|
-| pattern | Boolean |
否 | false | 如果 `pattern` 设置为 `true`,则会使用指定的正则表达式匹配并订阅主题。
|
-| consumer.group | String |
否 | SeaTunnel-Consumer-Group | `Kafka 消费者组 ID`,用于区分不同的消费者组。
|
-| commit_on_checkpoint | Boolean |
否 | true | 如果为 true,消费者的偏移量将会定期在后台提交。
|
-| poll.timeout | Long |
否 | 10000 | kafka主动拉取时间间隔(毫秒)。
|
-| kafka.config | Map |
否 | - | 除了上述必要参数外,用户还可以指定多个非强制的消费者客户端参数,覆盖 [Kafka
官方文档](https://kafka.apache.org/documentation.html#consumerconfigs) 中指定的所有消费者参数。
|
-| schema | Config |
否 | - | 数据结构,包括字段名称和字段类型。
|
-| format | String |
否 | json | 数据格式。默认格式为 json。可选格式包括 text, canal_json,
debezium_json, ogg_json, maxwell_json, avro , protobuf和native。默认字段分隔符为 ",
"。如果自定义分隔符,添加 "field_delimiter" 选项。如果使用 canal 格式,请参考
[canal-json](../formats/canal-json.md) 了解详细信息。如果使用 debezium 格式,请参考
[debezium-json](../formats/debezium-json.md)。一些Format的详细信息请参考
[formats](../formats) |
-| format_error_handle_way | String |
否 | fail | 数据格式错误的处理方式。默认值为 fail,可选值为 fail 和 skip。当选择
fail 时,数据格式错误将阻塞并抛出异常。当选择 skip 时,数据格式错误将跳过此行数据。
|
-| debezium_record_table_filter | Config |
否 | - | 用于过滤 debezium 格式的数据,仅当格式设置为 `debezium_json`
时使用。请参阅下面的 `debezium_record_table_filter`
|
-| field_delimiter | String |
否 | , | 自定义数据格式的字段分隔符。
|
-| start_mode | StartMode[earliest],[group_offsets] |
否 | group_offsets | 消费者的初始消费模式。
|
-| start_mode.offsets | Config |
否 | - | 用于 specific_offsets 消费模式的偏移量。
|
-| start_mode.timestamp | Long |
否 | - | 用于 "timestamp" 消费模式的时间。
|
-| partition-discovery.interval-millis | Long |
否 | -1 | 动态发现主题和分区的间隔时间。
|
-| common-options | |
否 | - | 源插件的常见参数,详情请参考 [Source Common
Options](../source-common-options.md)。
|
-| protobuf_message_name | String |
否 | - | 当格式设置为 protobuf 时有效,指定消息名称。
|
-| protobuf_schema | String |
否 | - | 当格式设置为 protobuf 时有效,指定 Schema 定义。
|
-| reader_cache_queue_size | Integer |
否 | 1024 |
Reader分片缓存队列,用于缓存分片对应的数据。占用大小取决于每个reader得到的分片量,而不是每个分片的数据量。
|
-| is_native | Boolean |
No | false | 支持保留record的源信息。
|
+| 名称 | 类型 |
是否必填 | 默认值 | 描述
|
+|-------------------------------------|-------------------------------------|------|------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| topic | String |
是 | - | 使用表作为数据源时要读取数据的主题名称。它也支持通过逗号分隔的多个主题列表,例如
'topic-1,topic-2'。
|
+| table_list | Map |
否 | - | 主题列表配置,你可以同时配置一个 `table_list` 和一个
`topic`。
|
+| bootstrap.servers | String |
是 | - | 逗号分隔的 Kafka brokers 列表。
|
+| pattern | Boolean |
否 | false | 如果 `pattern` 设置为
`true`,则会使用指定的正则表达式匹配并订阅主题。
|
+| consumer.group | String |
否 | SeaTunnel-Consumer-Group | `Kafka 消费者组 ID`,用于区分不同的消费者组。
|
+| commit_on_checkpoint | Boolean |
否 | true | 如果为 true,消费者的偏移量将会定期在后台提交。
|
+| poll.timeout | Long |
否 | 10000 | kafka主动拉取时间间隔(毫秒)。
|
+| kafka.config | Map |
否 | - | 除了上述必要参数外,用户还可以指定多个非强制的消费者客户端参数,覆盖 [Kafka
官方文档](https://kafka.apache.org/documentation.html#consumerconfigs) 中指定的所有消费者参数。
|
+| schema | Config |
否 | - | 数据结构,包括字段名称和字段类型。
|
+| format | String |
否 | json | 数据格式。默认格式为 json。可选格式包括 text, canal_json,
debezium_json, ogg_json, maxwell_json, avro , protobuf和native。默认字段分隔符为 ",
"。如果自定义分隔符,添加 "field_delimiter" 选项。如果使用 canal 格式,请参考
[canal-json](../formats/canal-json.md) 了解详细信息。如果使用 debezium 格式,请参考
[debezium-json](../formats/debezium-json.md)。一些Format的详细信息请参考
[formats](../formats) |
+| format_error_handle_way | String |
否 | fail | 数据格式错误的处理方式。默认值为 fail,可选值为 fail 和
skip。当选择 fail 时,数据格式错误将阻塞并抛出异常。当选择 skip 时,数据格式错误将跳过此行数据。
|
+| debezium_record_table_filter | Config |
否 | - | 用于过滤 debezium 格式的数据,仅当格式设置为
`debezium_json` 时使用。请参阅下面的 `debezium_record_table_filter`
|
+| field_delimiter | String |
否 | , | 自定义数据格式的字段分隔符。
|
+| start_mode | StartMode[earliest],[group_offsets] |
否 | group_offsets | 消费者的初始消费模式。
|
+| start_mode.offsets | Config |
否 | - | 用于 specific_offsets 消费模式的偏移量。
|
+| start_mode.timestamp | Long |
否 | - | 用于 "timestamp" 消费模式的时间。
|
+| start_mode.end_timestamp | Long |
否 | - | 用于 "timestamp" 消费模式的结束时间,只支持批模式
|
+| partition-discovery.interval-millis | Long |
否 | -1 | 动态发现主题和分区的间隔时间。
|
+| common-options | |
否 | - | 源插件的常见参数,详情请参考 [Source Common
Options](../source-common-options.md)。
|
+| protobuf_message_name | String |
否 | - | 当格式设置为 protobuf 时有效,指定消息名称。
|
+| protobuf_schema | String |
否 | - | 当格式设置为 protobuf 时有效,指定 Schema 定义。
|
+| reader_cache_queue_size | Integer |
否 | 1024 |
Reader分片缓存队列,用于缓存分片对应的数据。占用大小取决于每个reader得到的分片量,而不是每个分片的数据量。
|
+| is_native | Boolean |
No | false | 支持保留record的源信息。
|
### debezium_record_table_filter
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSourceOptions.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSourceOptions.java
index 35b9573beb..51b80e81ac 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSourceOptions.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSourceOptions.java
@@ -109,4 +109,11 @@ public class KafkaSourceOptions extends KafkaBaseOptions {
"The processing method of data format error. The
default value is fail, and the optional value is (fail, skip). "
+ "When fail is selected, data format
error will block and an exception will be thrown. "
+ "When skip is selected, data format
error will skip this line data.");
+
+ public static final Option<Long> START_MODE_END_TIMESTAMP =
+ Options.key("start_mode.end_timestamp")
+ .longType()
+ .noDefaultValue()
+ .withDescription(
+ "The end time required for consumption mode to be timestamp. The end_timestamp configuration specifies the end timestamp of the messages and is only applicable in batch mode.");
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
index 8eb951988f..70d08e5221 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
@@ -92,6 +92,21 @@ public class DefaultSeaTunnelRowSerializer implements
SeaTunnelRowSerializer {
headersExtractor(rowType));
}
+ public static DefaultSeaTunnelRowSerializer create(
+ String topic,
+ MessageFormat format,
+ SeaTunnelRowType rowType,
+ String delimiter,
+ ReadonlyConfig pluginConfig) {
+ return new DefaultSeaTunnelRowSerializer(
+ topicExtractor(topic, rowType, format),
+ partitionExtractor(null),
+ timestampExtractor(rowType),
+ keyExtractor(null, rowType, format, null, null),
+ valueExtractor(rowType, format, delimiter, pluginConfig),
+ headersExtractor());
+ }
+
public static DefaultSeaTunnelRowSerializer create(
String topic,
SeaTunnelRowType rowType,
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java
index c129fbdc93..91591253ce 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java
@@ -42,4 +42,5 @@ public class ConsumerMetadata implements Serializable {
private Long startOffsetsTimestamp;
private DeserializationSchema<SeaTunnelRow> deserializationSchema;
private CatalogTable catalogTable;
+ private Long endOffsetsTimestamp;
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
index ddc67ebe17..f601b9bcf7 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
@@ -65,6 +65,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
@@ -92,6 +93,7 @@ import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSource
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.PROTOBUF_SCHEMA;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.READER_CACHE_QUEUE_SIZE;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.START_MODE;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.START_MODE_END_TIMESTAMP;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.START_MODE_OFFSETS;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.START_MODE_TIMESTAMP;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.TOPIC;
@@ -187,6 +189,18 @@ public class KafkaSourceConfig implements Serializable {
}
consumerMetadata.setStartOffsetsTimestamp(
startOffsetsTimestamp);
+ if (Objects.nonNull(
+
readonlyConfig.get(START_MODE_END_TIMESTAMP))) {
+ long endOffsetsTimestamp =
+
readonlyConfig.get(START_MODE_END_TIMESTAMP);
+ if (endOffsetsTimestamp < 0
+ || endOffsetsTimestamp >
currentTimestamp) {
+ throw new IllegalArgumentException(
+ "start_mode.end_timestamp must not be smaller than 0 or larger than the current time");
+ }
+
consumerMetadata.setEndOffsetsTimestamp(
+ endOffsetsTimestamp);
+ }
break;
case SPECIFIC_OFFSETS:
// Key is topic-partition, value is offset
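
The range check added to KafkaSourceConfig above can be read in isolation as the sketch below. It is a minimal illustration, not the connector's code: the class name EndTimestampCheck and the method validate are hypothetical, and the current time is passed in as a parameter so the check is easy to exercise. Like the diff, it only rejects negative values and values in the future; it does not compare the end timestamp against start_mode.timestamp.

```java
// Hypothetical helper for illustration only; not part of the SeaTunnel code base.
final class EndTimestampCheck {

    /**
     * Mirrors the check in the diff: an unset option is allowed, a set option
     * must lie between 0 and the current time (both inclusive).
     */
    static Long validate(Long endTimestamp, long currentTimestamp) {
        if (endTimestamp == null) {
            return null; // option not configured, so no end bound is applied
        }
        if (endTimestamp < 0 || endTimestamp > currentTimestamp) {
            throw new IllegalArgumentException(
                    "start_mode.end_timestamp must not be smaller than 0 or larger than "
                            + "the current time, got " + endTimestamp);
        }
        return endTimestamp;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // Value taken from the e2e config in this commit.
        System.out.println(validate(1738395900000L, now));
        try {
            validate(now + 60_000L, now); // one minute in the future
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```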
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index ce802412f9..686e0b71f3 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -155,6 +155,7 @@ public class KafkaSourceSplitEnumerator
private void setPartitionStartOffset() throws ExecutionException,
InterruptedException {
Set<TopicPartition> pendingTopicPartitions = pendingSplit.keySet();
Map<TopicPartition, Long> topicPartitionOffsets = new HashMap<>();
+ Map<TopicPartition, Long> topicPartitionEndOffsets = new HashMap<>();
// Set kafka TopicPartition based on the topicPath granularity
Map<TablePath, Set<TopicPartition>> tablePathPartitionMap =
pendingTopicPartitions.stream()
@@ -182,6 +183,13 @@ public class KafkaSourceSplitEnumerator
listOffsets(
topicPartitions,
OffsetSpec.forTimestamp(metadata.getStartOffsetsTimestamp())));
+ if (Objects.nonNull(metadata.getEndOffsetsTimestamp())) {
+ topicPartitionEndOffsets.putAll(
+ listOffsets(
+ topicPartitions,
+ OffsetSpec.forTimestamp(
+
metadata.getEndOffsetsTimestamp())));
+ }
break;
case SPECIFIC_OFFSETS:
topicPartitionOffsets.putAll(metadata.getSpecificStartOffsets());
@@ -197,6 +205,14 @@ public class KafkaSourceSplitEnumerator
pendingSplit.get(key).setStartOffset(value);
}
});
+ if (!isStreamingMode && !topicPartitionEndOffsets.isEmpty()) {
+ topicPartitionEndOffsets.forEach(
+ (key, value) -> {
+ if (pendingSplit.containsKey(key)) {
+ pendingSplit.get(key).setEndOffset(value);
+ }
+ });
+ }
}
@Override
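
The enumerator change above resolves end offsets whenever an end timestamp is configured, but only writes them onto the pending splits when the job is not in streaming mode. A minimal sketch of that second step follows. It uses plain strings as partition keys and a hypothetical Split class instead of TopicPartition and KafkaSourceSplit, and it assumes Long.MAX_VALUE as an "unbounded" marker purely for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration only; not the connector's classes.
final class EndOffsetSketch {

    static final class Split {
        long startOffset;
        long endOffset = Long.MAX_VALUE; // assumed marker for "no end bound"
    }

    /** Copies resolved end offsets onto known splits, in batch mode only. */
    static void applyEndOffsets(
            Map<String, Split> pendingSplit,
            Map<String, Long> endOffsets,
            boolean isStreamingMode) {
        if (isStreamingMode || endOffsets.isEmpty()) {
            return; // streaming jobs stay unbounded
        }
        endOffsets.forEach(
                (partition, offset) -> {
                    Split split = pendingSplit.get(partition);
                    if (split != null) { // ignore partitions that are not pending
                        split.endOffset = offset;
                    }
                });
    }

    public static void main(String[] args) {
        Map<String, Split> pending = new HashMap<>();
        pending.put("test_topic_source-0", new Split());
        Map<String, Long> ends = new HashMap<>();
        ends.put("test_topic_source-0", 60L);

        applyEndOffsets(pending, ends, false);
        System.out.println(pending.get("test_topic_source-0").endOffset);
    }
}
```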
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index 82aa3f85af..4a4352e3fb 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -409,6 +409,22 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
testKafkaTimestampToConsole(container);
}
+ @TestTemplate
+ public void testSourceKafkaWithEndTimestamp(TestContainer container)
+ throws IOException, InterruptedException {
+ DefaultSeaTunnelRowSerializer serializer =
+ DefaultSeaTunnelRowSerializer.create(
+ "test_topic_source",
+ DEFAULT_FORMAT,
+ new SeaTunnelRowType(
+ new String[] {"id", "timestamp"},
+ new SeaTunnelDataType[] {BasicType.LONG_TYPE,
BasicType.LONG_TYPE}),
+ "",
+ null);
+ generateWithTimestampTestData(serializer::serializeRow, 0, 100,
1738395840000L);
+ testKafkaWithEndTimestampToConsole(container);
+ }
+
@TestTemplate
public void testSourceKafkaStartConfig(TestContainer container)
throws IOException, InterruptedException {
@@ -1084,6 +1100,13 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
}
+ public void testKafkaWithEndTimestampToConsole(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+
container.executeJob("/kafka/kafkasource_endTimestamp_to_console.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+ }
+
private AdminClient createKafkaAdmin() {
Properties props = new Properties();
String bootstrapServers = kafkaContainer.getBootstrapServers();
@@ -1163,6 +1186,21 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
producer.flush();
}
+ private void generateWithTimestampTestData(
+ ProducerRecordConverter converter, int start, int end, long
startTimestamp) {
+ try {
+ for (int i = start; i < end; i++) {
+ SeaTunnelRow row =
+ new SeaTunnelRow(new Object[] {Long.valueOf(i),
startTimestamp + i * 1000});
+ ProducerRecord<byte[], byte[]> producerRecord =
converter.convert(row);
+ producer.send(producerRecord).get();
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ producer.flush();
+ }
+
private void generateNativeTestData(String topic, int start, int end) {
try {
for (int i = start; i < end; i++) {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_endTimestamp_to_console.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_endTimestamp_to_console.conf
new file mode 100644
index 0000000000..8f2edca236
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_endTimestamp_to_console.conf
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_topic_source"
+ plugin_output = "kafka_table"
+ # The default format is json, which is optional
+ format = json
+ start_mode = timestamp
+ schema = {
+ fields {
+ id = bigint
+ }
+ }
+ start_mode.timestamp = 1738395840000
+ start_mode.end_timestamp = 1738395900000
+ }
+
+ # If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
+ # please go to
https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
+}
+
+transform {
+}
+
+sink {
+ Assert {
+ plugin_input = "kafka_table"
+ rules =
+ {
+ field_rules = [
+ {
+ field_name = id
+ field_type = bigint
+ field_value = [
+
+ {
+ rule_type = MIN
+ rule_value = 0
+ },
+ {
+ rule_type = MAX
+ rule_value = 60
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
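
The numbers in the e2e test and the config above line up as follows. Kafka's OffsetSpec.forTimestamp resolves to the earliest offset whose record timestamp is greater than or equal to the given timestamp. The sketch below reproduces that lookup over an in-memory array, assuming (the diff does not show this directly) that each record's Kafka timestamp is taken from the row's "timestamp" field, that the topic has a single partition starting at offset 0, and that -1 is this sketch's own convention for "no match". All names here are hypothetical.

```java
// Hypothetical worked example for illustration only; no Kafka client involved.
final class EndTimestampWindow {

    /** Timestamps as produced by the test helper: startTimestamp + i * 1000. */
    static long[] timestamps(int start, int end, long startTimestamp) {
        long[] result = new long[end - start];
        for (int i = start; i < end; i++) {
            result[i - start] = startTimestamp + i * 1000L;
        }
        return result;
    }

    /** Earliest index whose timestamp is >= target, or -1 when there is none. */
    static long offsetForTimestamp(long[] timestamps, long target) {
        for (int i = 0; i < timestamps.length; i++) {
            if (timestamps[i] >= target) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long[] ts = timestamps(0, 100, 1738395840000L);
        long startOffset = offsetForTimestamp(ts, 1738395840000L); // start_mode.timestamp
        long endOffset = offsetForTimestamp(ts, 1738395900000L); // start_mode.end_timestamp
        System.out.println("start offset = " + startOffset); // prints "start offset = 0"
        System.out.println("end offset = " + endOffset); // prints "end offset = 60"
    }
}
```

Under these assumptions the window spans 60 seconds at one record per second, so the end offset resolves to 60. Whether the reader treats that offset as inclusive or exclusive is not visible in this diff; the Assert sink's MIN 0 and MAX 60 rules on id hold in either case.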
