This is an automated email from the ASF dual-hosted git repository. vernedeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 59b6c5fc52 [INLONG-10562][Manager] SortConfig supports set start and stop consuming time (#10563) 59b6c5fc52 is described below commit 59b6c5fc527d0242a05b2e3a80a09359a6faa1d0 Author: vernedeng <verned...@apache.org> AuthorDate: Fri Jul 5 10:19:44 2024 +0800 [INLONG-10562][Manager] SortConfig supports set start and stop consuming time (#10563) * [INLONG-10562][Manager] SortConfig supports set start and stop consume time --- .../common/pojo/sort/dataflow/SourceConfig.java | 2 ++ .../inlong/manager/pojo/sink/BaseStreamSink.java | 32 +++++++++-------- .../manager/pojo/sink/ck/ClickHouseSinkDTO.java | 3 +- .../inlong/manager/pojo/sink/cls/ClsSinkDTO.java | 3 +- .../manager/pojo/sink/doris/DorisSinkDTO.java | 3 +- .../manager/pojo/sink/es/ElasticsearchSinkDTO.java | 3 +- .../pojo/sink/greenplum/GreenplumSinkDTO.java | 3 +- .../manager/pojo/sink/hbase/HBaseSinkDTO.java | 3 +- .../inlong/manager/pojo/sink/hdfs/HDFSSinkDTO.java | 3 +- .../inlong/manager/pojo/sink/hive/HiveSinkDTO.java | 3 +- .../inlong/manager/pojo/sink/hudi/HudiSinkDTO.java | 3 +- .../manager/pojo/sink/iceberg/IcebergSinkDTO.java | 3 +- .../manager/pojo/sink/kafka/KafkaSinkDTO.java | 3 +- .../inlong/manager/pojo/sink/kudu/KuduSinkDTO.java | 3 +- .../manager/pojo/sink/mysql/MySQLSinkDTO.java | 3 +- .../manager/pojo/sink/oracle/OracleSinkDTO.java | 3 +- .../pojo/sink/postgresql/PostgreSQLSinkDTO.java | 3 +- .../manager/pojo/sink/pulsar/PulsarSinkDTO.java | 3 +- .../manager/pojo/sink/redis/RedisSinkDTO.java | 3 +- .../pojo/sink/sqlserver/SQLServerSinkDTO.java | 3 +- .../pojo/sink/starrocks/StarRocksSinkDTO.java | 3 +- .../tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java | 3 +- .../apache/inlong/sdk/sort/entity/InLongTopic.java | 18 ++++++++++ .../org/apache/inlong/sdk/sort/util/TimeUtil.java | 41 +++++++++------------- .../loader/SortConfigQueryConsumeConfig.java | 2 ++ 25 files changed, 95 insertions(+), 60 deletions(-) diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/SourceConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/SourceConfig.java index 7f11d72b5b..cb16bfead3 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/SourceConfig.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/SourceConfig.java @@ -31,6 +31,8 @@ public class SourceConfig implements Serializable { private String topic; private String subscription; + private String startConsumeTime; + private String stopConsumeTime; private String encodingType; private DeserializationConfig deserializationConfig; private DataTypeConfig dataTypeConfig; diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/SourceConfig.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java similarity index 56% copy from inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/SourceConfig.java copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java index 7f11d72b5b..09115a9e02 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/SourceConfig.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java @@ -15,24 +15,26 @@ * limitations under the License. */ -package org.apache.inlong.common.pojo.sort.dataflow; - -import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig; -import org.apache.inlong.common.pojo.sort.dataflow.deserialization.DeserializationConfig; -import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig; +package org.apache.inlong.manager.pojo.sink; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; -import java.io.Serializable; -import java.util.List; - +/** + * The base parameter class of StreamSink, support user extend their own business params. + */ @Data -public class SourceConfig implements Serializable { +@AllArgsConstructor +@NoArgsConstructor +@ApiModel("Base info of stream sink") +public class BaseStreamSink { + + @ApiModelProperty("Start consume time, yyyy-MM-dd HH:mm:ss format") + private String startConsumeTime; - private String topic; - private String subscription; - private String encodingType; - private DeserializationConfig deserializationConfig; - private DataTypeConfig dataTypeConfig; - private List<FieldConfig> fieldConfigs; + @ApiModelProperty("Stop consume time, yyyy-MM-dd HH:mm:ss format") + private String stopConsumeTime; } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkDTO.java index e30186c1f9..359b87c8d4 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkDTO.java @@ -22,6 +22,7 @@ import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.AESUtils; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.pojo.sink.BaseStreamSink; import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; @@ -44,7 +45,7 @@ import java.util.Objects; @Builder @NoArgsConstructor @AllArgsConstructor -public class ClickHouseSinkDTO { +public class ClickHouseSinkDTO extends BaseStreamSink { @ApiModelProperty("JDBC URL of the ClickHouse server") private String jdbcUrl; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkDTO.java index 87508b5e1c..ec8b647e00 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkDTO.java @@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.pojo.sink.BaseStreamSink; import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; @@ -38,7 +39,7 @@ import javax.validation.constraints.NotNull; @Builder @NoArgsConstructor @AllArgsConstructor -public class ClsSinkDTO { +public class ClsSinkDTO extends BaseStreamSink { @ApiModelProperty("Cloud log service topic id") private String topicId; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSinkDTO.java index 262c3596ae..caec62794b 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSinkDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSinkDTO.java @@ -22,6 +22,7 @@ import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.AESUtils; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.pojo.sink.BaseStreamSink; import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; @@ -42,7 +43,7 @@ import java.util.Map; @Builder @NoArgsConstructor @AllArgsConstructor -public class DorisSinkDTO { +public class DorisSinkDTO extends BaseStreamSink { @ApiModelProperty("Doris FE http address") private String feNodes; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java index 4045231768..ddda6bc4db 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java @@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.pojo.sink.BaseStreamSink; import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; @@ -38,7 +39,7 @@ import javax.validation.constraints.NotNull; @Builder @NoArgsConstructor @AllArgsConstructor -public class ElasticsearchSinkDTO { +public class ElasticsearchSinkDTO extends BaseStreamSink { @ApiModelProperty("indexNamePattern") private String indexNamePattern; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumSinkDTO.java index b44d00f4b3..072a174563 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumSinkDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumSinkDTO.java @@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.pojo.sink.BaseStreamSink; import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; @@ -41,7 +42,7 @@ import java.util.Map; @Builder @NoArgsConstructor @AllArgsConstructor -public class GreenplumSinkDTO { +public class GreenplumSinkDTO extends BaseStreamSink { @ApiModelProperty("JDBC URL of Greenplum server, such as: jdbc:postgresql://host:port/database") private String jdbcUrl; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hbase/HBaseSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hbase/HBaseSinkDTO.java index 780f0e7eac..bebe25b247 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hbase/HBaseSinkDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hbase/HBaseSinkDTO.java @@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.pojo.sink.BaseStreamSink; import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; @@ -41,7 +42,7 @@ import java.util.Map; @Builder @NoArgsConstructor @AllArgsConstructor -public class HBaseSinkDTO { +public class HBaseSinkDTO extends BaseStreamSink { @ApiModelProperty("Target namespace") private String namespace; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hdfs/HDFSSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hdfs/HDFSSinkDTO.java index ff165a9927..3206cbd79e 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hdfs/HDFSSinkDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hdfs/HDFSSinkDTO.java @@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.pojo.sink.BaseStreamSink; import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; @@ -41,7 +42,7 @@ import java.util.Map; @Builder @NoArgsConstructor @AllArgsConstructor -public class HDFSSinkDTO { +public class HDFSSinkDTO extends BaseStreamSink { @ApiModelProperty("File format, support: TextFile, RCFile, SequenceFile, Avro") private String fileFormat; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hive/HiveSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hive/HiveSinkDTO.java index 946b4c0933..4468f25c22 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hive/HiveSinkDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hive/HiveSinkDTO.java @@ -22,6 +22,7 @@ import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.AESUtils; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.pojo.sink.BaseStreamSink; import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; @@ -44,7 +45,7 @@ import java.util.Map; @Builder @NoArgsConstructor @AllArgsConstructor -public class HiveSinkDTO { +public class HiveSinkDTO extends BaseStreamSink { @ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}") private String jdbcUrl; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java index c280e88438..52db2e087b 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java @@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.pojo.sink.BaseStreamSink; import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; @@ -42,7 +43,7 @@ import java.util.Map; @Builder @NoArgsConstructor @AllArgsConstructor -public class HudiSinkDTO { +public class HudiSinkDTO extends BaseStreamSink { @ApiModelProperty("Catalog type, like: HIVE, HADOOP, default is HIVE") @Builder.Default diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java index 379aaa0a75..edad042d96 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java @@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.pojo.sink.BaseStreamSink; import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; @@ -41,7 +42,7 @@ import java.util.Map; @Builder @NoArgsConstructor @AllArgsConstructor -public class IcebergSinkDTO { +public class IcebergSinkDTO extends BaseStreamSink { @ApiModelProperty("Catalog type, like: HIVE, HADOOP, default is HIVE") @Builder.Default diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkDTO.java index 8b60baba2a..38c875ad04 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkDTO.java @@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.pojo.sink.BaseStreamSink; import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; @@ -40,7 +41,7 @@ import java.util.Map; @Builder @NoArgsConstructor @AllArgsConstructor -public class KafkaSinkDTO { +public class KafkaSinkDTO extends BaseStreamSink { @ApiModelProperty("Kafka bootstrap servers") private String bootstrapServers; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkDTO.java index d07f7289b8..e536b04d3f 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkDTO.java @@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.pojo.sink.BaseStreamSink; import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; @@ -41,7 +42,7 @@ import java.util.Map; @Builder @NoArgsConstructor @AllArgsConstructor -public class KuduSinkDTO { +public class KuduSinkDTO extends BaseStreamSink { @ApiModelProperty("Kudu masters, a comma separated list of 'host:port' pairs") private String masters; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java index 5b5750a803..e9cac1f77a 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java @@ -22,6 +22,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.pojo.sink.BaseStreamSink; import org.apache.inlong.manager.pojo.util.MySQLSensitiveUrlUtils; import com.google.common.base.Strings; @@ -48,7 +49,7 @@ import java.util.regex.Pattern; @Builder @NoArgsConstructor @AllArgsConstructor -public class MySQLSinkDTO { +public class MySQLSinkDTO extends BaseStreamSink { private static final Logger LOGGER = LoggerFactory.getLogger(MySQLSinkDTO.class); private static final String MYSQL_JDBC_PREFIX = "jdbc:mysql://"; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oracle/OracleSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oracle/OracleSinkDTO.java index b65f2b6937..06a066a680 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oracle/OracleSinkDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oracle/OracleSinkDTO.java @@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.pojo.sink.BaseStreamSink; import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; @@ -43,7 +44,7 @@ import java.util.Map; @Builder @NoArgsConstructor @AllArgsConstructor -public class OracleSinkDTO { +public class OracleSinkDTO extends BaseStreamSink { private static final Logger LOGGER = LoggerFactory.getLogger(OracleSinkDTO.class); diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/postgresql/PostgreSQLSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/postgresql/PostgreSQLSinkDTO.java index 18d02d75a7..2bf011f424 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/postgresql/PostgreSQLSinkDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/postgresql/PostgreSQLSinkDTO.java @@ -22,6 +22,7 @@ import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.AESUtils; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.pojo.sink.BaseStreamSink; import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; @@ -45,7 +46,7 @@ import java.util.Map; @Builder @NoArgsConstructor @AllArgsConstructor -public class PostgreSQLSinkDTO { +public class PostgreSQLSinkDTO extends BaseStreamSink { @ApiModelProperty("JDBC URL of the PostgreSQL server") @Length(max = 512, message = "length must be less than or equal to 512") diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSinkDTO.java index 1d80e91c8e..d693af80cd 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSinkDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSinkDTO.java @@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.pojo.sink.BaseStreamSink; import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; @@ -38,7 +39,7 @@ import javax.validation.constraints.NotNull; @Builder @NoArgsConstructor @AllArgsConstructor -public class PulsarSinkDTO { +public class PulsarSinkDTO extends BaseStreamSink { @ApiModelProperty("Pulsar tenant") private String pulsarTenant; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSinkDTO.java index 24c27529fe..c578006588 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSinkDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSinkDTO.java @@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.pojo.sink.BaseStreamSink; import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; @@ -40,7 +41,7 @@ import java.util.Map; @Builder @NoArgsConstructor @AllArgsConstructor -public class RedisSinkDTO { +public class RedisSinkDTO extends BaseStreamSink { @ApiModelProperty("Redis cluster mode") private String clusterMode; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/sqlserver/SQLServerSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/sqlserver/SQLServerSinkDTO.java index c849f0e15d..93e55b1094 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/sqlserver/SQLServerSinkDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/sqlserver/SQLServerSinkDTO.java @@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.pojo.sink.BaseStreamSink; import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; @@ -40,7 +41,7 @@ import java.util.List; @Builder @NoArgsConstructor @AllArgsConstructor -public class SQLServerSinkDTO { +public class SQLServerSinkDTO extends BaseStreamSink { @ApiModelProperty("Username of the SQLServer") private String username; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkDTO.java index 8f10c0b6a7..2e0edaab4d 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkDTO.java @@ -22,6 +22,7 @@ import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.AESUtils; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.pojo.sink.BaseStreamSink; import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; @@ -44,7 +45,7 @@ import java.util.Objects; @Builder @NoArgsConstructor @AllArgsConstructor -public class StarRocksSinkDTO { +public class StarRocksSinkDTO extends BaseStreamSink { @ApiModelProperty("StarRocks jdbc url") private String jdbcUrl; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java index 9431ae397f..3ef1b25fb4 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java @@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.pojo.sink.BaseStreamSink; import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; @@ -40,7 +41,7 @@ import java.util.Map; @Builder @NoArgsConstructor @AllArgsConstructor -public class TDSQLPostgreSQLSinkDTO { +public class TDSQLPostgreSQLSinkDTO extends BaseStreamSink { @ApiModelProperty("TDSQLPostgreSQL jdbc url, such as jdbc:postgresql://host:port/database") private String jdbcUrl; diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/InLongTopic.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/InLongTopic.java index c90a710d07..3b636c912b 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/InLongTopic.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/InLongTopic.java @@ -27,8 +27,26 @@ public class InLongTopic { private int partitionId; // pulsar,kafka,tube private String topicType; + private String startConsumeTime; + private String stopConsumeTime; private Map<String, Object> properties; + public void setStopConsumeTime(String stopConsumeTime) { + this.stopConsumeTime = stopConsumeTime; + } + + public void setStartConsumeTime(String startConsumeTime) { + this.startConsumeTime = startConsumeTime; + } + + public String getStartConsumeTime() { + return startConsumeTime; + } + + public String getStopConsumeTime() { + return stopConsumeTime; + } + public String getTopic() { return topic; } diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/TimeUtil.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/TimeUtil.java index 1fc4ab2b36..f150ab15e5 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/TimeUtil.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/TimeUtil.java @@ -19,49 +19,40 @@ package org.apache.inlong.sdk.sort.util; import org.apache.inlong.sdk.sort.entity.InLongTopic; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; -import java.util.Optional; public class TimeUtil { private static final Logger logger = LoggerFactory.getLogger(TimeUtil.class); private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - private static final String KEY_SDK_START_TIME = "sortSdk.startTime"; - private static final String KEY_SDK_STOP_TIME = "sortSdk.stopTime"; private static final long DEFAULT_START_TIME = -1L; private static final long DEFAULT_STOP_TIME = Long.MAX_VALUE; public static long parseStartTime(InLongTopic inLongTopic) { - return Optional.ofNullable(inLongTopic.getProperties().get(KEY_SDK_START_TIME)) - .map(s -> { - try { - LocalDateTime time = LocalDateTime.parse(s.toString(), DATE_FORMAT); - return LocalDateTime.from(time).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); - } catch (Throwable t) { - logger.error("parse start time failed, plz check the format of start time : {}", s, t); - } - return DEFAULT_START_TIME; - }) - .orElse(DEFAULT_START_TIME); + return parseTime(inLongTopic.getStartConsumeTime(), DEFAULT_START_TIME); } public static long parseStopTime(InLongTopic inLongTopic) { - return Optional.ofNullable(inLongTopic.getProperties().get(KEY_SDK_STOP_TIME)) - .map(s -> { - try { - LocalDateTime time = LocalDateTime.parse(s.toString(), DATE_FORMAT); - return LocalDateTime.from(time).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); - } catch (Throwable t) { - logger.error("parse start time failed, plz check the format of stop time : {}", s, t); - } - return DEFAULT_STOP_TIME; - }) - .orElse(DEFAULT_STOP_TIME); + return parseTime(inLongTopic.getStopConsumeTime(), DEFAULT_STOP_TIME); + } + + private static long parseTime(String time, long defaultValue) { + if (StringUtils.isEmpty(time)) { + return defaultValue; + } + try { + LocalDateTime localDateTime = LocalDateTime.parse(time, DATE_FORMAT); + return LocalDateTime.from(localDateTime).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); + } catch (Throwable t) { + logger.error("parse start time failed, plz check the format of time : {}", time, t); + } + return defaultValue; } } diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortConfigQueryConsumeConfig.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortConfigQueryConsumeConfig.java index 57f5059c16..048bace958 100644 --- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortConfigQueryConsumeConfig.java +++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortConfigQueryConsumeConfig.java @@ -64,6 +64,8 @@ public class SortConfigQueryConsumeConfig implements QueryConsumeConfig { topic.setTopic(flow.getSourceConfig().getTopic()); // only supports pulsar now topic.setTopicType(InlongTopicTypeEnum.PULSAR.getName()); + topic.setStartConsumeTime(flow.getSourceConfig().getStartConsumeTime()); + topic.setStopConsumeTime(flow.getSourceConfig().getStopConsumeTime()); topic.setProperties(flow.getProperties() != null ? flow.getProperties() : new HashMap<>()); topics.add(topic); }