This is an automated email from the ASF dual-hosted git repository.

wakefu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 91a0cb3aa4 [INLONG-11508][Manager] Add APIs to dirty data query 
(#11509)
91a0cb3aa4 is described below

commit 91a0cb3aa42fc855f3bd8c91cf0dc12ab54c5e02
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Tue Nov 19 18:32:25 2024 +0800

    [INLONG-11508][Manager] Add APIs to dirty data query (#11509)
    
    * [INLONG-11508][Manager] Add APIs to dirty data query
---
 .../manager/dao/entity/DirtyQueryLogEntity.java}   |  27 ++-
 .../dao/mapper/DirtyQueryLogEntityMapper.java}     |  25 ++-
 .../mappers/DirtyQueryLogEntityMapper.xml          |  93 +++++++++++
 .../inlong/manager/pojo/sink/BaseStreamSink.java   |  17 ++
 ...treamSink.java => DirtyDataDetailResponse.java} |  46 +++--
 .../{BaseStreamSink.java => DirtyDataRequest.java} |  38 +++--
 .../manager/pojo/sink/DirtyDataResponse.java}      |  21 ++-
 .../pojo/sink/DirtyDataTrendDetailResponse.java}   |  24 ++-
 ...eStreamSink.java => DirtyDataTrendRequest.java} |  35 ++--
 .../inlong/manager/pojo/sink/SinkRequest.java      |   3 +
 .../inlong/manager/pojo/sink/StreamSink.java       |   3 +
 .../service/dirtyData/DirtyQueryLogService.java}   |  36 ++--
 .../dirtyData/impl/DirtyQueryLogServiceImpl.java   | 185 +++++++++++++++++++++
 .../main/resources/h2/apache_inlong_manager.sql    |  18 ++
 .../manager-web/sql/apache_inlong_manager.sql      |  19 +++
 inlong-manager/manager-web/sql/changes-2.1.0.sql   |  16 ++
 .../web/controller/StreamSinkController.java       |  38 +++++
 .../src/main/resources/application-dev.properties  |   7 +-
 .../src/main/resources/application-prod.properties |   7 +-
 .../src/main/resources/application-test.properties |   7 +-
 20 files changed, 577 insertions(+), 88 deletions(-)

diff --git a/inlong-manager/manager-web/sql/changes-2.1.0.sql 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/DirtyQueryLogEntity.java
similarity index 60%
copy from inlong-manager/manager-web/sql/changes-2.1.0.sql
copy to 
inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/DirtyQueryLogEntity.java
index 0d4c984778..3554284f34 100644
--- a/inlong-manager/manager-web/sql/changes-2.1.0.sql
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/DirtyQueryLogEntity.java
@@ -15,13 +15,26 @@
  * limitations under the License.
  */
 
--- This is the SQL change file from version 1.4.0 to the current version 1.5.0.
--- When upgrading to version 1.5.0, please execute those SQLs in the DB (such 
as MySQL) used by the Manager module.
+package org.apache.inlong.manager.dao.entity;
 
-SET NAMES utf8mb4;
-SET FOREIGN_KEY_CHECKS = 0;
+import lombok.Data;
 
-USE `apache_inlong_manager`;
+import java.io.Serializable;
+import java.util.Date;
 
-ALTER TABLE `schedule_config`
-    ADD COLUMN  `schedule_engine` varchar(64)  NOT NULL DEFAULT 'Quartz' 
COMMENT 'Schedule engine, support Quartz, Airflow and DolphinScheduler';
+@Data
+public class DirtyQueryLogEntity implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private Integer id;
+    private String md5;
+    private String requestParams;
+    private String taskId;
+    private Integer isDeleted;
+    private String creator;
+    private String modifier;
+    private Date createTime;
+    private Date modifyTime;
+    private Integer version;
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-web/sql/changes-2.1.0.sql 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DirtyQueryLogEntityMapper.java
similarity index 59%
copy from inlong-manager/manager-web/sql/changes-2.1.0.sql
copy to 
inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DirtyQueryLogEntityMapper.java
index 0d4c984778..08db57441a 100644
--- a/inlong-manager/manager-web/sql/changes-2.1.0.sql
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DirtyQueryLogEntityMapper.java
@@ -15,13 +15,24 @@
  * limitations under the License.
  */
 
--- This is the SQL change file from version 1.4.0 to the current version 1.5.0.
--- When upgrading to version 1.5.0, please execute those SQLs in the DB (such 
as MySQL) used by the Manager module.
+package org.apache.inlong.manager.dao.mapper;
 
-SET NAMES utf8mb4;
-SET FOREIGN_KEY_CHECKS = 0;
+import org.apache.inlong.manager.dao.entity.DirtyQueryLogEntity;
 
-USE `apache_inlong_manager`;
+import org.apache.ibatis.annotations.Param;
+import org.springframework.stereotype.Repository;
 
