This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d963e5c753 [INLONG-10099][Audit] Audit-store adds the general JDBC sink capability (#10104)
d963e5c753 is described below

commit d963e5c7537a8685df6812bbcf1b2d3b1b0a41a4
Author: doleyzi <43397300+dole...@users.noreply.github.com>
AuthorDate: Mon Apr 29 18:00:19 2024 +0800

    [INLONG-10099][Audit] Audit-store adds the general JDBC sink capability (#10104)
    
    * Audit-store adds the general JDBC sink capability
    
    * Audit-store adds the general JDBC sink capability
    
    * Modify configuration file of JDBC
    
    * Modify configuration file of JDBC
    
    * Modify configuration file of JDBC
    
    * Adjust the import order
---
 inlong-audit/audit-docker/Dockerfile               |   7 +-
 inlong-audit/audit-docker/audit-docker.sh          |  26 ++-
 .../config/{StoreConfig.java => JdbcConfig.java}   |  47 +++--
 .../apache/inlong/audit/config/StoreConfig.java    |   3 +
 .../entities/JdbcDataPo.java}                      |  55 ++---
 .../audit/service/AuditMsgConsumerServer.java      |  18 +-
 .../inlong/audit/service/ClickHouseService.java    |   7 +
 .../inlong/audit/service/ElasticsearchService.java |   7 +
 .../apache/inlong/audit/service/InsertData.java    |   5 +
 .../apache/inlong/audit/service/JdbcService.java   | 224 +++++++++++++++++++++
 .../apache/inlong/audit/service/MySqlService.java  |   8 +
 .../inlong/audit/service/consume/BaseConsume.java  |  13 +-
 .../audit/service/consume/PulsarConsume.java       |   3 +-
 inlong-audit/conf/application.properties           |   6 +
 inlong-audit/conf/audit-service.properties         |  16 --
 15 files changed, 366 insertions(+), 79 deletions(-)

diff --git a/inlong-audit/audit-docker/Dockerfile b/inlong-audit/audit-docker/Dockerfile
index d55ac7e2f6..6717eca684 100644
--- a/inlong-audit/audit-docker/Dockerfile
+++ b/inlong-audit/audit-docker/Dockerfile
@@ -35,7 +35,7 @@ ENV TUBE_AUDIT_TOPIC="inlong-audit"
 ENV AUDIT_DBNAME="apache_inlong_audit"
 # proxy/store/all, start audit module individually, or all
 ENV START_MODE="all"
-# mysql / clickhouse / elasticsearch
+# mysql / clickhouse / elasticsearch / starrocks
 ENV STORE_MODE=mysql
 # mysql
 ENV JDBC_URL=127.0.0.1:3306
@@ -52,6 +52,11 @@ ENV STORE_ES_PORT=9200
 ENV STORE_ES_AUTHENABLE=false
 ENV STORE_ES_USERNAME=elastic
 ENV STORE_ES_PASSWD=inlong
+# starrocks
+ENV STORE_SR_URL=127.0.0.1:9030
+ENV STORE_SR_USERNAME=default
+ENV STORE_SR_PASSWD=default
+ENV STORE_SR_DBNAME="apache_inlong_audit"
 # jvm
 ENV AUDIT_JVM_HEAP_OPTS="-XX:+UseContainerSupport 
-XX:InitialRAMPercentage=40.0 -XX:MaxRAMPercentage=80.0 
-XX:-UseAdaptiveSizePolicy"
 WORKDIR /opt/inlong-audit
diff --git a/inlong-audit/audit-docker/audit-docker.sh b/inlong-audit/audit-docker/audit-docker.sh
index 42c43c9322..777a80b801 100755
--- a/inlong-audit/audit-docker/audit-docker.sh
+++ b/inlong-audit/audit-docker/audit-docker.sh
@@ -21,8 +21,12 @@ file_path=$(cd "$(dirname "$0")"/../;pwd)
 store_conf_file=${file_path}/conf/application.properties
 # proxy config
 proxy_conf_file=${file_path}/conf/audit-proxy-${MQ_TYPE}.conf
-sql_file="${file_path}"/sql/apache_inlong_audit.sql
+sql_mysql_file="${file_path}"/sql/apache_inlong_audit_mysql.sql
 sql_ck_file="${file_path}"/sql/apache_inlong_audit_clickhouse.sql
+sql_sr_file="${file_path}"/sql/apache_inlong_audit_starrocks.sql
+
+# audit-service config
+service_conf_file=${file_path}/conf/audit-service.properties
 
 # replace the configuration for audit proxy
 sed -i 
"s/manager.hosts=.*$/manager.hosts=${MANAGER_OPENAPI_IP}:${MANAGER_OPENAPI_PORT}/g"
 "${store_conf_file}"
@@ -52,7 +56,7 @@ sed -i 
"s/127.0.0.1:3306\/apache_inlong_audit/${JDBC_URL}\/${AUDIT_DBNAME}/g" "$
 sed -i 
"s/spring.datasource.druid.username=.*$/spring.datasource.druid.username=${USERNAME}/g"
 "${store_conf_file}"
 sed -i 
"s/spring.datasource.druid.password=.*$/spring.datasource.druid.password=${PASSWORD}/g"
 "${store_conf_file}"
 # mysql file for audit
-sed -i "s/apache_inlong_audit/${AUDIT_DBNAME}/g" "${sql_file}"
+sed -i "s/apache_inlong_audit/${AUDIT_DBNAME}/g" "${sql_mysql_file}"
 # clickhouse
 sed -i 
"s/clickhouse.url=.*$/clickhouse.url=jdbc:clickhouse:\/\/${STORE_CK_URL}\/${STORE_CK_DBNAME}/g"
 "${store_conf_file}"
 sed -i "s/clickhouse.username=.*$/clickhouse.username=${STORE_CK_USERNAME}/g" 
"${store_conf_file}"
@@ -66,6 +70,18 @@ sed -i 
"s/elasticsearch.authEnable=.*$/elasticsearch.authEnable=${STORE_ES_AUTHE
 sed -i 
"s/elasticsearch.username=.*$/elasticsearch.username=${STORE_ES_USERNAME}/g" 
"${store_conf_file}"
 sed -i 
"s/elasticsearch.password=.*$/elasticsearch.password=${STORE_ES_PASSWD}/g" 
"${store_conf_file}"
 
+# StarRocks SQL file for audit
+sed -i "s/apache_inlong_audit/${AUDIT_DBNAME}/g" "${sql_sr_file}"
+# StarRocks
+sed -i 
"s/jdbc.url=.*$/jdbc.url=jdbc:mysql:\/\/${STORE_SR_URL}\/${STORE_SR_DBNAME}/g" 
"${store_conf_file}"
+sed -i "s/jdbc.username=.*$/jdbc.username=${STORE_SR_USERNAME}/g" 
"${store_conf_file}"
+sed -i "s/jdbc.password=.*$/jdbc.password=${STORE_SR_PASSWD}/g" 
"${store_conf_file}"
+
+# audit-service config
+sed -i 
"s/mysql.jdbc.url=.*$/mysql.jdbc.url=jdbc:mysql:\/\/${JDBC_URL}\/${AUDIT_DBNAME}/g"
 "${service_conf_file}"
+sed -i "s/mysql.jdbc.username=.*$/mysql.jdbc.username=${USERNAME}/g" 
"${service_conf_file}"
+sed -i "s/mysql.jdbc.password=.*$/mysql.jdbc.password=${PASSWORD}/g" 
"${service_conf_file}"
+
 # Whether the database table exists. If it does not exist, initialize the 
database and skip if it exists.
 if [[ "${JDBC_URL}" =~ (.+):([0-9]+) ]]; then
   datasource_hostname=${BASH_REMATCH[1]}
@@ -96,6 +112,12 @@ fi
 if [ "${START_MODE}" = "all" ] || [ "${START_MODE}" = "store" ]; then
   bash +x ./bin/store-start.sh
 fi
+
+# start service
+if [ "${START_MODE}" = "all" ] || [ "${START_MODE}" = "service" ]; then
+  bash +x ./bin/service-start.sh
+fi
+
 sleep 3
 # keep alive
 tail -F ./logs/info.log
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/JdbcConfig.java
similarity index 57%
copy from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
copy to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/JdbcConfig.java
index 41ab00da8b..e7f3f639cd 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/JdbcConfig.java
@@ -17,29 +17,28 @@
 
 package org.apache.inlong.audit.config;
 
-import lombok.Getter;
-import lombok.Setter;
+import lombok.Data;
 import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-
-@Component
-@Getter
-@Setter
-public class StoreConfig {
-
-    @Value("${audit.config.store.mode:mysql}")
-    private String store;
-
-    public boolean isMysqlStore() {
-        return store.contains("mysql");
-    }
-
-    public boolean isElasticsearchStore() {
-        return store.contains("elasticsearch");
-    }
-
-    public boolean isClickHouseStore() {
-        return store.contains("clickhouse");
-    }
-
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+@Data
+public class JdbcConfig {
+
+    @Value("${jdbc.driver:com.mysql.cj.jdbc.Driver}")
+    private String driver;
+    @Value("${jdbc.url}")
+    private String url;
+    @Value("${jdbc.username}")
+    private String userName;
+    @Value("${jdbc.password}")
+    private String password;
+    @Value("${jdbc.batchIntervalMs:1000}")
+    private int batchIntervalMs;
+    @Value("${jdbc.batchThreshold:500}")
+    private int batchThreshold;
+    @Value("${jdbc.processIntervalMs:100}")
+    private int processIntervalMs;
+    @Value("${data.queue.size:1000000}")
+    private int dataQueueSize;
 }
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
index 41ab00da8b..c4a2db3328 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
@@ -42,4 +42,7 @@ public class StoreConfig {
         return store.contains("clickhouse");
     }
 
+    public boolean isJdbc() {
+        return store.contains("jdbc");
+    }
 }
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/JdbcDataPo.java
similarity index 52%
copy from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
copy to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/JdbcDataPo.java
index 41ab00da8b..ebc42f4a53 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/JdbcDataPo.java
@@ -15,31 +15,32 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.config;
-
-import lombok.Getter;
-import lombok.Setter;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-
-@Component
-@Getter
-@Setter
-public class StoreConfig {
-
-    @Value("${audit.config.store.mode:mysql}")
-    private String store;
-
-    public boolean isMysqlStore() {
-        return store.contains("mysql");
-    }
-
-    public boolean isElasticsearchStore() {
-        return store.contains("elasticsearch");
-    }
-
-    public boolean isClickHouseStore() {
-        return store.contains("clickhouse");
-    }
-
+package org.apache.inlong.audit.db.entities;
+
+import lombok.Data;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+
+import java.sql.Timestamp;
+
+@Data
+public class JdbcDataPo {
+
+    private String ip;
+    private String dockerId;
+    private String threadId;
+    private Timestamp sdkTs;
+    private Long packetId;
+    private Timestamp logTs;
+    private String inLongGroupId;
+    private String inLongStreamId;
+    private String auditId;
+    private String auditTag;
+    private long auditVersion;
+    private Long count;
+    private Long size;
+    private Long delay;
+    private Timestamp updateTime;
+    private Consumer<byte[]> consumer;
+    private MessageId messageId;
 }
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
index 04796a3b4b..2e2f4d9469 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.audit.service;
 
 import org.apache.inlong.audit.config.ClickHouseConfig;
+import org.apache.inlong.audit.config.JdbcConfig;
 import org.apache.inlong.audit.config.MessageQueueConfig;
 import org.apache.inlong.audit.config.StoreConfig;
 import org.apache.inlong.audit.consts.ConfigConstants;
@@ -68,16 +69,14 @@ public class AuditMsgConsumerServer implements 
InitializingBean {
     private ClickHouseConfig chConfig;
     // ClickHouseService
     private ClickHouseService ckService;
-
+    @Autowired
+    private JdbcConfig jdbcConfig;
+    private JdbcService jdbcService;
     private static final String DEFAULT_CONFIG_PROPERTIES = 
"application.properties";
-
     // interval time of getting mq config
     private static final int INTERVAL_MS = 5000;
-
     private final CloseableHttpClient httpClient = 
HttpClientBuilder.create().build();
-
     private final Gson gson = new Gson();
-
     /**
      * Initializing bean
      */
@@ -105,13 +104,15 @@ public class AuditMsgConsumerServer implements 
InitializingBean {
         if (mqConsume == null) {
             LOG.error("Unknown MessageQueue {}", mqConfig.getMqType());
         }
-
         if (storeConfig.isElasticsearchStore()) {
             esService.startTimerRoutine();
         }
         if (storeConfig.isClickHouseStore()) {
             ckService.start();
         }
+        if (storeConfig.isJdbc()) {
+            jdbcService.start();
+        }
         mqConsume.start();
     }
 
@@ -133,6 +134,11 @@ public class AuditMsgConsumerServer implements 
InitializingBean {
             ckService = new ClickHouseService(chConfig);
             insertServiceList.add(ckService);
         }
+        if (storeConfig.isJdbc()) {
+            // create jdbc object
+            jdbcService = new JdbcService(jdbcConfig);
+            insertServiceList.add(jdbcService);
+        }
         return insertServiceList;
     }
 
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
index ad257ba372..47c63aa395 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
@@ -21,6 +21,8 @@ import org.apache.inlong.audit.config.ClickHouseConfig;
 import org.apache.inlong.audit.db.entities.ClickHouseDataPo;
 import org.apache.inlong.audit.protocol.AuditData;
 
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -197,6 +199,11 @@ public class ClickHouseService implements InsertData, 
AutoCloseable {
         }
     }
 
+    @Override
+    public void insert(AuditData msgBody, Consumer<byte[]> consumer, MessageId 
messageId) {
+
+    }
+
     /**
      * close
      *
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java
index 02a23c3a8c..3de234146f 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java
@@ -24,6 +24,8 @@ import org.apache.inlong.audit.protocol.AuditData;
 import com.google.gson.FieldNamingPolicy;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
@@ -278,4 +280,9 @@ public class ElasticsearchService implements InsertData, 
AutoCloseable {
         esPo.setPacketId(msgBody.getPacketId());
         this.insertData(esPo);
     }
+
+    @Override
+    public void insert(AuditData msgBody, Consumer<byte[]> consumer, MessageId 
messageId) {
+
+    }
 }
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java
index 478920e3a2..7f9bcf8207 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java
@@ -19,6 +19,9 @@ package org.apache.inlong.audit.service;
 
 import org.apache.inlong.audit.protocol.AuditData;
 
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+
 /**
  * Insert Data interface
  */
@@ -28,4 +31,6 @@ public interface InsertData {
      * insert audit data to storage.
      */
     void insert(AuditData msgBody);
+
+    void insert(AuditData msgBody, Consumer<byte[]> consumer, MessageId 
messageId);
 }
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/JdbcService.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/JdbcService.java
new file mode 100644
index 0000000000..7aaac09e64
--- /dev/null
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/JdbcService.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.service;
+
+import org.apache.inlong.audit.config.JdbcConfig;
+import org.apache.inlong.audit.db.entities.JdbcDataPo;
+import org.apache.inlong.audit.protocol.AuditData;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This is a general jdbc sink service. As long as it meets the jdbc protocol, 
you can use this service.
+ */
+public class JdbcService implements InsertData, AutoCloseable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(JdbcService.class);
+    private static final String INSERT_SQL = "insert into audit_data (ip, 
docker_id, thread_id, \r\n"
+            + "      sdk_ts, packet_id, log_ts, \r\n"
+            + "      inlong_group_id, inlong_stream_id, audit_id, audit_tag, 
audit_version, \r\n"
+            + "      count, size, delay, \r\n"
+            + "      update_time)\r\n"
+            + "    values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
+
+    private final JdbcConfig jdbcConfig;
+
+    private final ScheduledExecutorService timerService = 
Executors.newSingleThreadScheduledExecutor();
+    private LinkedBlockingQueue<JdbcDataPo> receiveQueue;
+    private long lastCheckTime = System.currentTimeMillis();
+    private Connection connection;
+    private final List<JdbcDataPo> writeDataList = new LinkedList<>();
+
+    public JdbcService(JdbcConfig jdbcConfig) {
+        this.jdbcConfig = jdbcConfig;
+    }
+
+    /**
+     * start
+     */
+    public void start() {
+        receiveQueue = new 
LinkedBlockingQueue<>(jdbcConfig.getDataQueueSize());
+        try {
+            Class.forName(jdbcConfig.getDriver());
+            reconnect();
+        } catch (Exception e) {
+            LOG.error("Start failure!", e);
+        }
+        timerService.scheduleWithFixedDelay(this::process,
+                jdbcConfig.getProcessIntervalMs(),
+                jdbcConfig.getProcessIntervalMs(), TimeUnit.MILLISECONDS);
+    }
+
+    private void process() {
+        if (receiveQueue.size() < jdbcConfig.getBatchThreshold()
+                && (System.currentTimeMillis() - lastCheckTime < 
jdbcConfig.getBatchIntervalMs())) {
+            return;
+        }
+        lastCheckTime = System.currentTimeMillis();
+
+        if (writeDataList.size() > 0) {
+            if (executeBatch(writeDataList)) {
+                acknowledge(writeDataList);
+                writeDataList.clear();
+            } else {
+                return;
+            }
+        }
+
+        JdbcDataPo data = receiveQueue.poll();
+        while (data != null) {
+            writeDataList.add(data);
+            if (writeDataList.size() > jdbcConfig.getBatchThreshold()) {
+                if (executeBatch(writeDataList)) {
+                    acknowledge(writeDataList);
+                    writeDataList.clear();
+                } else {
+                    break;
+                }
+            }
+            data = receiveQueue.poll();
+        }
+    }
+
+    private boolean executeBatch(List<JdbcDataPo> dataList) {
+        boolean result = false;
+        try (PreparedStatement statement = 
connection.prepareStatement(INSERT_SQL)) {
+            for (JdbcDataPo data : dataList) {
+                statement.setString(1, data.getIp());
+                statement.setString(2, data.getDockerId());
+                statement.setString(3, data.getThreadId());
+                statement.setTimestamp(4, data.getSdkTs());
+                statement.setLong(5, data.getPacketId());
+                statement.setTimestamp(6, data.getLogTs());
+                statement.setString(7, data.getInLongGroupId());
+                statement.setString(8, data.getInLongStreamId());
+                statement.setString(9, data.getAuditId());
+                statement.setString(10, data.getAuditTag());
+                statement.setLong(11, data.getAuditVersion());
+                statement.setLong(12, data.getCount());
+                statement.setLong(13, data.getSize());
+                statement.setLong(14, data.getDelay());
+                statement.setTimestamp(15, data.getUpdateTime());
+                statement.addBatch();
+            }
+            statement.executeBatch();
+            connection.commit();
+            result = true;
+        } catch (Exception exception) {
+            LOG.error("Execute batch has failure!", exception);
+            try {
+                reconnect();
+            } catch (SQLException sqlException) {
+                LOG.error("Re-connect has  failure!", sqlException);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * reconnect
+     *
+     * @throws SQLException Exception when creating connection.
+     */
+    private void reconnect() throws SQLException {
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (Exception e) {
+                LOG.error("Reconnect has exception!", e);
+            }
+            connection = null;
+        }
+        connection = DriverManager.getConnection(jdbcConfig.getUrl(), 
jdbcConfig.getUserName(),
+                jdbcConfig.getPassword());
+        connection.setAutoCommit(false);
+    }
+
+    /**
+     * insert
+     *
+     * @param msgBody audit data reading from Pulsar or other MessageQueue.
+     */
+    @Override
+    public void insert(AuditData msgBody) {
+    }
+
+    @Override
+    public void insert(AuditData msgBody, Consumer<byte[]> consumer, MessageId 
messageId) {
+        JdbcDataPo data = new JdbcDataPo();
+        data.setConsumer(consumer);
+        data.setMessageId(messageId);
+        data.setIp(msgBody.getIp());
+        data.setThreadId(msgBody.getThreadId());
+        data.setDockerId(msgBody.getDockerId());
+        data.setPacketId(msgBody.getPacketId());
+        data.setSdkTs(new Timestamp(msgBody.getSdkTs()));
+        data.setLogTs(new Timestamp(msgBody.getLogTs()));
+        data.setAuditId(msgBody.getAuditId());
+        data.setAuditTag(msgBody.getAuditTag());
+        data.setAuditVersion(msgBody.getAuditVersion());
+        data.setCount(msgBody.getCount());
+        data.setDelay(msgBody.getDelay());
+        data.setInLongGroupId(msgBody.getInlongGroupId());
+        data.setInLongStreamId(msgBody.getInlongStreamId());
+        data.setSize(msgBody.getSize());
+        data.setUpdateTime(new Timestamp(System.currentTimeMillis()));
+        try {
+            receiveQueue.offer(data, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException exception) {
+            LOG.error("Insert data has InterruptedException ", exception);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        this.connection.close();
+        this.timerService.shutdown();
+    }
+
+    private void acknowledge(List<JdbcDataPo> dataList) {
+        Iterator<JdbcDataPo> iterator = dataList.iterator();
+        while (iterator.hasNext()) {
+            JdbcDataPo jdbcData = iterator.next();
+            try {
+                if (jdbcData.getConsumer() != null && jdbcData.getMessageId() 
!= null) {
+                    
jdbcData.getConsumer().acknowledge(jdbcData.getMessageId());
+                }
+                iterator.remove();
+            } catch (Exception exception) {
+                LOG.error("Acknowledge has exception!", exception);
+            }
+        }
+    }
+}
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/MySqlService.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/MySqlService.java
index bedc40cef9..21f471d003 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/MySqlService.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/MySqlService.java
@@ -21,6 +21,9 @@ import org.apache.inlong.audit.db.dao.AuditDataDao;
 import org.apache.inlong.audit.db.entities.AuditDataPo;
 import org.apache.inlong.audit.protocol.AuditData;
 
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+
 import java.util.Date;
 
 /**
@@ -54,4 +57,9 @@ public class MySqlService implements InsertData {
         dao.insert(po);
     }
 
+    @Override
+    public void insert(AuditData msgBody, Consumer<byte[]> consumer, MessageId 
messageId) {
+
+    }
+
 }
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java
index 0f8a5b104a..e0fbb7180a 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java
@@ -23,6 +23,8 @@ import org.apache.inlong.audit.protocol.AuditData;
 import org.apache.inlong.audit.service.InsertData;
 
 import com.google.gson.Gson;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,5 +64,14 @@ public abstract class BaseConsume {
             }
         });
     }
-
+    protected void handleMessage(String body, Consumer<byte[]> consumer, 
MessageId messageId) {
+        AuditData msgBody = gson.fromJson(body, AuditData.class);
+        this.insertServiceList.forEach((service) -> {
+            try {
+                service.insert(msgBody, consumer, messageId);
+            } catch (Exception e) {
+                LOG.error("Handle message has exception!", e);
+            }
+        });
+    }
 }
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java
index c73fc7319b..7d8efc79ce 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java
@@ -129,8 +129,7 @@ public class PulsarConsume extends BaseConsume {
                             public void received(Consumer<byte[]> consumer, 
Message<byte[]> msg) {
                                 try {
                                     String body = new String(msg.getData(), 
StandardCharsets.UTF_8);
-                                    handleMessage(body);
-                                    consumer.acknowledge(msg);
+                                    handleMessage(body, consumer, 
msg.getMessageId());
                                 } catch (Exception e) {
                                     LOG.error("Consumer has exception topic 
{}, subName {}, ex {}",
                                             topic,
diff --git a/inlong-audit/conf/application.properties b/inlong-audit/conf/application.properties
index c76942da90..d721e375cb 100644
--- a/inlong-audit/conf/application.properties
+++ b/inlong-audit/conf/application.properties
@@ -96,3 +96,9 @@ clickhouse.batchIntervalMs=1000
 clickhouse.batchThreshold=500
 clickhouse.processIntervalMs=100
 
+# Generic jdbc storage
+jdbc.driver=com.mysql.cj.jdbc.Driver
+jdbc.url=jdbc:mysql://127.0.0.1:9020/apache_inlong_audit?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2b8&rewriteBatchedStatements=true&allowMultiQueries=true&zeroDateTimeBehavior=CONVERT_TO_NULL
+jdbc.username=*******
+jdbc.password=********
+
diff --git a/inlong-audit/conf/audit-service.properties b/inlong-audit/conf/audit-service.properties
index 66fa9e0fd5..4d7c144ff1 100644
--- a/inlong-audit/conf/audit-service.properties
+++ b/inlong-audit/conf/audit-service.properties
@@ -16,25 +16,9 @@
 # specific language governing permissions and limitations
 # under the License.
 
-#  clickhouse config
-clickhouse.jdbc.url=jdbc:clickhouse://*****:***/db_inlong_audit?socket_timeout=600000
-clickhouse.username=*****
-clickhouse.password=*****
-
 #  mysql config
 
mysql.jdbc.url=jdbc:mysql://*****:***/apache_inlong_audit?characterEncoding=utf8&useUnicode=true&rewriteBatchedStatements=true
 mysql.username=*****
 mysql.password=*****
 
-#  summary config
-summary.realtime.stat.back.times=6
-summary.daily.stat.back.times=2
-audit.ids=3;4;5;6
-
-#  api config
-api.cache.max.size=50000000
-api.cache.expired.hours=12
-api.real.limiter.qps=1000.0
-api.pool.size=10
-api.backlog.size=100
 

Reply via email to