This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 59b6c5fc52 [INLONG-10562][Manager] SortConfig supports set start and stop consuming time (#10563)
59b6c5fc52 is described below

commit 59b6c5fc527d0242a05b2e3a80a09359a6faa1d0
Author: vernedeng <verned...@apache.org>
AuthorDate: Fri Jul 5 10:19:44 2024 +0800

    [INLONG-10562][Manager] SortConfig supports set start and stop consuming time (#10563)
    
    * [INLONG-10562][Manager] SortConfig supports set start and stop consume time
---
 .../common/pojo/sort/dataflow/SourceConfig.java    |  2 ++
 .../inlong/manager/pojo/sink/BaseStreamSink.java   | 32 +++++++++--------
 .../manager/pojo/sink/ck/ClickHouseSinkDTO.java    |  3 +-
 .../inlong/manager/pojo/sink/cls/ClsSinkDTO.java   |  3 +-
 .../manager/pojo/sink/doris/DorisSinkDTO.java      |  3 +-
 .../manager/pojo/sink/es/ElasticsearchSinkDTO.java |  3 +-
 .../pojo/sink/greenplum/GreenplumSinkDTO.java      |  3 +-
 .../manager/pojo/sink/hbase/HBaseSinkDTO.java      |  3 +-
 .../inlong/manager/pojo/sink/hdfs/HDFSSinkDTO.java |  3 +-
 .../inlong/manager/pojo/sink/hive/HiveSinkDTO.java |  3 +-
 .../inlong/manager/pojo/sink/hudi/HudiSinkDTO.java |  3 +-
 .../manager/pojo/sink/iceberg/IcebergSinkDTO.java  |  3 +-
 .../manager/pojo/sink/kafka/KafkaSinkDTO.java      |  3 +-
 .../inlong/manager/pojo/sink/kudu/KuduSinkDTO.java |  3 +-
 .../manager/pojo/sink/mysql/MySQLSinkDTO.java      |  3 +-
 .../manager/pojo/sink/oracle/OracleSinkDTO.java    |  3 +-
 .../pojo/sink/postgresql/PostgreSQLSinkDTO.java    |  3 +-
 .../manager/pojo/sink/pulsar/PulsarSinkDTO.java    |  3 +-
 .../manager/pojo/sink/redis/RedisSinkDTO.java      |  3 +-
 .../pojo/sink/sqlserver/SQLServerSinkDTO.java      |  3 +-
 .../pojo/sink/starrocks/StarRocksSinkDTO.java      |  3 +-
 .../tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java    |  3 +-
 .../apache/inlong/sdk/sort/entity/InLongTopic.java | 18 ++++++++++
 .../org/apache/inlong/sdk/sort/util/TimeUtil.java  | 41 +++++++++-------------
 .../loader/SortConfigQueryConsumeConfig.java       |  2 ++
 25 files changed, 95 insertions(+), 60 deletions(-)

diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/SourceConfig.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/SourceConfig.java
index 7f11d72b5b..cb16bfead3 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/SourceConfig.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/SourceConfig.java
@@ -31,6 +31,8 @@ public class SourceConfig implements Serializable {
 
     private String topic;
     private String subscription;
+    private String startConsumeTime;
+    private String stopConsumeTime;
     private String encodingType;
     private DeserializationConfig deserializationConfig;
     private DataTypeConfig dataTypeConfig;
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/SourceConfig.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
similarity index 56%
copy from 
inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/SourceConfig.java
copy to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
index 7f11d72b5b..09115a9e02 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/SourceConfig.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
@@ -15,24 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.common.pojo.sort.dataflow;
-
-import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
-import 
org.apache.inlong.common.pojo.sort.dataflow.deserialization.DeserializationConfig;
-import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;
+package org.apache.inlong.manager.pojo.sink;
 
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
-import java.io.Serializable;
-import java.util.List;
-
+/**
+ * The base parameter class of StreamSink, support user extend their own business params.
+ */
 @Data
-public class SourceConfig implements Serializable {
+@AllArgsConstructor
+@NoArgsConstructor
+@ApiModel("Base info of stream sink")
+public class BaseStreamSink {
+
+    @ApiModelProperty("Start consume time, yyyy-MM-dd HH:mm:ss format")
+    private String startConsumeTime;
 
-    private String topic;
-    private String subscription;
-    private String encodingType;
-    private DeserializationConfig deserializationConfig;
-    private DataTypeConfig dataTypeConfig;
-    private List<FieldConfig> fieldConfigs;
+    @ApiModelProperty("Stop consume time, yyyy-MM-dd HH:mm:ss format")
+    private String stopConsumeTime;
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkDTO.java
index e30186c1f9..359b87c8d4 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkDTO.java
@@ -22,6 +22,7 @@ import 
org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.AESUtils;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.BaseStreamSink;
 
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
@@ -44,7 +45,7 @@ import java.util.Objects;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-public class ClickHouseSinkDTO {
+public class ClickHouseSinkDTO extends BaseStreamSink {
 
     @ApiModelProperty("JDBC URL of the ClickHouse server")
     private String jdbcUrl;
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkDTO.java
index 87508b5e1c..ec8b647e00 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkDTO.java
@@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.BaseStreamSink;
 
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
@@ -38,7 +39,7 @@ import javax.validation.constraints.NotNull;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-public class ClsSinkDTO {
+public class ClsSinkDTO extends BaseStreamSink {
 
     @ApiModelProperty("Cloud log service topic id")
     private String topicId;
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSinkDTO.java
index 262c3596ae..caec62794b 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSinkDTO.java
@@ -22,6 +22,7 @@ import 
org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.AESUtils;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.BaseStreamSink;
 
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
@@ -42,7 +43,7 @@ import java.util.Map;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-public class DorisSinkDTO {
+public class DorisSinkDTO extends BaseStreamSink {
 
     @ApiModelProperty("Doris FE http address")
     private String feNodes;
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java
index 4045231768..ddda6bc4db 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java
@@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.BaseStreamSink;
 
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
@@ -38,7 +39,7 @@ import javax.validation.constraints.NotNull;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-public class ElasticsearchSinkDTO {
+public class ElasticsearchSinkDTO extends BaseStreamSink {
 
     @ApiModelProperty("indexNamePattern")
     private String indexNamePattern;
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumSinkDTO.java
index b44d00f4b3..072a174563 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumSinkDTO.java
@@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.BaseStreamSink;
 
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
@@ -41,7 +42,7 @@ import java.util.Map;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-public class GreenplumSinkDTO {
+public class GreenplumSinkDTO extends BaseStreamSink {
 
     @ApiModelProperty("JDBC URL of Greenplum server, such as: 
jdbc:postgresql://host:port/database")
     private String jdbcUrl;
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hbase/HBaseSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hbase/HBaseSinkDTO.java
index 780f0e7eac..bebe25b247 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hbase/HBaseSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hbase/HBaseSinkDTO.java
@@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.BaseStreamSink;
 
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
@@ -41,7 +42,7 @@ import java.util.Map;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-public class HBaseSinkDTO {
+public class HBaseSinkDTO extends BaseStreamSink {
 
     @ApiModelProperty("Target namespace")
     private String namespace;
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hdfs/HDFSSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hdfs/HDFSSinkDTO.java
index ff165a9927..3206cbd79e 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hdfs/HDFSSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hdfs/HDFSSinkDTO.java
@@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.BaseStreamSink;
 
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
@@ -41,7 +42,7 @@ import java.util.Map;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-public class HDFSSinkDTO {
+public class HDFSSinkDTO extends BaseStreamSink {
 
     @ApiModelProperty("File format, support: TextFile, RCFile, SequenceFile, 
Avro")
     private String fileFormat;
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hive/HiveSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hive/HiveSinkDTO.java
index 946b4c0933..4468f25c22 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hive/HiveSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hive/HiveSinkDTO.java
@@ -22,6 +22,7 @@ import 
org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.AESUtils;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.BaseStreamSink;
 
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
@@ -44,7 +45,7 @@ import java.util.Map;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-public class HiveSinkDTO {
+public class HiveSinkDTO extends BaseStreamSink {
 
     @ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}")
     private String jdbcUrl;
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java
index c280e88438..52db2e087b 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java
@@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.BaseStreamSink;
 
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
@@ -42,7 +43,7 @@ import java.util.Map;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-public class HudiSinkDTO {
+public class HudiSinkDTO extends BaseStreamSink {
 
     @ApiModelProperty("Catalog type, like: HIVE, HADOOP, default is HIVE")
     @Builder.Default
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java
index 379aaa0a75..edad042d96 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java
@@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.BaseStreamSink;
 
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
@@ -41,7 +42,7 @@ import java.util.Map;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-public class IcebergSinkDTO {
+public class IcebergSinkDTO extends BaseStreamSink {
 
     @ApiModelProperty("Catalog type, like: HIVE, HADOOP, default is HIVE")
     @Builder.Default
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkDTO.java
index 8b60baba2a..38c875ad04 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkDTO.java
@@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.BaseStreamSink;
 
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
@@ -40,7 +41,7 @@ import java.util.Map;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-public class KafkaSinkDTO {
+public class KafkaSinkDTO extends BaseStreamSink {
 
     @ApiModelProperty("Kafka bootstrap servers")
     private String bootstrapServers;
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkDTO.java
index d07f7289b8..e536b04d3f 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkDTO.java
@@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.BaseStreamSink;
 
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
@@ -41,7 +42,7 @@ import java.util.Map;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-public class KuduSinkDTO {
+public class KuduSinkDTO extends BaseStreamSink {
 
     @ApiModelProperty("Kudu masters, a comma separated list of 'host:port' 
pairs")
     private String masters;
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java
index 5b5750a803..e9cac1f77a 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java
@@ -22,6 +22,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.BaseStreamSink;
 import org.apache.inlong.manager.pojo.util.MySQLSensitiveUrlUtils;
 
 import com.google.common.base.Strings;
@@ -48,7 +49,7 @@ import java.util.regex.Pattern;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-public class MySQLSinkDTO {
+public class MySQLSinkDTO extends BaseStreamSink {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(MySQLSinkDTO.class);
     private static final String MYSQL_JDBC_PREFIX = "jdbc:mysql://";
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oracle/OracleSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oracle/OracleSinkDTO.java
index b65f2b6937..06a066a680 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oracle/OracleSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oracle/OracleSinkDTO.java
@@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.BaseStreamSink;
 
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
@@ -43,7 +44,7 @@ import java.util.Map;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-public class OracleSinkDTO {
+public class OracleSinkDTO extends BaseStreamSink {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(OracleSinkDTO.class);
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/postgresql/PostgreSQLSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/postgresql/PostgreSQLSinkDTO.java
index 18d02d75a7..2bf011f424 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/postgresql/PostgreSQLSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/postgresql/PostgreSQLSinkDTO.java
@@ -22,6 +22,7 @@ import 
org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.AESUtils;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.BaseStreamSink;
 
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
@@ -45,7 +46,7 @@ import java.util.Map;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-public class PostgreSQLSinkDTO {
+public class PostgreSQLSinkDTO extends BaseStreamSink {
 
     @ApiModelProperty("JDBC URL of the PostgreSQL server")
     @Length(max = 512, message = "length must be less than or equal to 512")
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSinkDTO.java
index 1d80e91c8e..d693af80cd 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSinkDTO.java
@@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.BaseStreamSink;
 
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
@@ -38,7 +39,7 @@ import javax.validation.constraints.NotNull;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-public class PulsarSinkDTO {
+public class PulsarSinkDTO extends BaseStreamSink {
 
     @ApiModelProperty("Pulsar tenant")
     private String pulsarTenant;
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSinkDTO.java
index 24c27529fe..c578006588 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSinkDTO.java
@@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.BaseStreamSink;
 
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
@@ -40,7 +41,7 @@ import java.util.Map;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-public class RedisSinkDTO {
+public class RedisSinkDTO extends BaseStreamSink {
 
     @ApiModelProperty("Redis cluster mode")
     private String clusterMode;
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/sqlserver/SQLServerSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/sqlserver/SQLServerSinkDTO.java
index c849f0e15d..93e55b1094 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/sqlserver/SQLServerSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/sqlserver/SQLServerSinkDTO.java
@@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.BaseStreamSink;
 
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
@@ -40,7 +41,7 @@ import java.util.List;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-public class SQLServerSinkDTO {
+public class SQLServerSinkDTO extends BaseStreamSink {
 
     @ApiModelProperty("Username of the SQLServer")
     private String username;
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkDTO.java
index 8f10c0b6a7..2e0edaab4d 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkDTO.java
@@ -22,6 +22,7 @@ import 
org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.AESUtils;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.BaseStreamSink;
 
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
@@ -44,7 +45,7 @@ import java.util.Objects;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-public class StarRocksSinkDTO {
+public class StarRocksSinkDTO extends BaseStreamSink {
 
     @ApiModelProperty("StarRocks jdbc url")
     private String jdbcUrl;
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java
index 9431ae397f..3ef1b25fb4 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java
@@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.BaseStreamSink;
 
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
@@ -40,7 +41,7 @@ import java.util.Map;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-public class TDSQLPostgreSQLSinkDTO {
+public class TDSQLPostgreSQLSinkDTO extends BaseStreamSink {
 
     @ApiModelProperty("TDSQLPostgreSQL jdbc url, such as 
jdbc:postgresql://host:port/database")
     private String jdbcUrl;
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/InLongTopic.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/InLongTopic.java
index c90a710d07..3b636c912b 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/InLongTopic.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/InLongTopic.java
@@ -27,8 +27,26 @@ public class InLongTopic {
     private int partitionId;
     // pulsar,kafka,tube
     private String topicType;
+    private String startConsumeTime;
+    private String stopConsumeTime;
     private Map<String, Object> properties;
 
+    public void setStopConsumeTime(String stopConsumeTime) {
+        this.stopConsumeTime = stopConsumeTime;
+    }
+
+    public void setStartConsumeTime(String startConsumeTime) {
+        this.startConsumeTime = startConsumeTime;
+    }
+
+    public String getStartConsumeTime() {
+        return startConsumeTime;
+    }
+
+    public String getStopConsumeTime() {
+        return stopConsumeTime;
+    }
+
     public String getTopic() {
         return topic;
     }
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/TimeUtil.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/TimeUtil.java
index 1fc4ab2b36..f150ab15e5 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/TimeUtil.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/TimeUtil.java
@@ -19,49 +19,40 @@ package org.apache.inlong.sdk.sort.util;
 
 import org.apache.inlong.sdk.sort.entity.InLongTopic;
 
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
-import java.util.Optional;
 
 public class TimeUtil {
 
     private static final Logger logger = LoggerFactory.getLogger(TimeUtil.class);
     private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
-    private static final String KEY_SDK_START_TIME = "sortSdk.startTime";
-    private static final String KEY_SDK_STOP_TIME = "sortSdk.stopTime";
     private static final long DEFAULT_START_TIME = -1L;
     private static final long DEFAULT_STOP_TIME = Long.MAX_VALUE;
 
     public static long parseStartTime(InLongTopic inLongTopic) {
-        return 
Optional.ofNullable(inLongTopic.getProperties().get(KEY_SDK_START_TIME))
-                .map(s -> {
-                    try {
-                        LocalDateTime time = LocalDateTime.parse(s.toString(), 
DATE_FORMAT);
-                        return 
LocalDateTime.from(time).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
-                    } catch (Throwable t) {
-                        logger.error("parse start time failed, plz check the 
format of start time : {}", s, t);
-                    }
-                    return DEFAULT_START_TIME;
-                })
-                .orElse(DEFAULT_START_TIME);
+        return parseTime(inLongTopic.getStartConsumeTime(), DEFAULT_START_TIME);
     }
 
     public static long parseStopTime(InLongTopic inLongTopic) {
-        return 
Optional.ofNullable(inLongTopic.getProperties().get(KEY_SDK_STOP_TIME))
-                .map(s -> {
-                    try {
-                        LocalDateTime time = LocalDateTime.parse(s.toString(), 
DATE_FORMAT);
-                        return 
LocalDateTime.from(time).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
-                    } catch (Throwable t) {
-                        logger.error("parse start time failed, plz check the 
format of stop time : {}", s, t);
-                    }
-                    return DEFAULT_STOP_TIME;
-                })
-                .orElse(DEFAULT_STOP_TIME);
+        return parseTime(inLongTopic.getStopConsumeTime(), DEFAULT_STOP_TIME);
+    }
+
+    private static long parseTime(String time, long defaultValue) {
+        if (StringUtils.isEmpty(time)) {
+            return defaultValue;
+        }
+        try {
+            LocalDateTime localDateTime = LocalDateTime.parse(time, DATE_FORMAT);
+            return LocalDateTime.from(localDateTime).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
+        } catch (Throwable t) {
+            logger.error("parse start time failed, plz check the format of time : {}", time, t);
+        }
+        return defaultValue;
     }
 
 }
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortConfigQueryConsumeConfig.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortConfigQueryConsumeConfig.java
index 57f5059c16..048bace958 100644
--- 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortConfigQueryConsumeConfig.java
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortConfigQueryConsumeConfig.java
@@ -64,6 +64,8 @@ public class SortConfigQueryConsumeConfig implements QueryConsumeConfig {
                 topic.setTopic(flow.getSourceConfig().getTopic());
                 // only supports pulsar now
                 topic.setTopicType(InlongTopicTypeEnum.PULSAR.getName());
+                topic.setStartConsumeTime(flow.getSourceConfig().getStartConsumeTime());
+                topic.setStopConsumeTime(flow.getSourceConfig().getStopConsumeTime());
                 topic.setProperties(flow.getProperties() != null ? flow.getProperties() : new HashMap<>());
                 topics.add(topic);
             }

Reply via email to