This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 778cbe6050 [INLONG-11286][Audit] Optimize the statistics of daily Audit data (#11312)
778cbe6050 is described below

commit 778cbe6050ff4c60b7fd17dbeabfe614289cac7e
Author: doleyzi <43397300+dole...@users.noreply.github.com>
AuthorDate: Wed Oct 9 19:32:17 2024 +0800

    [INLONG-11286][Audit] Optimize the statistics of daily Audit data (#11312)
---
 .../inlong/audit/config/ConfigConstants.java       |   6 +-
 .../apache/inlong/audit/config/SqlConstants.java   |  12 +-
 .../inlong/audit/entities/PartitionEntity.java     |  71 ++++++++
 .../apache/inlong/audit/entities/SourceConfig.java |   4 +-
 .../org/apache/inlong/audit/main/Application.java  |   3 +
 .../apache/inlong/audit/service/EtlService.java    | 153 ++++++-----------
 .../inlong/audit/service/PartitionManager.java     | 186 +++++++++++++++++++++
 .../org/apache/inlong/audit/sink/AuditSink.java    |  24 +++
 .../org/apache/inlong/audit/sink/CacheSink.java    |   2 +-
 .../org/apache/inlong/audit/sink/JdbcSink.java     | 152 ++---------------
 .../org/apache/inlong/audit/source/JdbcSource.java |  37 +---
 .../org/apache/inlong/audit/utils/JdbcUtils.java   |  37 ++++
 inlong-audit/sql/apache_inlong_audit_mysql.sql     |   4 +-
 13 files changed, 413 insertions(+), 278 deletions(-)

diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
index d5afb1306e..d74e162f78 100644
--- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
@@ -57,9 +57,6 @@ public class ConfigConstants {
     public static final int DEFAULT_SOURCE_DB_SINK_BATCH = 1000;
     public static final String KEY_CONFIG_UPDATE_INTERVAL_SECONDS = 
"config.update.interval.seconds";
     public static final int DEFAULT_CONFIG_UPDATE_INTERVAL_SECONDS = 60;
-
-    public static final String KEY_ENABLE_MANAGE_PARTITIONS = 
"enable.manage.partitions";
-    public static final boolean DEFAULT_ENABLE_MANAGE_PARTITIONS = true;
     public static final String KEY_CHECK_PARTITION_INTERVAL_HOURS = 
"check.partition.interval.hours";
     public static final int DEFAULT_CHECK_PARTITION_INTERVAL_HOURS = 6;
 
@@ -113,4 +110,7 @@ public class ConfigConstants {
     public static final int MAX_INIT_COUNT = 2;
     public static final int RANDOM_BOUND = 10;
 
+    public static final String KEY_ENABLE_STAT_AUDIT_DAY = 
"enable.stat.audit.day";
+    public static final boolean DEFAULT_ENABLE_STAT_AUDIT_DAY = true;
+
 }
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
index b48368b921..04fee349b5 100644
--- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
@@ -170,8 +170,12 @@ public class SqlConstants {
     public static final String KEY_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL = 
"audit.data.temp.delete.partition.sql";
     public static final String DEFAULT_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL =
             "ALTER TABLE audit_data_temp DROP PARTITION %s";
-
-    public static final String KEY_AUDIT_DATA_TEMP_CHECK_PARTITION_SQL = 
"audit.data.temp.check.partition.sql";
-    public static final String DEFAULT_AUDIT_DATA_TEMP_CHECK_PARTITION_SQL =
-            "SELECT COUNT(*) AS count FROM INFORMATION_SCHEMA.PARTITIONS WHERE 
TABLE_NAME = 'audit_data_temp' and PARTITION_NAME = ?";
+    public static final String KEY_TABLE_AUDIT_DATA_CHECK_PARTITION_SQL = 
"audit.data.check.partition.sql";
+    public static final String DEFAULT_TABLE_AUDIT_DATA_CHECK_PARTITION_SQL =
+            "SELECT COUNT(*) AS count FROM INFORMATION_SCHEMA.PARTITIONS WHERE 
TABLE_NAME = '%s' and PARTITION_NAME = '%s'";
+    public static final String KEY_TABLE_AUDIT_DATA_DAY_ADD_PARTITION_SQL = 
"audit.data.day.add.partition.sql";
+    public static final String DEFAULT_TABLE_AUDIT_DATA_DAY_ADD_PARTITION_SQL =
+            "ALTER TABLE audit_data_day ADD PARTITION (PARTITION %s VALUES 
LESS THAN (TO_DAYS('%s')))";
+    public static final String TABLE_AUDIT_DATA_DAY = "audit_data_day";
+    public static final String TABLE_AUDIT_DATA_TEMP = "audit_data_temp";
 }
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/PartitionEntity.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/PartitionEntity.java
new file mode 100644
index 0000000000..4704ea1031
--- /dev/null
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/PartitionEntity.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.entities;
+
+import org.apache.inlong.audit.config.Configuration;
+
+import lombok.Data;
+
+import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
+
+import static 
org.apache.inlong.audit.config.SqlConstants.DEFAULT_TABLE_AUDIT_DATA_CHECK_PARTITION_SQL;
+import static 
org.apache.inlong.audit.config.SqlConstants.KEY_TABLE_AUDIT_DATA_CHECK_PARTITION_SQL;
+
+@Data
+public class PartitionEntity {
+
+    private final String tableName;
+    private final String addPartitionStatement;
+    private final String deletePartitionStatement;
+    private final DateTimeFormatter FORMATTER_YYMMDDHH = 
DateTimeFormatter.ofPattern("yyyyMMdd");
+    private final DateTimeFormatter FORMATTER_YY_MM_DD_HH = 
DateTimeFormatter.ofPattern("yyyy-MM-dd");
+
+    private String formatPartitionName(LocalDate date) {
+        return "p" + date.format(FORMATTER_YYMMDDHH);
+    }
+
+    public PartitionEntity(String tableName, String addPartitionStatement, 
String deletePartitionStatement) {
+        this.tableName = tableName;
+        this.addPartitionStatement = addPartitionStatement;
+        this.deletePartitionStatement = deletePartitionStatement;
+    }
+
+    public String getAddPartitionSql(long daysToAdd) {
+        String partitionValue = LocalDate.now().plusDays(daysToAdd + 
1).format(FORMATTER_YY_MM_DD_HH);
+        return String.format(addPartitionStatement, 
getAddPartitionName(daysToAdd), partitionValue);
+    }
+
+    public String getDeletePartitionSql(long daysToDelete) {
+        return String.format(deletePartitionStatement, 
getDeletePartitionName(daysToDelete));
+    }
+
+    public String getCheckPartitionSql(long partitionDay, boolean isDelete) {
+        String partitionName = isDelete ? getDeletePartitionName(partitionDay) 
: getAddPartitionName(partitionDay);
+        return 
String.format(Configuration.getInstance().get(KEY_TABLE_AUDIT_DATA_CHECK_PARTITION_SQL,
+                DEFAULT_TABLE_AUDIT_DATA_CHECK_PARTITION_SQL), tableName, 
partitionName);
+    }
+
+    public String getAddPartitionName(long daysToAdd) {
+        return formatPartitionName(LocalDate.now().plusDays(daysToAdd));
+    }
+
+    public String getDeletePartitionName(long daysToDelete) {
+        return formatPartitionName(LocalDate.now().minusDays(daysToDelete));
+    }
+}
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SourceConfig.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SourceConfig.java
index 88730a203a..b43b72c052 100644
--- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SourceConfig.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SourceConfig.java
@@ -32,7 +32,7 @@ public class SourceConfig {
     private int statBackTimes;
     private final String driverClassName;
     private final String jdbcUrl;
-    private final String username;
+    private final String userName;
     private final String password;
     private boolean needJoin = false;
 
@@ -48,7 +48,7 @@ public class SourceConfig {
         this.statBackTimes = statBackTimes;
         this.driverClassName = driverClassName;
         this.jdbcUrl = jdbcUrl;
-        this.username = username;
+        this.userName = username;
         this.password = password;
     }
 }
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/main/Application.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/main/Application.java
index 067667133d..7df068473a 100644
--- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/main/Application.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/main/Application.java
@@ -25,6 +25,7 @@ import org.apache.inlong.audit.selector.api.SelectorFactory;
 import org.apache.inlong.audit.service.ApiService;
 import org.apache.inlong.audit.service.ConfigService;
 import org.apache.inlong.audit.service.EtlService;
+import org.apache.inlong.audit.service.PartitionManager;
 import org.apache.inlong.audit.utils.JdbcUtils;
 import org.apache.inlong.common.util.NetworkUtils;
 
@@ -51,6 +52,8 @@ public class Application {
             // Periodically obtain audit id and audit course configuration 
from DB
             ConfigService.getInstance().start();
 
+            PartitionManager.getInstance().start();
+
             // Etl service aggregate the data from the data source and store 
the aggregated data to the target storage
             etlService.start();
 
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java
index 95e1cddd75..a5eb286a08 100644
--- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java
@@ -26,11 +26,14 @@ import org.apache.inlong.audit.entities.AuditCycle;
 import org.apache.inlong.audit.entities.JdbcConfig;
 import org.apache.inlong.audit.entities.SinkConfig;
 import org.apache.inlong.audit.entities.SourceConfig;
+import org.apache.inlong.audit.entities.StatData;
+import org.apache.inlong.audit.sink.AuditSink;
 import org.apache.inlong.audit.sink.CacheSink;
 import org.apache.inlong.audit.sink.JdbcSink;
 import org.apache.inlong.audit.source.JdbcSource;
 import org.apache.inlong.audit.utils.JdbcUtils;
 
+import com.github.benmanes.caffeine.cache.Cache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,10 +41,12 @@ import java.util.LinkedList;
 import java.util.List;
 
 import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATA_QUEUE_SIZE;
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_ENABLE_STAT_AUDIT_DAY;
 import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SELECTOR_SERVICE_ID;
 import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SUMMARY_DAILY_STAT_BACK_TIMES;
 import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SUMMARY_REALTIME_STAT_BACK_TIMES;
 import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATA_QUEUE_SIZE;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_ENABLE_STAT_AUDIT_DAY;
 import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_SELECTOR_SERVICE_ID;
 import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_SUMMARY_DAILY_STAT_BACK_TIMES;
 import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_SUMMARY_REALTIME_STAT_BACK_TIMES;
@@ -60,113 +65,70 @@ import static org.apache.inlong.audit.config.SqlConstants.KEY_SOURCE_STAT_SQL;
 public class EtlService {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(EtlService.class);
-    private JdbcSource mysqlSourceOfTemp;
-    private JdbcSource mysqlSourceOfTenMinutesCache;
-    private JdbcSource mysqlSourceOfHalfHourCache;
-    private JdbcSource mysqlSourceOfHourCache;
-    private JdbcSink mysqlSinkOfDay;
-    private final List<JdbcSource> auditJdbcSources = new LinkedList<>();
-    private JdbcSink mysqlSinkOfTemp;
-    private CacheSink cacheSinkOfTenMinutesCache;
-    private CacheSink cacheSinkOfHalfHourCache;
-    private CacheSink cacheSinkOfHourCache;
+
+    // Statistics of original audit data
+    private final List<JdbcSource> originalSources = new LinkedList<>();
     private final int queueSize;
-    private final int statBackTimes;
     private final String serviceId;
+    private final Configuration configuration;
+
+    private final List<JdbcSource> dataFlowSources = new LinkedList<>();
+    private final List<AuditSink> dataFlowSinks = new LinkedList<>();
 
     public EtlService() {
-        queueSize = Configuration.getInstance().get(KEY_DATA_QUEUE_SIZE,
+        configuration = Configuration.getInstance();
+        queueSize = configuration.get(KEY_DATA_QUEUE_SIZE,
                 DEFAULT_DATA_QUEUE_SIZE);
-        statBackTimes = 
Configuration.getInstance().get(KEY_SUMMARY_REALTIME_STAT_BACK_TIMES,
-                DEFAULT_SUMMARY_REALTIME_STAT_BACK_TIMES);
-        serviceId = Configuration.getInstance().get(KEY_SELECTOR_SERVICE_ID, 
DEFAULT_SELECTOR_SERVICE_ID);
+        serviceId = configuration.get(KEY_SELECTOR_SERVICE_ID, 
DEFAULT_SELECTOR_SERVICE_ID);
     }
 
-    /**
-     * Start the etl service.
-     */
     public void start() {
-        mysqlToMysqlOfDay();
-        mysqlToTenMinutesCache();
-        mysqlToHalfHourCache();
-        mysqlToHourCache();
-    }
-
-    /**
-     * Aggregate data from mysql data source and store the aggregated data in 
the target mysql table.
-     * The audit data cycle is days,and stored in table of day.
-     */
-    private void mysqlToMysqlOfDay() {
-        DataQueue dataQueue = new DataQueue(queueSize);
-
-        mysqlSourceOfTemp = new JdbcSource(dataQueue, 
buildMysqlSourceConfig(AuditCycle.DAY,
-                
Configuration.getInstance().get(KEY_SUMMARY_DAILY_STAT_BACK_TIMES,
-                        DEFAULT_SUMMARY_DAILY_STAT_BACK_TIMES)));
-        mysqlSourceOfTemp.start();
-
-        SinkConfig sinkConfig = 
buildMysqlSinkConfig(Configuration.getInstance().get(KEY_MYSQL_SINK_INSERT_DAY_SQL,
-                DEFAULT_MYSQL_SINK_INSERT_DAY_SQL));
-        mysqlSinkOfDay = new JdbcSink(dataQueue, sinkConfig);
-        mysqlSinkOfDay.start();
-    }
-
-    /**
-     * Aggregate data from mysql data source and store in local cache for 
openapi.
-     */
-    private void mysqlToTenMinutesCache() {
-        DataQueue dataQueue = new DataQueue(queueSize);
-        mysqlSourceOfTenMinutesCache =
-                new JdbcSource(dataQueue, 
buildMysqlSourceConfig(AuditCycle.MINUTE_10, statBackTimes));
-        mysqlSourceOfTenMinutesCache.start();
-
-        cacheSinkOfTenMinutesCache = new CacheSink(dataQueue, 
TenMinutesCache.getInstance().getCache());
-        cacheSinkOfTenMinutesCache.start();
-    }
+        int statBackTimes = 
configuration.get(KEY_SUMMARY_REALTIME_STAT_BACK_TIMES,
+                DEFAULT_SUMMARY_REALTIME_STAT_BACK_TIMES);
 
-    /**
-     * Aggregate data from mysql data source and store in local cache for 
openapi.
-     */
-    private void mysqlToHalfHourCache() {
-        DataQueue dataQueue = new DataQueue(queueSize);
-        mysqlSourceOfHalfHourCache =
-                new JdbcSource(dataQueue, 
buildMysqlSourceConfig(AuditCycle.MINUTE_30, statBackTimes));
-        mysqlSourceOfHalfHourCache.start();
+        startDataFlow(AuditCycle.MINUTE_10, statBackTimes, 
TenMinutesCache.getInstance().getCache());
+        startDataFlow(AuditCycle.MINUTE_30, statBackTimes, 
HalfHourCache.getInstance().getCache());
+        startDataFlow(AuditCycle.HOUR, statBackTimes, 
HourCache.getInstance().getCache());
 
-        cacheSinkOfHalfHourCache = new CacheSink(dataQueue, 
HalfHourCache.getInstance().getCache());
-        cacheSinkOfHalfHourCache.start();
+        if (configuration.get(KEY_ENABLE_STAT_AUDIT_DAY, 
DEFAULT_ENABLE_STAT_AUDIT_DAY)) {
+            statBackTimes = 
configuration.get(KEY_SUMMARY_DAILY_STAT_BACK_TIMES, 
DEFAULT_SUMMARY_DAILY_STAT_BACK_TIMES);
+            startDataFlow(AuditCycle.DAY, statBackTimes, null);
+        }
     }
 
-    /**
-     * Aggregate data from mysql data source and store in local cache for 
openapi.
-     */
-    private void mysqlToHourCache() {
+    private void startDataFlow(AuditCycle cycle, int backTimes, Cache<String, 
StatData> cache) {
         DataQueue dataQueue = new DataQueue(queueSize);
-        mysqlSourceOfHourCache = new JdbcSource(dataQueue, 
buildMysqlSourceConfig(AuditCycle.HOUR, statBackTimes));
-        mysqlSourceOfHourCache.start();
-
-        cacheSinkOfHourCache = new CacheSink(dataQueue, 
HourCache.getInstance().getCache());
-        cacheSinkOfHourCache.start();
+        JdbcSource source = new JdbcSource(dataQueue, 
buildMysqlSourceConfig(cycle, backTimes));
+        source.start();
+        dataFlowSources.add(source);
+
+        AuditSink sink;
+        if (cache != null) {
+            sink = new CacheSink(dataQueue, cache);
+        } else {
+            SinkConfig sinkConfig = 
buildMysqlSinkConfig(configuration.get(KEY_MYSQL_SINK_INSERT_DAY_SQL,
+                    DEFAULT_MYSQL_SINK_INSERT_DAY_SQL));
+            sink = new JdbcSink(dataQueue, sinkConfig);
+        }
+        sink.start();
+        dataFlowSinks.add(sink);
     }
 
-    /**
-     * Aggregate data from clickhouse data source and store the aggregated 
data in the target mysql table.
-     * The default audit data cycle is 5 minutes,and stored in a temporary 
table.
-     * Support multiple audit source clusters.
-     */
     public void auditSourceToMysql() {
         DataQueue dataQueue = new DataQueue(queueSize);
         List<JdbcConfig> sourceList = 
ConfigService.getInstance().getAuditSourceByServiceId(serviceId);
         for (JdbcConfig jdbcConfig : sourceList) {
             JdbcSource jdbcSource = new JdbcSource(dataQueue, 
buildAuditJdbcSourceConfig(jdbcConfig));
             jdbcSource.start();
-            auditJdbcSources.add(jdbcSource);
+            originalSources.add(jdbcSource);
             LOGGER.info("Audit source to mysql jdbc config:{}", jdbcConfig);
         }
 
-        SinkConfig sinkConfig = 
buildMysqlSinkConfig(Configuration.getInstance().get(KEY_MYSQL_SINK_INSERT_TEMP_SQL,
+        SinkConfig sinkConfig = 
buildMysqlSinkConfig(configuration.get(KEY_MYSQL_SINK_INSERT_TEMP_SQL,
                 DEFAULT_MYSQL_SINK_INSERT_TEMP_SQL));
-        mysqlSinkOfTemp = new JdbcSink(dataQueue, sinkConfig);
-        mysqlSinkOfTemp.start();
+        JdbcSink sink = new JdbcSink(dataQueue, sinkConfig);
+        sink.start();
+        dataFlowSinks.add(sink);
     }
 
     /**
@@ -193,7 +155,7 @@ public class EtlService {
     private SourceConfig buildMysqlSourceConfig(AuditCycle auditCycle, int 
statBackTimes) {
         JdbcConfig jdbcConfig = JdbcUtils.buildMysqlConfig();
         return new SourceConfig(auditCycle,
-                
Configuration.getInstance().get(KEY_MYSQL_SOURCE_QUERY_TEMP_SQL,
+                configuration.get(KEY_MYSQL_SOURCE_QUERY_TEMP_SQL,
                         DEFAULT_MYSQL_SOURCE_QUERY_TEMP_SQL),
                 statBackTimes,
                 jdbcConfig.getDriverClass(),
@@ -209,9 +171,9 @@ public class EtlService {
      */
     private SourceConfig buildAuditJdbcSourceConfig(JdbcConfig jdbcConfig) {
         return new SourceConfig(AuditCycle.MINUTE_5,
-                Configuration.getInstance().get(KEY_SOURCE_STAT_SQL,
+                configuration.get(KEY_SOURCE_STAT_SQL,
                         DEFAULT_SOURCE_STAT_SQL),
-                
Configuration.getInstance().get(KEY_SUMMARY_REALTIME_STAT_BACK_TIMES,
+                configuration.get(KEY_SUMMARY_REALTIME_STAT_BACK_TIMES,
                         DEFAULT_SUMMARY_REALTIME_STAT_BACK_TIMES),
                 jdbcConfig.getDriverClass(),
                 jdbcConfig.getJdbcUrl(),
@@ -224,21 +186,14 @@ public class EtlService {
      * Stop the etl service,and destroy related resources.
      */
     public void stop() {
-        mysqlSourceOfTemp.destroy();
-        mysqlSinkOfDay.destroy();
-
-        for (JdbcSource source : auditJdbcSources) {
+        for (JdbcSource source : originalSources) {
             source.destroy();
         }
-        if (null != mysqlSinkOfTemp)
-            mysqlSinkOfTemp.destroy();
-
-        mysqlSourceOfTenMinutesCache.destroy();
-        mysqlSourceOfHalfHourCache.destroy();
-        mysqlSourceOfHourCache.destroy();
-
-        cacheSinkOfTenMinutesCache.destroy();
-        cacheSinkOfHalfHourCache.destroy();
-        cacheSinkOfHourCache.destroy();
+        for (JdbcSource source : dataFlowSources) {
+            source.destroy();
+        }
+        for (AuditSink sink : dataFlowSinks) {
+            sink.destroy();
+        }
     }
 }
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/PartitionManager.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/PartitionManager.java
new file mode 100644
index 0000000000..5238bc1d05
--- /dev/null
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/PartitionManager.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.service;
+
+import org.apache.inlong.audit.config.Configuration;
+import org.apache.inlong.audit.entities.JdbcConfig;
+import org.apache.inlong.audit.entities.PartitionEntity;
+import org.apache.inlong.audit.utils.JdbcUtils;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_AUDIT_DATA_TEMP_STORAGE_DAYS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CHECK_PARTITION_INTERVAL_HOURS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_AUDIT_DATA_TEMP_STORAGE_DAYS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_CHECK_PARTITION_INTERVAL_HOURS;
+import static 
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_DATA_TEMP_ADD_PARTITION_SQL;
+import static 
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL;
+import static 
org.apache.inlong.audit.config.SqlConstants.DEFAULT_TABLE_AUDIT_DATA_DAY_ADD_PARTITION_SQL;
+import static 
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_DATA_TEMP_ADD_PARTITION_SQL;
+import static 
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL;
+import static 
org.apache.inlong.audit.config.SqlConstants.KEY_TABLE_AUDIT_DATA_DAY_ADD_PARTITION_SQL;
+import static org.apache.inlong.audit.config.SqlConstants.TABLE_AUDIT_DATA_DAY;
+import static 
org.apache.inlong.audit.config.SqlConstants.TABLE_AUDIT_DATA_TEMP;
+
+public class PartitionManager {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(PartitionManager.class);
+    private static volatile PartitionManager partitionManager = null;
+    private final ScheduledExecutorService timer = 
Executors.newSingleThreadScheduledExecutor();
+    private DataSource dataSource;
+    private final PartitionEntity auditDayTable;
+    private final PartitionEntity auditTempTable;
+    private final Configuration configuration;
+
+    public static PartitionManager getInstance() {
+        if (partitionManager == null) {
+            synchronized (PartitionManager.class) {
+                if (partitionManager == null) {
+                    partitionManager = new PartitionManager();
+                }
+            }
+        }
+        return partitionManager;
+    }
+
+    private PartitionManager() {
+        configuration = Configuration.getInstance();
+        createDataSource();
+        auditDayTable = createAndAddPartition(TABLE_AUDIT_DATA_DAY,
+                KEY_TABLE_AUDIT_DATA_DAY_ADD_PARTITION_SQL,
+                DEFAULT_TABLE_AUDIT_DATA_DAY_ADD_PARTITION_SQL,
+                null,
+                null);
+        auditTempTable = createAndAddPartition(TABLE_AUDIT_DATA_TEMP,
+                KEY_AUDIT_DATA_TEMP_ADD_PARTITION_SQL,
+                DEFAULT_AUDIT_DATA_TEMP_ADD_PARTITION_SQL,
+                KEY_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL,
+                DEFAULT_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL);
+    }
+
+    private PartitionEntity createAndAddPartition(String tableName,
+            String addPartitionKey,
+            String defaultAddPartitionSql,
+            String deletePartitionKey,
+            String defaultDeletePartitionSql) {
+        String addPartitionSql = configuration.get(addPartitionKey, 
defaultAddPartitionSql);
+        String deletePartitionSql =
+                deletePartitionKey != null ? 
configuration.get(deletePartitionKey, defaultDeletePartitionSql) : null;
+        PartitionEntity partitionEntity = new PartitionEntity(tableName, 
addPartitionSql, deletePartitionSql);
+        addPartition(partitionEntity, 0);
+        return partitionEntity;
+    }
+
+    public void start() {
+        long intervalHours =
+                configuration.get(KEY_CHECK_PARTITION_INTERVAL_HOURS, 
DEFAULT_CHECK_PARTITION_INTERVAL_HOURS);
+        timer.scheduleWithFixedDelay(this::executePartitionManagement, 0, 
intervalHours, TimeUnit.HOURS);
+    }
+
+    private void executePartitionManagement() {
+        try {
+            managePartition(auditDayTable, false);
+            managePartition(auditTempTable, true);
+        } catch (Exception e) {
+            LOGGER.error("Error occurred while managing partitions", e);
+        }
+    }
+
+    private void managePartition(PartitionEntity partitionEntity, boolean 
delete) {
+        addPartition(partitionEntity, 1);
+        if (delete) {
+            long storageDays =
+                    configuration.get(KEY_AUDIT_DATA_TEMP_STORAGE_DAYS, 
DEFAULT_AUDIT_DATA_TEMP_STORAGE_DAYS);
+            deletePartition(partitionEntity, storageDays);
+        }
+    }
+
+    private void addPartition(PartitionEntity partitionEntity, long daysToAdd) 
{
+        String partitionName = partitionEntity.getAddPartitionName(daysToAdd);
+        if (isPartitionExist(partitionEntity.getCheckPartitionSql(daysToAdd, 
false))) {
+            LOGGER.info("Partition [{}] of [{}] already exists. Don`t need to 
add.", partitionName,
+                    partitionEntity.getTableName());
+            return;
+        }
+        executeUpdate(partitionEntity.getAddPartitionSql(daysToAdd));
+    }
+
+    private void deletePartition(PartitionEntity partitionEntity, long 
daysToDelete) {
+        String partitionName = 
partitionEntity.getDeletePartitionName(daysToDelete);
+        if 
(!isPartitionExist(partitionEntity.getCheckPartitionSql(daysToDelete, true))) {
+            LOGGER.info("Partition [{}] of [{}] does not exist. Don`t need to 
delete.", partitionName,
+                    partitionEntity.getTableName());
+            return;
+        }
+        executeUpdate(partitionEntity.getDeletePartitionSql(daysToDelete));
+    }
+
+    private boolean isPartitionExist(String querySql) {
+        try (Connection connection = dataSource.getConnection();
+                PreparedStatement statement = 
connection.prepareStatement(querySql)) {
+            return isPartitionInResultSet(statement);
+        } catch (SQLException exception) {
+            LOGGER.error("An exception occurred while checking partition 
[{}]:", querySql, exception);
+        }
+        return false;
+    }
+
+    private boolean isPartitionInResultSet(PreparedStatement statement) {
+        try (ResultSet resultSet = statement.executeQuery()) {
+            if (resultSet.next()) {
+                return resultSet.getInt("count") > 0;
+            }
+        } catch (SQLException sqlException) {
+            LOGGER.error("An error occurred while processing the result set:", 
sqlException);
+        }
+        return false;
+    }
+
+    private void executeUpdate(String sql) {
+        try (Connection connection = dataSource.getConnection();
+                PreparedStatement statement = 
connection.prepareStatement(sql)) {
+            statement.executeUpdate();
+            LOGGER.info("Success to manage partition, execute SQL: {}", sql);
+        } catch (SQLException e) {
+            LOGGER.error("Failed to execute update: {}", sql, e);
+        }
+    }
+
+    private void createDataSource() {
+        JdbcConfig jdbcConfig = JdbcUtils.buildMysqlConfig();
+        HikariConfig hikariConfig = JdbcUtils.buildHikariConfig(
+                jdbcConfig.getDriverClass(),
+                jdbcConfig.getJdbcUrl(),
+                jdbcConfig.getUserName(),
+                jdbcConfig.getPassword());
+        dataSource = new HikariDataSource(hikariConfig);
+    }
+}
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/AuditSink.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/AuditSink.java
new file mode 100644
index 0000000000..be7250e2dc
--- /dev/null
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/AuditSink.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.sink;
+
+public interface AuditSink { // Common start/destroy contract for audit sinks; CacheSink and JdbcSink implement it in this commit.
+
+    void start(); // starts the sink; in JdbcSink this creates the data source and schedules the sink timer
+    void destroy(); // NOTE(review): presumably stops the sink and releases what start() set up; confirm against the implementations
+}
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/CacheSink.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/CacheSink.java
index 6f5f809a3d..a695d5d9b5 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/CacheSink.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/CacheSink.java
@@ -38,7 +38,7 @@ import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_SOURCE_DB_SINK_
 /**
  * Cache sink
  */
-public class CacheSink {
+public class CacheSink implements AuditSink {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(CacheSink.class);
     private final ScheduledExecutorService sinkTimer = 
Executors.newSingleThreadScheduledExecutor();
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
index ac9afc50d4..e802984da9 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
@@ -21,6 +21,7 @@ import org.apache.inlong.audit.channel.DataQueue;
 import org.apache.inlong.audit.config.Configuration;
 import org.apache.inlong.audit.entities.SinkConfig;
 import org.apache.inlong.audit.entities.StatData;
+import org.apache.inlong.audit.utils.JdbcUtils;
 
 import com.zaxxer.hikari.HikariConfig;
 import com.zaxxer.hikari.HikariDataSource;
@@ -31,75 +32,41 @@ import javax.sql.DataSource;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.time.LocalDate;
-import java.time.format.DateTimeFormatter;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.inlong.audit.config.ConfigConstants.CACHE_PREP_STMTS;
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_AUDIT_DATA_TEMP_STORAGE_DAYS;
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CACHE_PREP_STMTS;
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CHECK_PARTITION_INTERVAL_HOURS;
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CONNECTION_TIMEOUT;
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_POOL_SIZE;
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_ENABLE_MANAGE_PARTITIONS;
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SIZE;
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SQL_LIMIT;
 import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_QUEUE_PULL_TIMEOUT;
 import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SOURCE_DB_SINK_BATCH;
 import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SOURCE_DB_SINK_INTERVAL;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_AUDIT_DATA_TEMP_STORAGE_DAYS;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_CACHE_PREP_STMTS;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_CHECK_PARTITION_INTERVAL_HOURS;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_CONNECTION_TIMEOUT;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_POOL_SIZE;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_ENABLE_MANAGE_PARTITIONS;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SIZE;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SQL_LIMIT;
 import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_QUEUE_PULL_TIMEOUT;
 import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_SOURCE_DB_SINK_BATCH;
 import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_SOURCE_DB_SINK_INTERVAL;
-import static 
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SIZE;
-import static 
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT;
-import static 
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_DATA_TEMP_ADD_PARTITION_SQL;
-import static 
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_DATA_TEMP_CHECK_PARTITION_SQL;
-import static 
org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL;
-import static 
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_DATA_TEMP_ADD_PARTITION_SQL;
-import static 
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_DATA_TEMP_CHECK_PARTITION_SQL;
-import static 
org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL;
 
 /**
  * Jdbc sink
  */
-public class JdbcSink implements AutoCloseable {
+public class JdbcSink implements AutoCloseable, AuditSink {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(JdbcSink.class);
     private final ScheduledExecutorService sinkTimer = 
Executors.newSingleThreadScheduledExecutor();
-    private final ScheduledExecutorService partitionManagerTimer = 
Executors.newSingleThreadScheduledExecutor();
     private final DataQueue dataQueue;
     private final int insertBatch;
     private final int pullTimeOut;
     private final SinkConfig sinkConfig;
     private DataSource dataSource;
-
-    private final DateTimeFormatter FORMATTER_YYMMDDHH = 
DateTimeFormatter.ofPattern("yyyyMMdd");
-    private final DateTimeFormatter FORMATTER_YY_MM_DD_HH = 
DateTimeFormatter.ofPattern("yyyy-MM-dd");
-    private final String checkPartitionSql;
+    private final Configuration configuration;
 
     public JdbcSink(DataQueue dataQueue, SinkConfig sinkConfig) {
+        configuration = Configuration.getInstance();
         this.dataQueue = dataQueue;
         this.sinkConfig = sinkConfig;
 
-        insertBatch = Configuration.getInstance().get(KEY_SOURCE_DB_SINK_BATCH,
+        insertBatch = configuration.get(KEY_SOURCE_DB_SINK_BATCH,
                 DEFAULT_SOURCE_DB_SINK_BATCH);
 
-        pullTimeOut = Configuration.getInstance().get(KEY_QUEUE_PULL_TIMEOUT,
+        pullTimeOut = configuration.get(KEY_QUEUE_PULL_TIMEOUT,
                 DEFAULT_QUEUE_PULL_TIMEOUT);
-        checkPartitionSql = 
Configuration.getInstance().get(KEY_AUDIT_DATA_TEMP_CHECK_PARTITION_SQL,
-                DEFAULT_AUDIT_DATA_TEMP_CHECK_PARTITION_SQL);
 
     }
 
@@ -108,23 +75,11 @@ public class JdbcSink implements AutoCloseable {
      */
     public void start() {
         createDataSource();
-
         sinkTimer.scheduleWithFixedDelay(this::process,
                 0,
-                Configuration.getInstance().get(KEY_SOURCE_DB_SINK_INTERVAL,
+                configuration.get(KEY_SOURCE_DB_SINK_INTERVAL,
                         DEFAULT_SOURCE_DB_SINK_INTERVAL),
                 TimeUnit.MILLISECONDS);
-        if (Configuration.getInstance().get(KEY_ENABLE_MANAGE_PARTITIONS,
-                DEFAULT_ENABLE_MANAGE_PARTITIONS)) {
-            // Create MySQL data partition of today
-            addPartition(0);
-
-            
partitionManagerTimer.scheduleWithFixedDelay(this::managePartitions,
-                    0,
-                    
Configuration.getInstance().get(KEY_CHECK_PARTITION_INTERVAL_HOURS,
-                            DEFAULT_CHECK_PARTITION_INTERVAL_HOURS),
-                    TimeUnit.HOURS);
-        }
     }
 
     /**
@@ -169,91 +124,12 @@ public class JdbcSink implements AutoCloseable {
      * Create data source
      */
     protected void createDataSource() {
-        HikariConfig config = new HikariConfig();
-        config.setDriverClassName(sinkConfig.getDriverClassName());
-        config.setJdbcUrl(sinkConfig.getJdbcUrl());
-        config.setUsername(sinkConfig.getUserName());
-        config.setPassword(sinkConfig.getPassword());
-        
config.setConnectionTimeout(Configuration.getInstance().get(KEY_DATASOURCE_CONNECTION_TIMEOUT,
-                DEFAULT_CONNECTION_TIMEOUT));
-        config.addDataSourceProperty(CACHE_PREP_STMTS,
-                Configuration.getInstance().get(KEY_CACHE_PREP_STMTS, 
DEFAULT_CACHE_PREP_STMTS));
-        config.addDataSourceProperty(PREP_STMT_CACHE_SIZE,
-                Configuration.getInstance().get(KEY_PREP_STMT_CACHE_SIZE, 
DEFAULT_PREP_STMT_CACHE_SIZE));
-        config.addDataSourceProperty(PREP_STMT_CACHE_SQL_LIMIT,
-                Configuration.getInstance().get(KEY_PREP_STMT_CACHE_SQL_LIMIT, 
DEFAULT_PREP_STMT_CACHE_SQL_LIMIT));
-        config.setMaximumPoolSize(
-                Configuration.getInstance().get(KEY_DATASOURCE_POOL_SIZE,
-                        DEFAULT_DATASOURCE_POOL_SIZE));
-        dataSource = new HikariDataSource(config);
-    }
-
-    private void managePartitions() {
-        // Create MySQL data partition of tomorrow
-        addPartition(1);
-
-        deletePartition();
-    }
-
-    private String formatPartitionName(LocalDate date) {
-        return "p" + date.format(FORMATTER_YYMMDDHH);
-    }
-
-    private void addPartition(long daysToAdd) {
-        String partitionName = 
formatPartitionName(LocalDate.now().plusDays(daysToAdd));
-        if (isPartitionExists(partitionName)) {
-            LOGGER.info("Partition [{}] is exist, dont`t need add this 
partition.", partitionName);
-            return;
-        }
-        String partitionValue = LocalDate.now().plusDays(daysToAdd + 
1).format(FORMATTER_YY_MM_DD_HH);
-        String addPartitionSQL = String.format(
-                
Configuration.getInstance().get(KEY_AUDIT_DATA_TEMP_ADD_PARTITION_SQL,
-                        DEFAULT_AUDIT_DATA_TEMP_ADD_PARTITION_SQL),
-                partitionName, partitionValue);
-        executeUpdate(addPartitionSQL);
-    }
-
-    private void deletePartition() {
-        int daysToSubtract = 
Configuration.getInstance().get(KEY_AUDIT_DATA_TEMP_STORAGE_DAYS,
-                DEFAULT_AUDIT_DATA_TEMP_STORAGE_DAYS);
-        String partitionName = 
formatPartitionName(LocalDate.now().minusDays(daysToSubtract));
-        if (!isPartitionExists(partitionName)) {
-            LOGGER.info("Partition [{}] is not exist, dont`t need delete this 
partition.", partitionName);
-            return;
-        }
-        String deletePartitionSQL = String.format(
-                
Configuration.getInstance().get(KEY_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL,
-                        DEFAULT_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL),
-                partitionName);
-        executeUpdate(deletePartitionSQL);
-    }
-
-    private void executeUpdate(String updateSQL) {
-        try (Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement = 
connection.prepareStatement(updateSQL)) {
-            preparedStatement.executeUpdate();
-            LOGGER.info("Execute update [{}] success!", updateSQL);
-        } catch (Exception exception) {
-            LOGGER.error("Execute update [{}] has exception!", updateSQL, 
exception);
-        }
-    }
-
-    private boolean isPartitionExists(String partitionName) {
-        try (Connection connection = dataSource.getConnection();
-                PreparedStatement statement = 
connection.prepareStatement(checkPartitionSql)) {
-            statement.setString(1, partitionName);
-
-            try (ResultSet resultSet = statement.executeQuery()) {
-                if (resultSet.next()) {
-                    return resultSet.getInt("count") > 0;
-                }
-            } catch (SQLException sqlException) {
-                LOGGER.error("An error occurred while checking partition [{}] 
existence:", partitionName, sqlException);
-            }
-        } catch (Exception exception) {
-            LOGGER.error("An exception occurred while checking partition 
[{}]existence:", partitionName, exception);
-        }
-        return false;
+        HikariConfig hikariConfig = JdbcUtils.buildHikariConfig(
+                sinkConfig.getDriverClassName(),
+                sinkConfig.getJdbcUrl(),
+                sinkConfig.getUserName(),
+                sinkConfig.getPassword());
+        dataSource = new HikariDataSource(hikariConfig);
     }
 
     public void destroy() {
@@ -261,7 +137,7 @@ public class JdbcSink implements AutoCloseable {
     }
 
     @Override
-    public void close() throws Exception {
+    public void close() {
 
     }
 }
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java
index 511efc1f97..da009ac62e 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java
@@ -24,6 +24,7 @@ import org.apache.inlong.audit.entities.StartEndTime;
 import org.apache.inlong.audit.entities.StatData;
 import org.apache.inlong.audit.service.ConfigService;
 import org.apache.inlong.audit.utils.CacheUtils;
+import org.apache.inlong.audit.utils.JdbcUtils;
 
 import com.zaxxer.hikari.HikariConfig;
 import com.zaxxer.hikari.HikariDataSource;
@@ -51,26 +52,13 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.inlong.audit.config.ConfigConstants.CACHE_PREP_STMTS;
 import static org.apache.inlong.audit.config.ConfigConstants.DATE_FORMAT;
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CACHE_PREP_STMTS;
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CONNECTION_TIMEOUT;
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_POOL_SIZE;
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SIZE;
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SQL_LIMIT;
 import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SOURCE_DB_STAT_INTERVAL;
 import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_STAT_BACK_INITIAL_OFFSET;
 import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_STAT_THREAD_POOL_SIZE;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_CACHE_PREP_STMTS;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_CONNECTION_TIMEOUT;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_POOL_SIZE;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SIZE;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SQL_LIMIT;
 import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_SOURCE_DB_STAT_INTERVAL;
 import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_STAT_BACK_INITIAL_OFFSET;
 import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_STAT_THREAD_POOL_SIZE;
-import static 
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SIZE;
-import static 
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT;
 import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG;
 import static org.apache.inlong.audit.entities.AuditCycle.DAY;
 import static org.apache.inlong.audit.entities.AuditCycle.HOUR;
@@ -181,23 +169,12 @@ public class JdbcSource {
      * Create data source
      */
     protected void createDataSource() {
-        HikariConfig config = new HikariConfig();
-        config.setDriverClassName(sourceConfig.getDriverClassName());
-        config.setJdbcUrl(sourceConfig.getJdbcUrl());
-        config.setUsername(sourceConfig.getUsername());
-        config.setPassword(sourceConfig.getPassword());
-        
config.setConnectionTimeout(Configuration.getInstance().get(KEY_DATASOURCE_CONNECTION_TIMEOUT,
-                DEFAULT_CONNECTION_TIMEOUT));
-        config.addDataSourceProperty(CACHE_PREP_STMTS,
-                Configuration.getInstance().get(KEY_CACHE_PREP_STMTS, 
DEFAULT_CACHE_PREP_STMTS));
-        config.addDataSourceProperty(PREP_STMT_CACHE_SIZE,
-                Configuration.getInstance().get(KEY_PREP_STMT_CACHE_SIZE, 
DEFAULT_PREP_STMT_CACHE_SIZE));
-        config.addDataSourceProperty(PREP_STMT_CACHE_SQL_LIMIT,
-                Configuration.getInstance().get(KEY_PREP_STMT_CACHE_SQL_LIMIT, 
DEFAULT_PREP_STMT_CACHE_SQL_LIMIT));
-        config.setMaximumPoolSize(
-                Configuration.getInstance().get(KEY_DATASOURCE_POOL_SIZE,
-                        DEFAULT_DATASOURCE_POOL_SIZE));
-        dataSource = new HikariDataSource(config);
+        HikariConfig hikariConfig = JdbcUtils.buildHikariConfig(
+                sourceConfig.getDriverClassName(),
+                sourceConfig.getJdbcUrl(),
+                sourceConfig.getUserName(),
+                sourceConfig.getPassword());
+        dataSource = new HikariDataSource(hikariConfig);
     }
 
     /**
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/utils/JdbcUtils.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/utils/JdbcUtils.java
index fa629a725c..07f40e2e6b 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/utils/JdbcUtils.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/utils/JdbcUtils.java
@@ -20,13 +20,28 @@ package org.apache.inlong.audit.utils;
 import org.apache.inlong.audit.config.Configuration;
 import org.apache.inlong.audit.entities.JdbcConfig;
 
+import com.zaxxer.hikari.HikariConfig;
+
 import java.util.Objects;
 
+import static org.apache.inlong.audit.config.ConfigConstants.CACHE_PREP_STMTS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CACHE_PREP_STMTS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CONNECTION_TIMEOUT;
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_POOL_SIZE;
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SIZE;
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SQL_LIMIT;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_CACHE_PREP_STMTS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_CONNECTION_TIMEOUT;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_POOL_SIZE;
 import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DEFAULT_MYSQL_DRIVER;
 import static org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_DRIVER;
 import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_JDBC_URL;
 import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_PASSWORD;
 import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_USERNAME;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SIZE;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SQL_LIMIT;
+import static 
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SIZE;
+import static 
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT;
 
 /**
  * Jdbc utils
@@ -64,4 +79,26 @@ public class JdbcUtils {
                 userName,
                 password);
     }
+
+    public static HikariConfig buildHikariConfig(String driverClassName, 
String jdbcUrl, String userName,
+            String passWord) { // Builds a HikariConfig from the given driver, URL and credentials plus options read from Configuration.
+        HikariConfig hikariConfig = new HikariConfig();
+        hikariConfig.setDriverClassName(driverClassName);
+        hikariConfig.setJdbcUrl(jdbcUrl);
+        hikariConfig.setUsername(userName);
+        hikariConfig.setPassword(passWord);
+        Configuration configuration = Configuration.getInstance(); // every get(...) below passes a KEY_* name together with a DEFAULT_* constant
+        
hikariConfig.setConnectionTimeout(configuration.get(KEY_DATASOURCE_CONNECTION_TIMEOUT,
+                DEFAULT_CONNECTION_TIMEOUT));
+        hikariConfig.addDataSourceProperty(CACHE_PREP_STMTS, // the three addDataSourceProperty calls set prepared-statement cache properties
+                configuration.get(KEY_CACHE_PREP_STMTS, 
DEFAULT_CACHE_PREP_STMTS));
+        hikariConfig.addDataSourceProperty(PREP_STMT_CACHE_SIZE,
+                configuration.get(KEY_PREP_STMT_CACHE_SIZE, 
DEFAULT_PREP_STMT_CACHE_SIZE));
+        hikariConfig.addDataSourceProperty(PREP_STMT_CACHE_SQL_LIMIT,
+                configuration.get(KEY_PREP_STMT_CACHE_SQL_LIMIT, 
DEFAULT_PREP_STMT_CACHE_SQL_LIMIT));
+        hikariConfig.setMaximumPoolSize(
+                configuration.get(KEY_DATASOURCE_POOL_SIZE,
+                        DEFAULT_DATASOURCE_POOL_SIZE));
+        return hikariConfig; // only the config is built here; callers wrap it in a new HikariDataSource
+    }
 }
diff --git a/inlong-audit/sql/apache_inlong_audit_mysql.sql 
b/inlong-audit/sql/apache_inlong_audit_mysql.sql
index e9e114e2e0..6fb07021f5 100644
--- a/inlong-audit/sql/apache_inlong_audit_mysql.sql
+++ b/inlong-audit/sql/apache_inlong_audit_mysql.sql
@@ -92,7 +92,9 @@ CREATE TABLE IF NOT EXISTS `audit_data_day`
     `update_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON 
UPDATE CURRENT_TIMESTAMP COMMENT 'Update time',
     PRIMARY KEY 
(`log_ts`,`inlong_group_id`,`inlong_stream_id`,`audit_id`,`audit_tag`)
 ) ENGINE = InnoDB
-DEFAULT CHARSET = utf8 COMMENT ='Inlong audit data day table';
+DEFAULT CHARSET = utf8 COMMENT ='Inlong audit data day table'
+PARTITION BY RANGE (to_days(`log_ts`))
+(PARTITION pDefault VALUES LESS THAN (TO_DAYS('1970-01-01')));
 
 -- ----------------------------
 -- Table structure for selector

Reply via email to