This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 68b0504da9 [Feature][Connectors-V2] Add end_timestamp for timestamp start mode (#9318)
68b0504da9 is described below

commit 68b0504da92230db5a9aac3ea046b4cffb2d539e
Author: WenDing-Y <[email protected]>
AuthorDate: Tue May 20 09:37:28 2025 +0800

    [Feature][Connectors-V2] Add end_timestamp for timestamp start mode (#9318)
---
 docs/en/connector-v2/source/Kafka.md               |  1 +
 docs/zh/connector-v2/source/Kafka.md               | 49 +++++++-------
 .../seatunnel/kafka/config/KafkaSourceOptions.java |  7 ++
 .../serialize/DefaultSeaTunnelRowSerializer.java   | 15 +++++
 .../seatunnel/kafka/source/ConsumerMetadata.java   |  1 +
 .../seatunnel/kafka/source/KafkaSourceConfig.java  | 14 ++++
 .../kafka/source/KafkaSourceSplitEnumerator.java   | 16 +++++
 .../seatunnel/e2e/connector/kafka/KafkaIT.java     | 38 +++++++++++
 .../kafka/kafkasource_endTimestamp_to_console.conf | 74 ++++++++++++++++++++++
 9 files changed, 191 insertions(+), 24 deletions(-)

diff --git a/docs/en/connector-v2/source/Kafka.md b/docs/en/connector-v2/source/Kafka.md
index 58a56ac4d1..15b25d60d4 100644
--- a/docs/en/connector-v2/source/Kafka.md
+++ b/docs/en/connector-v2/source/Kafka.md
@@ -52,6 +52,7 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
 | start_mode                          | StartMode[earliest],[group_offsets],[latest],[specific_offsets],[timestamp] | No | group_offsets | The initial consumption pattern of consumers. [...]
 | start_mode.offsets                  | Config | No | - | The offset required for consumption mode to be specific_offsets. [...]
 | start_mode.timestamp                | Long | No | - | The time required for consumption mode to be "timestamp". [...]
+| start_mode.end_timestamp            | Long | No | - | The end time required when the consumption mode is "timestamp"; only takes effect in batch mode. |
 | partition-discovery.interval-millis | Long | No | -1 | The interval for dynamically discovering topics and partitions. [...]
 | common-options                      |      | No | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details [...]
 | protobuf_message_name               | String | No | - | Effective when the format is set to protobuf, specifies the Message name [...]
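
(Note: the new start_mode.end_timestamp option is exercised end to end by the kafkasource_endTimestamp_to_console.conf test config added at the bottom of this commit.)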
diff --git a/docs/zh/connector-v2/source/Kafka.md b/docs/zh/connector-v2/source/Kafka.md
index fe4a9d0d1c..6243898bc1 100644
--- a/docs/zh/connector-v2/source/Kafka.md
+++ b/docs/zh/connector-v2/source/Kafka.md
@@ -34,30 +34,31 @@ import ChangeLog from '../changelog/connector-kafka.md';
 
 ## 源选项
 
-| 名称                                  | 类型                                  | 是否必填 | 默认值                      | 描述 |
-|-------------------------------------|-------------------------------------|------|--------------------------|------|
-| topic                               | String                              | 是    | -                        | 使用表作为数据源时要读取数据的主题名称。它也支持通过逗号分隔的多个主题列表,例如 'topic-1,topic-2'。 |
-| table_list                          | Map                                 | 否    | -                        | 主题列表配置,你可以同时配置一个 `table_list` 和一个 `topic`。 |
-| bootstrap.servers                   | String                              | 是    | -                        | 逗号分隔的 Kafka brokers 列表。 |
-| pattern                             | Boolean                             | 否    | false                    | 如果 `pattern` 设置为 `true`,则会使用指定的正则表达式匹配并订阅主题。 |
-| consumer.group                      | String                              | 否    | SeaTunnel-Consumer-Group | `Kafka 消费者组 ID`,用于区分不同的消费者组。 |
-| commit_on_checkpoint                | Boolean                             | 否    | true                     | 如果为 true,消费者的偏移量将会定期在后台提交。 |
-| poll.timeout                        | Long                                | 否    | 10000                    | kafka主动拉取时间间隔(毫秒)。 |
-| kafka.config                        | Map                                 | 否    | -                        | 除了上述必要参数外,用户还可以指定多个非强制的消费者客户端参数,覆盖 [Kafka 官方文档](https://kafka.apache.org/documentation.html#consumerconfigs) 中指定的所有消费者参数。 |
-| schema                              | Config                              | 否    | -                        | 数据结构,包括字段名称和字段类型。 |
-| format                              | String                              | 否    | json                     | 数据格式。默认格式为 json。可选格式包括 text, canal_json, debezium_json, ogg_json, maxwell_json, avro , protobuf和native。默认字段分隔符为 ", "。如果自定义分隔符,添加 "field_delimiter" 选项。如果使用 canal 格式,请参考 [canal-json](../formats/canal-json.md) 了解详细信息。如果使用 debezium 格式,请参考 [debezium-json](../formats/debezium-json.md)。一些Format的详细信息请参考 [formats](../formats) |
-| format_error_handle_way             | String                              | 否    | fail                     | 数据格式错误的处理方式。默认值为 fail,可选值为 fail 和 skip。当选择 fail 时,数据格式错误将阻塞并抛出异常。当选择 skip 时,数据格式错误将跳过此行数据。 |
-| debezium_record_table_filter        | Config                              | 否    | -                        | 用于过滤 debezium 格式的数据,仅当格式设置为 `debezium_json` 时使用。请参阅下面的 `debezium_record_table_filter` |
-| field_delimiter                     | String                              | 否    | ,                        | 自定义数据格式的字段分隔符。 |
-| start_mode                          | StartMode[earliest],[group_offsets] | 否    | group_offsets            | 消费者的初始消费模式。 |
-| start_mode.offsets                  | Config                              | 否    | -                        | 用于 specific_offsets 消费模式的偏移量。 |
-| start_mode.timestamp                | Long                                | 否    | -                        | 用于 "timestamp" 消费模式的时间。 |
-| partition-discovery.interval-millis | Long                                | 否    | -1                       | 动态发现主题和分区的间隔时间。 |
-| common-options                      |                                     | 否    | -                        | 源插件的常见参数,详情请参考 [Source Common Options](../source-common-options.md)。 |
-| protobuf_message_name               | String                              | 否    | -                        | 当格式设置为 protobuf 时有效,指定消息名称。 |
-| protobuf_schema                     | String                              | 否    | -                        | 当格式设置为 protobuf 时有效,指定 Schema 定义。 |
-| reader_cache_queue_size             | Integer                             | 否    | 1024                     | Reader分片缓存队列,用于缓存分片对应的数据。占用大小取决于每个reader得到的分片量,而不是每个分片的数据量。 |
-| is_native                           | Boolean                             | No   | false                    | 支持保留record的源信息。 |
+| 名称                                  | 类型                                  | 是否必填 | 默认值                      | 描述 |
+|-------------------------------------|-------------------------------------|------|--------------------------|------|
+| topic                               | String                              | 是    | -                        | 使用表作为数据源时要读取数据的主题名称。它也支持通过逗号分隔的多个主题列表,例如 'topic-1,topic-2'。 |
+| table_list                          | Map                                 | 否    | -                        | 主题列表配置,你可以同时配置一个 `table_list` 和一个 `topic`。 |
+| bootstrap.servers                   | String                              | 是    | -                        | 逗号分隔的 Kafka brokers 列表。 |
+| pattern                             | Boolean                             | 否    | false                    | 如果 `pattern` 设置为 `true`,则会使用指定的正则表达式匹配并订阅主题。 |
+| consumer.group                      | String                              | 否    | SeaTunnel-Consumer-Group | `Kafka 消费者组 ID`,用于区分不同的消费者组。 |
+| commit_on_checkpoint                | Boolean                             | 否    | true                     | 如果为 true,消费者的偏移量将会定期在后台提交。 |
+| poll.timeout                        | Long                                | 否    | 10000                    | kafka主动拉取时间间隔(毫秒)。 |
+| kafka.config                        | Map                                 | 否    | -                        | 除了上述必要参数外,用户还可以指定多个非强制的消费者客户端参数,覆盖 [Kafka 官方文档](https://kafka.apache.org/documentation.html#consumerconfigs) 中指定的所有消费者参数。 |
+| schema                              | Config                              | 否    | -                        | 数据结构,包括字段名称和字段类型。 |
+| format                              | String                              | 否    | json                     | 数据格式。默认格式为 json。可选格式包括 text, canal_json, debezium_json, ogg_json, maxwell_json, avro , protobuf和native。默认字段分隔符为 ", "。如果自定义分隔符,添加 "field_delimiter" 选项。如果使用 canal 格式,请参考 [canal-json](../formats/canal-json.md) 了解详细信息。如果使用 debezium 格式,请参考 [debezium-json](../formats/debezium-json.md)。一些Format的详细信息请参考 [formats](../formats) |
+| format_error_handle_way             | String                              | 否    | fail                     | 数据格式错误的处理方式。默认值为 fail,可选值为 fail 和 skip。当选择 fail 时,数据格式错误将阻塞并抛出异常。当选择 skip 时,数据格式错误将跳过此行数据。 |
+| debezium_record_table_filter        | Config                              | 否    | -                        | 用于过滤 debezium 格式的数据,仅当格式设置为 `debezium_json` 时使用。请参阅下面的 `debezium_record_table_filter` |
+| field_delimiter                     | String                              | 否    | ,                        | 自定义数据格式的字段分隔符。 |
+| start_mode                          | StartMode[earliest],[group_offsets] | 否    | group_offsets            | 消费者的初始消费模式。 |
+| start_mode.offsets                  | Config                              | 否    | -                        | 用于 specific_offsets 消费模式的偏移量。 |
+| start_mode.timestamp                | Long                                | 否    | -                        | 用于 "timestamp" 消费模式的时间。 |
+| start_mode.end_timestamp            | Long                                | 否    | -                        | 用于 "timestamp" 消费模式的结束时间,只支持批模式 |
+| partition-discovery.interval-millis | Long                                | 否    | -1                       | 动态发现主题和分区的间隔时间。 |
+| common-options                      |                                     | 否    | -                        | 源插件的常见参数,详情请参考 [Source Common Options](../source-common-options.md)。 |
+| protobuf_message_name               | String                              | 否    | -                        | 当格式设置为 protobuf 时有效,指定消息名称。 |
+| protobuf_schema                     | String                              | 否    | -                        | 当格式设置为 protobuf 时有效,指定 Schema 定义。 |
+| reader_cache_queue_size             | Integer                             | 否    | 1024                     | Reader分片缓存队列,用于缓存分片对应的数据。占用大小取决于每个reader得到的分片量,而不是每个分片的数据量。 |
+| is_native                           | Boolean                             | No   | false                    | 支持保留record的源信息。 |
 
 ### debezium_record_table_filter
 
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSourceOptions.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSourceOptions.java
index 35b9573beb..51b80e81ac 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSourceOptions.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSourceOptions.java
@@ -109,4 +109,11 @@ public class KafkaSourceOptions extends KafkaBaseOptions {
                             "The processing method of data format error. The default value is fail, and the optional value is (fail, skip). "
                                     + "When fail is selected, data format error will block and an exception will be thrown. "
                                     + "When skip is selected, data format error will skip this line data.");
+
+    public static final Option<Long> START_MODE_END_TIMESTAMP =
+            Options.key("start_mode.end_timestamp")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The end timestamp of the messages to consume when start_mode is timestamp. Only applicable in batch mode.");
 }
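
A minimal sketch of how the option is read downstream (mirroring what KafkaSourceConfig does further below; `readonlyConfig` stands in for the connector's ReadonlyConfig instance):

    // Returns null when the user did not set start_mode.end_timestamp (no default value).
    Long endTimestamp = readonlyConfig.get(KafkaSourceOptions.START_MODE_END_TIMESTAMP);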
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
index 8eb951988f..70d08e5221 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
@@ -92,6 +92,21 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
                 headersExtractor(rowType));
     }
 
+    public static DefaultSeaTunnelRowSerializer create(
+            String topic,
+            MessageFormat format,
+            SeaTunnelRowType rowType,
+            String delimiter,
+            ReadonlyConfig pluginConfig) {
+        return new DefaultSeaTunnelRowSerializer(
+                topicExtractor(topic, rowType, format),
+                partitionExtractor(null),
+                timestampExtractor(rowType),
+                keyExtractor(null, rowType, format, null, null),
+                valueExtractor(rowType, format, delimiter, pluginConfig),
+                headersExtractor());
+    }
+
     public static DefaultSeaTunnelRowSerializer create(
             String topic,
             SeaTunnelRowType rowType,
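
(This added create overload, which takes no key fields or partition list, is the one the new KafkaIT test below uses to produce its fixture records; the row type there carries an explicit timestamp column, which appears to drive the record timestamps exercised by the start/end window checks.)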
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java
index c129fbdc93..91591253ce 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java
@@ -42,4 +42,5 @@ public class ConsumerMetadata implements Serializable {
     private Long startOffsetsTimestamp;
     private DeserializationSchema<SeaTunnelRow> deserializationSchema;
     private CatalogTable catalogTable;
+    private Long endOffsetsTimestamp;
 }
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
index ddc67ebe17..f601b9bcf7 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
@@ -65,6 +65,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.stream.Collectors;
@@ -92,6 +93,7 @@ import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSource
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.PROTOBUF_SCHEMA;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.READER_CACHE_QUEUE_SIZE;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.START_MODE;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.START_MODE_END_TIMESTAMP;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.START_MODE_OFFSETS;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.START_MODE_TIMESTAMP;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.TOPIC;
@@ -187,6 +189,18 @@ public class KafkaSourceConfig implements Serializable {
                                     }
                                     consumerMetadata.setStartOffsetsTimestamp(
                                             startOffsetsTimestamp);
+                                    if (Objects.nonNull(
+                                            readonlyConfig.get(START_MODE_END_TIMESTAMP))) {
+                                        long endOffsetsTimestamp =
+                                                readonlyConfig.get(START_MODE_END_TIMESTAMP);
+                                        if (endOffsetsTimestamp < 0
+                                                || endOffsetsTimestamp > currentTimestamp) {
+                                            throw new IllegalArgumentException(
+                                                    "The start_mode.end_timestamp value must not be less than 0 or later than the current time");
+                                        }
+                                        consumerMetadata.setEndOffsetsTimestamp(endOffsetsTimestamp);
+                                    }
                                     break;
                                 case SPECIFIC_OFFSETS:
                                     // Key is topic-partition, value is offset
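
Taken in isolation, the bounds check added above amounts to this small sketch (validateEndTimestamp is a hypothetical name, not part of the commit):

    // Hypothetical helper: accept only end timestamps in the range [0, now].
    static void validateEndTimestamp(long endOffsetsTimestamp, long currentTimestamp) {
        if (endOffsetsTimestamp < 0 || endOffsetsTimestamp > currentTimestamp) {
            throw new IllegalArgumentException(
                    "The start_mode.end_timestamp value must not be less than 0 or later than the current time");
        }
    }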
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index ce802412f9..686e0b71f3 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -155,6 +155,7 @@ public class KafkaSourceSplitEnumerator
     private void setPartitionStartOffset() throws ExecutionException, InterruptedException {
         Set<TopicPartition> pendingTopicPartitions = pendingSplit.keySet();
         Map<TopicPartition, Long> topicPartitionOffsets = new HashMap<>();
+        Map<TopicPartition, Long> topicPartitionEndOffsets = new HashMap<>();
         // Set kafka TopicPartition based on the topicPath granularity
         Map<TablePath, Set<TopicPartition>> tablePathPartitionMap =
                 pendingTopicPartitions.stream()
@@ -182,6 +183,13 @@ public class KafkaSourceSplitEnumerator
                             listOffsets(
                                     topicPartitions,
                                     OffsetSpec.forTimestamp(metadata.getStartOffsetsTimestamp())));
+                    if (Objects.nonNull(metadata.getEndOffsetsTimestamp())) {
+                        topicPartitionEndOffsets.putAll(
+                                listOffsets(
+                                        topicPartitions,
+                                        OffsetSpec.forTimestamp(
+                                                metadata.getEndOffsetsTimestamp())));
+                    }
                     break;
                 case SPECIFIC_OFFSETS:
                     topicPartitionOffsets.putAll(metadata.getSpecificStartOffsets());
@@ -197,6 +205,14 @@ public class KafkaSourceSplitEnumerator
                         pendingSplit.get(key).setStartOffset(value);
                     }
                 });
+        if (!isStreamingMode && !topicPartitionEndOffsets.isEmpty()) {
+            topicPartitionEndOffsets.forEach(
+                    (key, value) -> {
+                        if (pendingSplit.containsKey(key)) {
+                            pendingSplit.get(key).setEndOffset(value);
+                        }
+                    });
+        }
     }
 
     @Override
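
The listOffsets helper used above ultimately delegates to Kafka's admin API. For reference, a self-contained sketch of resolving per-partition offsets for a timestamp with the plain AdminClient (the names here are illustrative, not the connector's internals):

    import java.util.Collection;
    import java.util.Map;
    import java.util.stream.Collectors;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.OffsetSpec;
    import org.apache.kafka.common.TopicPartition;

    static Map<TopicPartition, Long> offsetsForTimestamp(
            AdminClient admin, Collection<TopicPartition> partitions, long timestamp) throws Exception {
        // Kafka returns, per partition, the earliest offset whose record timestamp is >= the given timestamp.
        Map<TopicPartition, OffsetSpec> request = partitions.stream()
                .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.forTimestamp(timestamp)));
        return admin.listOffsets(request).all().get().entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
    }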
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index 82aa3f85af..4a4352e3fb 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -409,6 +409,22 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
         testKafkaTimestampToConsole(container);
     }
 
+    @TestTemplate
+    public void testSourceKafkaWithEndTimestamp(TestContainer container)
+            throws IOException, InterruptedException {
+        DefaultSeaTunnelRowSerializer serializer =
+                DefaultSeaTunnelRowSerializer.create(
+                        "test_topic_source",
+                        DEFAULT_FORMAT,
+                        new SeaTunnelRowType(
+                                new String[] {"id", "timestamp"},
+                                new SeaTunnelDataType[] {BasicType.LONG_TYPE, BasicType.LONG_TYPE}),
+                        "",
+                        null);
+        generateWithTimestampTestData(serializer::serializeRow, 0, 100, 1738395840000L);
+        testKafkaWithEndTimestampToConsole(container);
+    }
+
     @TestTemplate
     public void testSourceKafkaStartConfig(TestContainer container)
             throws IOException, InterruptedException {
@@ -1084,6 +1100,13 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
         Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
     }
 
+    public void testKafkaWithEndTimestampToConsole(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/kafka/kafkasource_endTimestamp_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+
     private AdminClient createKafkaAdmin() {
         Properties props = new Properties();
         String bootstrapServers = kafkaContainer.getBootstrapServers();
@@ -1163,6 +1186,21 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
         producer.flush();
     }
 
+    private void generateWithTimestampTestData(
+            ProducerRecordConverter converter, int start, int end, long startTimestamp) {
+        try {
+            for (int i = start; i < end; i++) {
+                SeaTunnelRow row =
+                        new SeaTunnelRow(new Object[] {Long.valueOf(i), startTimestamp + i * 1000});
+                ProducerRecord<byte[], byte[]> producerRecord = converter.convert(row);
+                producer.send(producerRecord).get();
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        producer.flush();
+    }
+
+
     private void generateNativeTestData(String topic, int start, int end) {
         try {
             for (int i = start; i < end; i++) {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_endTimestamp_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_endTimestamp_to_console.conf
new file mode 100644
index 0000000000..8f2edca236
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_endTimestamp_to_console.conf
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_topic_source"
+    plugin_output = "kafka_table"
+    # The default format is json, which is optional
+    format = json
+    start_mode = timestamp
+    schema = {
+      fields {
+        id = bigint
+      }
+    }
+    start_mode.timestamp = 1738395840000
+    start_mode.end_timestamp = 1738395900000
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
+}
+
+transform {
+}
+
+sink {
+  Assert {
+    plugin_input = "kafka_table"
+    rules =
+      {
+        field_rules = [
+          {
+            field_name = id
+            field_type = bigint
+            field_value = [
+
+              {
+                rule_type = MIN
+                rule_value = 0
+              },
+              {
+                rule_type = MAX
+                rule_value = 60
+              }
+            ]
+          }
+        ]
+      }
+  }
+}
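
A quick sanity check on the Assert bounds above: the generator in KafkaIT writes ids 0..99 with timestamps spaced one second apart starting at 1738395840000, and the configured window is 1738395900000 - 1738395840000 = 60,000 ms, i.e. 60 seconds. Only rows whose timestamps fall inside that window are read, so every consumed id lies in [0, 60] -- exactly what the MIN 0 / MAX 60 rules assert.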