-ALTER TABLE `schedule_config`
-    ADD COLUMN  `schedule_engine` varchar(64)  NOT NULL DEFAULT 'Quartz' 
COMMENT 'Schedule engine, support Quartz, Airflow and DolphinScheduler';
+@Repository
+public interface DirtyQueryLogEntityMapper {
+
+    int updateByIdSelective(DirtyQueryLogEntity record);
+
+    int insert(DirtyQueryLogEntity record);
+
+    DirtyQueryLogEntity selectByPrimaryKey(Integer id);
+
+    DirtyQueryLogEntity selectByMd5(String md5);
+
+    void updateToTimeout(@Param("beforeMinutes") Integer beforeMinutes);
+
+}
\ No newline at end of file
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/DirtyQueryLogEntityMapper.xml
 
b/inlong-manager/manager-dao/src/main/resources/mappers/DirtyQueryLogEntityMapper.xml
new file mode 100644
index 0000000000..0fb1ce8e5f
--- /dev/null
+++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/DirtyQueryLogEntityMapper.xml
@@ -0,0 +1,93 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" 
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper 
namespace="org.apache.inlong.manager.dao.mapper.DirtyQueryLogEntityMapper">
+    <resultMap id="BaseResultMap" 
type="org.apache.inlong.manager.dao.entity.DirtyQueryLogEntity">
+        <id column="id" jdbcType="INTEGER" property="id" />
+        <result column="md5" jdbcType="VARCHAR" property="md5" />
+        <result column="request_params" jdbcType="VARCHAR" 
property="requestParams" />
+        <result column="task_id" jdbcType="VARCHAR" property="taskId" />
+        <result column="is_deleted" jdbcType="INTEGER" property="isDeleted" />
+        <result column="creator" jdbcType="VARCHAR" property="creator" />
+        <result column="modifier" jdbcType="VARCHAR" property="modifier" />
+        <result column="create_time" jdbcType="TIMESTAMP" 
property="createTime" />
+        <result column="modify_time" jdbcType="TIMESTAMP" 
property="modifyTime" />
+        <result column="version" jdbcType="INTEGER" property="version" />
+    </resultMap>
+
+    <sql id="Base_Column_List">
+        id, md5, request_params, task_id, is_deleted, creator, modifier, 
create_time, modify_time, version
+    </sql>
+    <select id="selectByPrimaryKey" parameterType="java.lang.Integer" 
resultMap="BaseResultMap">
+        select
+        <include refid="Base_Column_List" />
+        from dirty_query_log
+        where is_deleted = 0
+        and id = #{id,jdbcType=INTEGER}
+    </select>
+    <select id="selectByMd5" parameterType="java.lang.String" 
resultMap="BaseResultMap">
+        select
+        <include refid="Base_Column_List" />
+        from dirty_query_log
+        where is_deleted = 0
+        and md5 = #{md5,jdbcType=VARCHAR}
+        limit 1
+    </select>
+    <insert id="insert" 
parameterType="org.apache.inlong.manager.dao.entity.DirtyQueryLogEntity">
+        insert into dirty_query_log (id, md5, request_params,
+                                     task_id, creator, modifier)
+        values (#{id,jdbcType=INTEGER}, #{md5,jdbcType=VARCHAR}, 
#{requestParams,jdbcType=VARCHAR},
+                #{taskId,jdbcType=VARCHAR}, #{creator,jdbcType=VARCHAR}, 
#{modifier,jdbcType=VARCHAR})
+    </insert>
+    <update id="updateByIdSelective" 
parameterType="org.apache.inlong.manager.dao.entity.DirtyQueryLogEntity">
+        update dirty_query_log
+        <set>
+            <if test="md5 != null">
+                md5 = #{md5,jdbcType=VARCHAR},
+            </if>
+            <if test="requestParams != null">
+                request_params = #{requestParams,jdbcType=VARCHAR},
+            </if>
+            <if test="taskId != null">
+                task_id = #{taskId,jdbcType=VARCHAR},
+            </if>
+            <if test="isDeleted != null">
+                is_deleted = #{isDeleted,jdbcType=INTEGER},
+            </if>
+            <if test="modifier != null">
+                modifier = #{modifier,jdbcType=VARCHAR},
+            </if>
+            version = #{version,jdbcType=INTEGER} + 1
+        </set>
+        where id = #{id,jdbcType=INTEGER}
+        and version = #{version,jdbcType=INTEGER}
+    </update>
+    <update id="updateToTimeout">
+        update dirty_query_log
+        <set>
+            is_deleted = id
+        </set>
+        <where>
+            is_deleted = 0
+            and create_time &lt;= DATE_ADD(NOW(), INTERVAL -#{beforeMinutes, 
jdbcType=INTEGER} MINUTE)
+        </where>
+    </update>
+</mapper>
\ No newline at end of file
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
index d8df7f4a28..6ad6c9b523 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
@@ -17,12 +17,18 @@
 
 package org.apache.inlong.manager.pojo.sink;
 
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
+import javax.validation.constraints.NotNull;
+
 /**
  * The base parameter class of StreamSink, support user extend their own 
business params.
  */
@@ -32,6 +38,9 @@ import lombok.NoArgsConstructor;
 @ApiModel("Base info of stream sink")
 public class BaseStreamSink {
 
+    @ApiModelProperty("Enable data archiving")
+    private Boolean enableDataArchiving;
+
     @ApiModelProperty("Transform sql")
     private String transformSql;
 
@@ -40,4 +49,12 @@ public class BaseStreamSink {
 
     @ApiModelProperty("Stop consume time, yyyy-MM-dd HH:mm:ss format")
     private String stopConsumeTime;
+
+    public static BaseStreamSink getFromJson(@NotNull String extParams) {
+        try {
+            return JsonUtils.parseObject(extParams, BaseStreamSink.class);
+        } catch (Exception e) {
+            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+        }
+    }
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataDetailResponse.java
similarity index 50%
copy from 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
copy to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataDetailResponse.java
index d8df7f4a28..2389217a40 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataDetailResponse.java
@@ -19,25 +19,45 @@ package org.apache.inlong.manager.pojo.sink;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
 import lombok.Data;
-import lombok.NoArgsConstructor;
 
 /**
- * The base parameter class of StreamSink, support user extend their own 
business params.
+ * Dirty data detail info.
  */
 @Data
-@AllArgsConstructor
-@NoArgsConstructor
-@ApiModel("Base info of stream sink")
-public class BaseStreamSink {
+@ApiModel("Dirty data detail info")
+public class DirtyDataDetailResponse {
 
-    @ApiModelProperty("Transform sql")
-    private String transformSql;
+    @ApiModelProperty(value = "Tdbank imp date")
+    private String tdbankImpDate;
 
-    @ApiModelProperty("Start consume time, yyyy-MM-dd HH:mm:ss format")
-    private String startConsumeTime;
+    @ApiModelProperty(value = "Data flow id")
+    private String dataFlowId;
 
-    @ApiModelProperty("Stop consume time, yyyy-MM-dd HH:mm:ss format")
-    private String stopConsumeTime;
+    @ApiModelProperty(value = "Inlong group id")
+    private String groupId;
+
+    @ApiModelProperty(value = "Inlong stream id")
+    private String streamId;
+
+    @ApiModelProperty(value = "Report time")
+    private String reportTime;
+
+    @ApiModelProperty(value = "Data time")
+    private String dataTime;
+
+    @ApiModelProperty(value = "Server type")
+    private String serverType;
+
+    @ApiModelProperty(value = "Dirty type")
+    private String dirtyType;
+
+    @ApiModelProperty(value = "Dirty message")
+    private String dirtyMessage;
+
+    @ApiModelProperty(value = "Ext info")
+    private String extInfo;
+
+    @ApiModelProperty(value = "Dirty data")
+    private String dirtyData;
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataRequest.java
similarity index 56%
copy from 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
copy to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataRequest.java
index d8df7f4a28..8b3fc32a6f 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataRequest.java
@@ -19,25 +19,37 @@ package org.apache.inlong.manager.pojo.sink;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
 import lombok.Data;
-import lombok.NoArgsConstructor;
+import lombok.EqualsAndHashCode;
+
+import java.util.List;
 
 /**
- * The base parameter class of StreamSink, support user extend their own 
business params.
+ * Query request for Dirty data
  */
 @Data
-@AllArgsConstructor
-@NoArgsConstructor
-@ApiModel("Base info of stream sink")
-public class BaseStreamSink {
+@EqualsAndHashCode(callSuper = false)
+@ApiModel("Query request for Dirty data")
+public class DirtyDataRequest {
+
+    @ApiModelProperty(value = "Sink id list")
+    private List<Integer> sinkIdList;
+
+    @ApiModelProperty(value = "Key word")
+    private String keyword;
+
+    @ApiModelProperty(value = "Server type")
+    private String serverType;
+
+    @ApiModelProperty(value = "Dirty type")
+    private String dirtyType;
 
-    @ApiModelProperty("Transform sql")
-    private String transformSql;
+    @ApiModelProperty(value = "Start time")
+    private String startTime;
 
-    @ApiModelProperty("Start consume time, yyyy-MM-dd HH:mm:ss format")
-    private String startConsumeTime;
+    @ApiModelProperty(value = "End time")
+    private String endTime;
 
-    @ApiModelProperty("Stop consume time, yyyy-MM-dd HH:mm:ss format")
-    private String stopConsumeTime;
+    @ApiModelProperty(value = "Data count")
+    private Integer dataCount = 10;
 }
diff --git a/inlong-manager/manager-web/sql/changes-2.1.0.sql 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataResponse.java
similarity index 64%
copy from inlong-manager/manager-web/sql/changes-2.1.0.sql
copy to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataResponse.java
index 0d4c984778..08203db09a 100644
--- a/inlong-manager/manager-web/sql/changes-2.1.0.sql
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataResponse.java
@@ -15,13 +15,20 @@
  * limitations under the License.
  */
 
--- This is the SQL change file from version 1.4.0 to the current version 1.5.0.
--- When upgrading to version 1.5.0, please execute those SQLs in the DB (such 
as MySQL) used by the Manager module.
+package org.apache.inlong.manager.pojo.sink;
 
-SET NAMES utf8mb4;
-SET FOREIGN_KEY_CHECKS = 0;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
 
-USE `apache_inlong_manager`;
+/**
+ * Dirty data info.
+ */
+@Data
+@ApiModel("Dirty data info")
+public class DirtyDataResponse {
+
+    @ApiModelProperty(value = "Task id")
+    private String taskId;
 
-ALTER TABLE `schedule_config`
-    ADD COLUMN  `schedule_engine` varchar(64)  NOT NULL DEFAULT 'Quartz' 
COMMENT 'Schedule engine, support Quartz, Airflow and DolphinScheduler';
+}
diff --git a/inlong-manager/manager-web/sql/changes-2.1.0.sql 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataTrendDetailResponse.java
similarity index 64%
copy from inlong-manager/manager-web/sql/changes-2.1.0.sql
copy to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataTrendDetailResponse.java
index 0d4c984778..cc56cd93ee 100644
--- a/inlong-manager/manager-web/sql/changes-2.1.0.sql
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataTrendDetailResponse.java
@@ -15,13 +15,23 @@
  * limitations under the License.
  */
 
--- This is the SQL change file from version 1.4.0 to the current version 1.5.0.
--- When upgrading to version 1.5.0, please execute those SQLs in the DB (such 
as MySQL) used by the Manager module.
+package org.apache.inlong.manager.pojo.sink;
 
-SET NAMES utf8mb4;
-SET FOREIGN_KEY_CHECKS = 0;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
 
-USE `apache_inlong_manager`;
+/**
+ * Dirty data detail info.
+ */
+@Data
+@ApiModel("Dirty data trend detail info")
+public class DirtyDataTrendDetailResponse {
+
+    @ApiModelProperty(value = "Report time")
+    private String reportTime;
+
+    @ApiModelProperty(value = "Data count")
+    private String count;
 
-ALTER TABLE `schedule_config`
-    ADD COLUMN  `schedule_engine` varchar(64)  NOT NULL DEFAULT 'Quartz' 
COMMENT 'Schedule engine, support Quartz, Airflow and DolphinScheduler';
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataTrendRequest.java
similarity index 59%
copy from 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
copy to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataTrendRequest.java
index d8df7f4a28..b0e52420a2 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataTrendRequest.java
@@ -19,25 +19,34 @@ package org.apache.inlong.manager.pojo.sink;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
 import lombok.Data;
-import lombok.NoArgsConstructor;
+import lombok.EqualsAndHashCode;
+
+import java.util.List;
 
 /**
- * The base parameter class of StreamSink, support user extend their own 
business params.
+ * Query request for Dirty data
  */
 @Data
-@AllArgsConstructor
-@NoArgsConstructor
-@ApiModel("Base info of stream sink")
-public class BaseStreamSink {
+@EqualsAndHashCode(callSuper = false)
+@ApiModel("Query request for Dirty data")
+public class DirtyDataTrendRequest {
+
+    @ApiModelProperty(value = "Sink id list")
+    private List<Integer> sinkIdList;
+
+    @ApiModelProperty(value = "Data time unit")
+    private String dataTimeUnit;
+
+    @ApiModelProperty(value = "Server type")
+    private String serverType;
 
-    @ApiModelProperty("Transform sql")
-    private String transformSql;
+    @ApiModelProperty(value = "Dirty type")
+    private String dirtyType;
 
-    @ApiModelProperty("Start consume time, yyyy-MM-dd HH:mm:ss format")
-    private String startConsumeTime;
+    @ApiModelProperty(value = "Start time")
+    private String startTime;
 
-    @ApiModelProperty("Stop consume time, yyyy-MM-dd HH:mm:ss format")
-    private String stopConsumeTime;
+    @ApiModelProperty(value = "End time")
+    private String endTime;
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java
index 24c544b943..782817a50d 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java
@@ -100,6 +100,9 @@ public abstract class SinkRequest {
     @Range(min = 0, max = 1, message = "default is 1, only supports [0: 
disable, 1: enable]")
     private Integer enableCreateResource = 1;
 
+    @ApiModelProperty("Enable data archiving")
+    private Boolean enableDataArchiving;
+
     @ApiModelProperty(value = "Whether to start the process after saving or 
updating. Default is false")
     private Boolean startProcess = false;
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/StreamSink.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/StreamSink.java
index 85fd72a1a4..79a5af3876 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/StreamSink.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/StreamSink.java
@@ -90,6 +90,9 @@ public abstract class StreamSink extends StreamNode {
     @ApiModelProperty(value = "Whether to enable create sink resource? 0: 
disable, 1: enable. Default is 1", notes = "Such as enable or disable to create 
Hive table")
     private Integer enableCreateResource = 1;
 
+    @ApiModelProperty("Enable data archiving")
+    private Boolean enableDataArchiving;
+
     @ApiModelProperty("Backend operation log")
     private String operateLog;
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/dirtyData/DirtyQueryLogService.java
similarity index 50%
copy from 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
copy to 
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/dirtyData/DirtyQueryLogService.java
index d8df7f4a28..3629ee985c 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/dirtyData/DirtyQueryLogService.java
@@ -15,29 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.sink;
+package org.apache.inlong.manager.service.dirtyData;
 
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.pojo.sink.DirtyDataDetailResponse;
+import org.apache.inlong.manager.pojo.sink.DirtyDataRequest;
+import org.apache.inlong.manager.pojo.sink.DirtyDataResponse;
+import org.apache.inlong.manager.pojo.sink.DirtyDataTrendDetailResponse;
+import org.apache.inlong.manager.pojo.sink.DirtyDataTrendRequest;
+
+import java.util.List;
 
 /**
- * The base parameter class of StreamSink, support user extend their own 
business params.
+ * Dirty query log service
  */
-@Data
-@AllArgsConstructor
-@NoArgsConstructor
-@ApiModel("Base info of stream sink")
-public class BaseStreamSink {
+public interface DirtyQueryLogService {
+
+    DirtyDataResponse listDirtyData(DirtyDataRequest request);
+
+    DirtyDataResponse listDirtyDataTrend(DirtyDataTrendRequest request);
+
+    List<DirtyDataDetailResponse> getDirtyData(String taskId);
 
-    @ApiModelProperty("Transform sql")
-    private String transformSql;
+    List<DirtyDataTrendDetailResponse> getDirtyDataTrend(String taskId);
 
-    @ApiModelProperty("Start consume time, yyyy-MM-dd HH:mm:ss format")
-    private String startConsumeTime;
+    String getSqlTaskStatus(String taskId);
 
-    @ApiModelProperty("Stop consume time, yyyy-MM-dd HH:mm:ss format")
-    private String stopConsumeTime;
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/dirtyData/impl/DirtyQueryLogServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/dirtyData/impl/DirtyQueryLogServiceImpl.java
new file mode 100644
index 0000000000..a63a320765
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/dirtyData/impl/DirtyQueryLogServiceImpl.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.dirtyData.impl;
+
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.dao.entity.DirtyQueryLogEntity;
+import org.apache.inlong.manager.dao.mapper.DirtyQueryLogEntityMapper;
+import org.apache.inlong.manager.pojo.sink.DirtyDataDetailResponse;
+import org.apache.inlong.manager.pojo.sink.DirtyDataRequest;
+import org.apache.inlong.manager.pojo.sink.DirtyDataResponse;
+import org.apache.inlong.manager.pojo.sink.DirtyDataTrendDetailResponse;
+import org.apache.inlong.manager.pojo.sink.DirtyDataTrendRequest;
+import org.apache.inlong.manager.pojo.user.LoginUserUtils;
+import org.apache.inlong.manager.service.dirtyData.DirtyQueryLogService;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+@Service
+@Slf4j
+public class DirtyQueryLogServiceImpl implements DirtyQueryLogService {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(DirtyQueryLogServiceImpl.class);
+
+    @Autowired
+    private DirtyQueryLogEntityMapper dirtyQueryLogEntityMapper;
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Value("${dirty.log.clean.enabled:false}")
+    private Boolean dirtyLogCleanEnabled;
+    @Value("${dirty.log.clean.interval.minutes:5}")
+    private Integer dirtyLogCleanInterval;
+    @Value("${dirty.dirty.retention.minutes:10}")
+    private Integer retentionMinutes;
+    @Value("${dirty.dirty.db.table:inlong_iceberg::dirty_data_achive_iceberg}")
+    private String dirtyDataDbTable;
+
+    @PostConstruct
+    private void startDirtyLogCleanTask() {
+        if (dirtyLogCleanEnabled) {
+            ThreadFactory factory = new ThreadFactoryBuilder()
+                    .setNameFormat("scheduled-dirtyQueryLog-deleted-%d")
+                    .setDaemon(true)
+                    .build();
+            ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor(factory);
+            executor.scheduleWithFixedDelay(() -> {
+                try {
+                    LOGGER.info("begin to clean dirty query log");
+                    
dirtyQueryLogEntityMapper.updateToTimeout(retentionMinutes);
+                    LOGGER.info("success to clean dirty query log 
successfully");
+                } catch (Throwable t) {
+                    LOGGER.error("clean dirty query log error", t);
+                }
+            }, 0, dirtyLogCleanInterval, TimeUnit.MINUTES);
+            LOGGER.info("clean dirty query log started successfully");
+        }
+    }
+
+    @Override
+    public DirtyDataResponse listDirtyDataTrend(DirtyDataTrendRequest request) 
{
+        if (CollectionUtils.isEmpty(request.getSinkIdList())) {
+            return null;
+        }
+        try {
+            DirtyDataResponse dirtyDataResponse = new DirtyDataResponse();
+            String requestStr = objectMapper.writeValueAsString(request);
+            String md5 = DigestUtils.md5Hex(requestStr);
+            DirtyQueryLogEntity dirtyQueryLogEntity = 
dirtyQueryLogEntityMapper.selectByMd5(md5);
+            if (dirtyQueryLogEntity != null) {
+                LOGGER.info("dirty query log is exist");
+                dirtyDataResponse.setTaskId(dirtyQueryLogEntity.getTaskId());
+                return dirtyDataResponse;
+            }
+            DirtyQueryLogEntity dirtyQueryLog = new DirtyQueryLogEntity();
+            // TODO dirtyQueryLog.setTaskId();
+            dirtyQueryLog.setMd5(md5);
+            dirtyQueryLog.setRequestParams(requestStr);
+            dirtyQueryLog.setCreator(LoginUserUtils.getLoginUser().getName());
+            dirtyQueryLog.setModifier(LoginUserUtils.getLoginUser().getName());
+            dirtyQueryLogEntityMapper.insert(dirtyQueryLog);
+
+            return dirtyDataResponse;
+        } catch (Exception e) {
+            throw new BusinessException("list dirty data trend failed");
+        }
+    }
+
+    @Override
+    public DirtyDataResponse listDirtyData(DirtyDataRequest request) {
+        if (CollectionUtils.isEmpty(request.getSinkIdList())) {
+            return null;
+        }
+        if (request.getDataCount() == null) {
+            request.setDataCount(10);
+        }
+        try {
+            DirtyDataResponse dirtyDataResponse = new DirtyDataResponse();
+            String requestStr = objectMapper.writeValueAsString(request);
+            String md5 = DigestUtils.md5Hex(requestStr);
+            DirtyQueryLogEntity dirtyQueryLogEntity = 
dirtyQueryLogEntityMapper.selectByMd5(md5);
+            if (dirtyQueryLogEntity != null) {
+                LOGGER.info("dirty query log is exist");
+                dirtyDataResponse.setTaskId(dirtyQueryLogEntity.getTaskId());
+                return dirtyDataResponse;
+            }
+            DirtyQueryLogEntity dirtyQueryLog = new DirtyQueryLogEntity();
+            // TODO dirtyQueryLog.setTaskId();
+            dirtyQueryLog.setMd5(md5);
+            dirtyQueryLog.setRequestParams(requestStr);
+            dirtyQueryLog.setCreator(LoginUserUtils.getLoginUser().getName());
+            dirtyQueryLog.setModifier(LoginUserUtils.getLoginUser().getName());
+            dirtyQueryLogEntityMapper.insert(dirtyQueryLog);
+            return dirtyDataResponse;
+        } catch (Exception e) {
+            LOGGER.error("list dirty data failed", e);
+            throw new BusinessException("list dirty data failed");
+        }
+    }
+
+    @Override
+    public List<DirtyDataDetailResponse> getDirtyData(String taskId) {
+        try {
+            // TODO
+            return new ArrayList<>();
+        } catch (Exception e) {
+            LOGGER.error("get dirty data failed", e);
+            throw new BusinessException("get dirty data failed");
+        }
+    }
+
+    @Override
+    public List<DirtyDataTrendDetailResponse> getDirtyDataTrend(String taskId) 
{
+        try {
+            // TODO
+            return new ArrayList<>();
+        } catch (Exception e) {
+            LOGGER.error("get dirty data trend failed", e);
+            throw new BusinessException("get dirty data trend failed");
+        }
+    }
+
+    @Override
+    public String getSqlTaskStatus(String taskId) {
+        try {
+            // TODO
+            return "success";
+        } catch (Exception e) {
+            LOGGER.error("get sql task status failed", e);
+            throw new BusinessException("get get sql task status failed");
+        }
+    }
+}
diff --git 
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql 
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 1d92f14f46..6836d44d5b 100644
--- 
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ 
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -1013,6 +1013,24 @@ CREATE TABLE IF NOT EXISTS `schedule_config`
     UNIQUE KEY `unique_group_schedule_config` (`inlong_group_id`, `is_deleted`)
     ) ENGINE = InnoDB
     DEFAULT CHARSET = utf8mb4 COMMENT = 'schedule_config';
+
+-- ----------------------------
+-- Table structure for dirty_query_log: records dirty data query requests (MD5 of the request params, the params as a JSON string, and the task id)
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `dirty_query_log`
+(
+    `id`                int(11)      NOT NULL AUTO_INCREMENT COMMENT 
'Incremental primary key',
+    `md5`               varchar(256) NOT NULL COMMENT 'Md5 for request params',
+    `request_params`    mediumtext   DEFAULT NULL COMMENT 'Request params, 
will be saved as JSON string',
+    `task_id`           varchar(256) DEFAULT ''    COMMENT 'Task id',
+    `is_deleted`        int(11)      DEFAULT '0' COMMENT 'Whether to delete, 
0: not deleted, > 0: deleted',
+    `creator`           varchar(64)  NOT NULL COMMENT 'Creator name',
+    `modifier`          varchar(64)  DEFAULT NULL COMMENT 'Modifier name',
+    `create_time`       timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP 
COMMENT 'Create time',
+    `modify_time`       timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON 
UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `version`           int(11)      NOT NULL DEFAULT '1' COMMENT 'Version 
number, which will be incremented by 1 after modification',
+    PRIMARY KEY (`id`)
+);
 -- ----------------------------
 
 
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql 
b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 58259c9047..51a72d6039 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -1066,6 +1066,25 @@ CREATE TABLE IF NOT EXISTS `schedule_config`
     UNIQUE KEY `unique_group_schedule_config` (`inlong_group_id`, `is_deleted`)
 ) ENGINE = InnoDB
     DEFAULT CHARSET = utf8mb4 COMMENT = 'schedule_config';
+
+-- ----------------------------
+-- Table structure for dirty_query_log: records dirty data query requests (MD5 of the request params, the params as a JSON string, and the task id)
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `dirty_query_log`
+(
+    `id`                int(11)      NOT NULL AUTO_INCREMENT COMMENT 
'Incremental primary key',
+    `md5`               varchar(256) NOT NULL COMMENT 'Md5 for request params',
+    `request_params`    mediumtext   DEFAULT NULL COMMENT 'Request params, 
will be saved as JSON string',
+    `task_id`           varchar(256) DEFAULT ''    COMMENT 'Task id',
+    `is_deleted`        int(11)      DEFAULT '0' COMMENT 'Whether to delete, 
0: not deleted, > 0: deleted',
+    `creator`           varchar(64)  NOT NULL COMMENT 'Creator name',
+    `modifier`          varchar(64)  DEFAULT NULL COMMENT 'Modifier name',
+    `create_time`       timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP 
COMMENT 'Create time',
+    `modify_time`       timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON 
UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `version`           int(11)      NOT NULL DEFAULT '1' COMMENT 'Version 
number, which will be incremented by 1 after modification',
+    PRIMARY KEY (`id`)
+) ENGINE = InnoDB
+    DEFAULT CHARSET = utf8mb4 COMMENT ='Dirty query log table';
 -- ----------------------------
 
 SET FOREIGN_KEY_CHECKS = 1;
diff --git a/inlong-manager/manager-web/sql/changes-2.1.0.sql 
b/inlong-manager/manager-web/sql/changes-2.1.0.sql
index 0d4c984778..2fa48f95b4 100644
--- a/inlong-manager/manager-web/sql/changes-2.1.0.sql
+++ b/inlong-manager/manager-web/sql/changes-2.1.0.sql
@@ -25,3 +25,19 @@ USE `apache_inlong_manager`;
 
 ALTER TABLE `schedule_config`
     ADD COLUMN  `schedule_engine` varchar(64)  NOT NULL DEFAULT 'Quartz' 
COMMENT 'Schedule engine, support Quartz, Airflow and DolphinScheduler';
+
+CREATE TABLE IF NOT EXISTS `dirty_query_log` -- records dirty data query requests (MD5 of the request params, the params as a JSON string, and the task id)
+(
+    `id`                int(11)      NOT NULL AUTO_INCREMENT COMMENT 
'Incremental primary key',
+    `md5`               varchar(256) NOT NULL COMMENT 'Md5 for request params',
+    `request_params`    mediumtext   DEFAULT NULL COMMENT 'Request params, 
will be saved as JSON string',
+    `task_id`           varchar(256) DEFAULT ''    COMMENT 'Task id',
+    `is_deleted`        int(11)      DEFAULT '0' COMMENT 'Whether to delete, 
0: not deleted, > 0: deleted',
+    `creator`           varchar(64)  NOT NULL COMMENT 'Creator name',
+    `modifier`          varchar(64)  DEFAULT NULL COMMENT 'Modifier name',
+    `create_time`       timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP 
COMMENT 'Create time',
+    `modify_time`       timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON 
UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `version`           int(11)      NOT NULL DEFAULT '1' COMMENT 'Version 
number, which will be incremented by 1 after modification',
+    PRIMARY KEY (`id`)
+) ENGINE = InnoDB
+    DEFAULT CHARSET = utf8mb4 COMMENT ='Dirty query log table';
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
index b7b8dbd5d4..331e3a5355 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
@@ -25,12 +25,18 @@ import org.apache.inlong.manager.pojo.common.BatchResult;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
 import org.apache.inlong.manager.pojo.common.UpdateResult;
+import org.apache.inlong.manager.pojo.sink.DirtyDataDetailResponse;
+import org.apache.inlong.manager.pojo.sink.DirtyDataRequest;
+import org.apache.inlong.manager.pojo.sink.DirtyDataResponse;
+import org.apache.inlong.manager.pojo.sink.DirtyDataTrendDetailResponse;
+import org.apache.inlong.manager.pojo.sink.DirtyDataTrendRequest;
 import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
 import org.apache.inlong.manager.pojo.user.LoginUserUtils;
+import org.apache.inlong.manager.service.dirtyData.DirtyQueryLogService;
 import org.apache.inlong.manager.service.operationlog.OperationLog;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 
@@ -60,6 +66,8 @@ public class StreamSinkController {
 
     @Autowired
     private StreamSinkService sinkService;
+    @Autowired
+    private DirtyQueryLogService dirtyQueryLogService;
 
     @RequestMapping(value = "/sink/save", method = RequestMethod.POST)
     @OperationLog(operation = OperationType.CREATE, operationTarget = 
OperationTarget.SINK)
@@ -143,4 +151,34 @@ public class StreamSinkController {
         return Response.success(sinkService.parseFields(parseFieldRequest));
     }
 
+    /**
+     * Lists dirty data for the given request.
+     *
+     * <p>Delegates to {@code DirtyQueryLogService#listDirtyData} and wraps the result in a successful response.
+     *
+     * @param request dirty data query request, read from the request body
+     * @return successful response wrapping the service result
+     */
+    @RequestMapping(value = "/sink/listDirtyData", method = RequestMethod.POST)
+    @ApiOperation(value = "List dirty data")
+    public Response<DirtyDataResponse> listDirtyData(@RequestBody DirtyDataRequest request) {
+        return Response.success(dirtyQueryLogService.listDirtyData(request));
+    }
+
+    /**
+     * Lists the dirty data trend for the given request.
+     *
+     * <p>Delegates to {@code DirtyQueryLogService#listDirtyDataTrend} and wraps the result in a successful
+     * response.
+     *
+     * @param request dirty data trend query request, read from the request body
+     * @return successful response wrapping the service result
+     */
+    @RequestMapping(value = "/sink/listDirtyDataTrend", method = RequestMethod.POST)
+    @ApiOperation(value = "List dirty data trend")
+    public Response<DirtyDataResponse> listDirtyDataTrend(@RequestBody DirtyDataTrendRequest request) {
+        return Response.success(dirtyQueryLogService.listDirtyDataTrend(request));
+    }
+
+    /**
+     * Gets the dirty data details for the given task id.
+     *
+     * <p>Delegates to {@code DirtyQueryLogService#getDirtyData} and wraps the result in a successful response.
+     *
+     * @param taskId task id taken from the request path
+     * @return successful response wrapping the list returned by the service
+     */
+    @RequestMapping(value = "/sink/getDirtyData/{taskId}", method = RequestMethod.GET)
+    @ApiOperation(value = "Get dirty data by task id")
+    @ApiImplicitParam(name = "taskId", dataTypeClass = String.class, required = true)
+    public Response<List<DirtyDataDetailResponse>> getDirtyData(@PathVariable String taskId) {
+        return Response.success(dirtyQueryLogService.getDirtyData(taskId));
+    }
+
+    /**
+     * Gets the dirty data trend details for the given task id.
+     *
+     * <p>Delegates to {@code DirtyQueryLogService#getDirtyDataTrend} and wraps the result in a successful
+     * response.
+     *
+     * @param taskId task id taken from the request path
+     * @return successful response wrapping the list returned by the service
+     */
+    @RequestMapping(value = "/sink/getDirtyDataTrend/{taskId}", method = RequestMethod.GET)
+    @ApiOperation(value = "Get dirty data trend by task id")
+    @ApiImplicitParam(name = "taskId", dataTypeClass = String.class, required = true)
+    public Response<List<DirtyDataTrendDetailResponse>> getDirtyDataTrend(@PathVariable String taskId) {
+        return Response.success(dirtyQueryLogService.getDirtyDataTrend(taskId));
+    }
+
+    /**
+     * Gets the status of the SQL task with the given task id.
+     *
+     * <p>Delegates to {@code DirtyQueryLogService#getSqlTaskStatus} and wraps the result in a successful
+     * response.
+     *
+     * <p>NOTE(review): the method name starts with an upper-case letter, contrary to Java naming conventions;
+     * it is kept as is so that the interface does not change.
+     *
+     * @param taskId task id taken from the request path
+     * @return successful response wrapping the status string returned by the service
+     */
+    @RequestMapping(value = "/sink/SqlTaskStatus/{taskId}", method = RequestMethod.GET)
+    @ApiOperation(value = "Get sql task status by task id")
+    @ApiImplicitParam(name = "taskId", dataTypeClass = String.class, required = true)
+    public Response<String> SqlTaskStatus(@PathVariable String taskId) {
+        return Response.success(dirtyQueryLogService.getSqlTaskStatus(taskId));
+    }
+
 }
diff --git 
a/inlong-manager/manager-web/src/main/resources/application-dev.properties 
b/inlong-manager/manager-web/src/main/resources/application-dev.properties
index 794f201bda..2bad5f801f 100644
--- a/inlong-manager/manager-web/src/main/resources/application-dev.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties
@@ -105,4 +105,9 @@ agent.install.temp.path=inlong/agent-installer-temp/
 default.module.id=1
 # schedule engine type
 # support none(no scheduler) and quartz(quartz scheduler), default is none
-inlong.schedule.engine=none
\ No newline at end of file
+inlong.schedule.engine=none
+
+dirty.log.clean.enabled=false
+dirty.log.clean.interval.minutes=5
+dirty.dirty.retention.minutes=10
+dirty.dirty.db.table=inlong_iceberg::dirty_data_achive_iceberg
\ No newline at end of file
diff --git 
a/inlong-manager/manager-web/src/main/resources/application-prod.properties 
b/inlong-manager/manager-web/src/main/resources/application-prod.properties
index 3e8f329470..040c868bcf 100644
--- a/inlong-manager/manager-web/src/main/resources/application-prod.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties
@@ -97,4 +97,9 @@ cls.manager.endpoint=127.0.0.1
 
 # schedule engine type
 # support none(no scheduler) and quartz(quartz scheduler), default is none
-inlong.schedule.engine=none
\ No newline at end of file
+inlong.schedule.engine=none
+
+dirty.log.clean.enabled=false
+dirty.log.clean.interval.minutes=5
+dirty.dirty.retention.minutes=10
+dirty.dirty.db.table=inlong_iceberg::dirty_data_achive_iceberg
\ No newline at end of file
diff --git 
a/inlong-manager/manager-web/src/main/resources/application-test.properties 
b/inlong-manager/manager-web/src/main/resources/application-test.properties
index 5ff929c2b8..393eef6b05 100644
--- a/inlong-manager/manager-web/src/main/resources/application-test.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-test.properties
@@ -98,4 +98,9 @@ cls.manager.endpoint=127.0.0.1
 
 # schedule engine type
 # support none(no scheduler) and quartz(quartz scheduler), default is none
-inlong.schedule.engine=none
\ No newline at end of file
+inlong.schedule.engine=none
+
+dirty.log.clean.enabled=false
+dirty.log.clean.interval.minutes=5
+dirty.dirty.retention.minutes=10
+dirty.dirty.db.table=inlong_iceberg::dirty_data_achive_iceberg
\ No newline at end of file


Reply via email to