This is an automated email from the ASF dual-hosted git repository. luchunliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new d963e5c753 [INLONG-10099][Audit] Audit-store adds the general JDBC sink capability (#10104) d963e5c753 is described below commit d963e5c7537a8685df6812bbcf1b2d3b1b0a41a4 Author: doleyzi <43397300+dole...@users.noreply.github.com> AuthorDate: Mon Apr 29 18:00:19 2024 +0800 [INLONG-10099][Audit] Audit-store adds the general JDBC sink capability (#10104) * Audit-store adds the general JDBC sink capability * Audit-store adds the general JDBC sink capability * Modify configuration file of JDBC * Modify configuration file of JDBC * Modify configuration file of JDBC * Adjust the import order --- inlong-audit/audit-docker/Dockerfile | 7 +- inlong-audit/audit-docker/audit-docker.sh | 26 ++- .../config/{StoreConfig.java => JdbcConfig.java} | 47 +++-- .../apache/inlong/audit/config/StoreConfig.java | 3 + .../entities/JdbcDataPo.java} | 55 ++--- .../audit/service/AuditMsgConsumerServer.java | 18 +- .../inlong/audit/service/ClickHouseService.java | 7 + .../inlong/audit/service/ElasticsearchService.java | 7 + .../apache/inlong/audit/service/InsertData.java | 5 + .../apache/inlong/audit/service/JdbcService.java | 224 +++++++++++++++++++++ .../apache/inlong/audit/service/MySqlService.java | 8 + .../inlong/audit/service/consume/BaseConsume.java | 13 +- .../audit/service/consume/PulsarConsume.java | 3 +- inlong-audit/conf/application.properties | 6 + inlong-audit/conf/audit-service.properties | 16 -- 15 files changed, 366 insertions(+), 79 deletions(-) diff --git a/inlong-audit/audit-docker/Dockerfile b/inlong-audit/audit-docker/Dockerfile index d55ac7e2f6..6717eca684 100644 --- a/inlong-audit/audit-docker/Dockerfile +++ b/inlong-audit/audit-docker/Dockerfile @@ -35,7 +35,7 @@ ENV TUBE_AUDIT_TOPIC="inlong-audit" ENV AUDIT_DBNAME="apache_inlong_audit" # proxy/store/all, start audit module individually, or all ENV START_MODE="all" -# mysql / clickhouse / elasticsearch +# mysql / clickhouse / elasticsearch / starrocks ENV STORE_MODE=mysql # mysql ENV JDBC_URL=127.0.0.1:3306 @@ -52,6 +52,11 @@ ENV STORE_ES_PORT=9200 ENV STORE_ES_AUTHENABLE=false ENV STORE_ES_USERNAME=elastic ENV STORE_ES_PASSWD=inlong +# starrocks +ENV STORE_SR_URL=127.0.0.1:9030 +ENV STORE_SR_USERNAME=default +ENV STORE_SR_PASSWD=default +ENV STORE_SR_DBNAME="apache_inlong_audit" # jvm ENV AUDIT_JVM_HEAP_OPTS="-XX:+UseContainerSupport -XX:InitialRAMPercentage=40.0 -XX:MaxRAMPercentage=80.0 -XX:-UseAdaptiveSizePolicy" WORKDIR /opt/inlong-audit diff --git a/inlong-audit/audit-docker/audit-docker.sh b/inlong-audit/audit-docker/audit-docker.sh index 42c43c9322..777a80b801 100755 --- a/inlong-audit/audit-docker/audit-docker.sh +++ b/inlong-audit/audit-docker/audit-docker.sh @@ -21,8 +21,12 @@ file_path=$(cd "$(dirname "$0")"/../;pwd) store_conf_file=${file_path}/conf/application.properties # proxy config proxy_conf_file=${file_path}/conf/audit-proxy-${MQ_TYPE}.conf -sql_file="${file_path}"/sql/apache_inlong_audit.sql +sql_mysql_file="${file_path}"/sql/apache_inlong_audit_mysql.sql sql_ck_file="${file_path}"/sql/apache_inlong_audit_clickhouse.sql +sql_sr_file="${file_path}"/sql/apache_inlong_audit_starrocks.sql + +# audit-service config +service_conf_file=${file_path}/conf/audit-service.properties # replace the configuration for audit proxy sed -i "s/manager.hosts=.*$/manager.hosts=${MANAGER_OPENAPI_IP}:${MANAGER_OPENAPI_PORT}/g" "${store_conf_file}" @@ -52,7 +56,7 @@ sed -i "s/127.0.0.1:3306\/apache_inlong_audit/${JDBC_URL}\/${AUDIT_DBNAME}/g" "$ sed -i "s/spring.datasource.druid.username=.*$/spring.datasource.druid.username=${USERNAME}/g" "${store_conf_file}" sed -i "s/spring.datasource.druid.password=.*$/spring.datasource.druid.password=${PASSWORD}/g" "${store_conf_file}" # mysql file for audit -sed -i "s/apache_inlong_audit/${AUDIT_DBNAME}/g" "${sql_file}" +sed -i "s/apache_inlong_audit/${AUDIT_DBNAME}/g" "${sql_mysql_file}" # clickhouse sed -i "s/clickhouse.url=.*$/clickhouse.url=jdbc:clickhouse:\/\/${STORE_CK_URL}\/${STORE_CK_DBNAME}/g" "${store_conf_file}" sed -i "s/clickhouse.username=.*$/clickhouse.username=${STORE_CK_USERNAME}/g" "${store_conf_file}" @@ -66,6 +70,18 @@ sed -i "s/elasticsearch.authEnable=.*$/elasticsearch.authEnable=${STORE_ES_AUTHE sed -i "s/elasticsearch.username=.*$/elasticsearch.username=${STORE_ES_USERNAME}/g" "${store_conf_file}" sed -i "s/elasticsearch.password=.*$/elasticsearch.password=${STORE_ES_PASSWD}/g" "${store_conf_file}" +# StarRocks SQL file for audit +sed -i "s/apache_inlong_audit/${AUDIT_DBNAME}/g" "${sql_sr_file}" +# StarRocks +sed -i "s/jdbc.url=.*$/jdbc.url=jdbc:mysql:\/\/${STORE_SR_URL}\/${STORE_SR_DBNAME}/g" "${store_conf_file}" +sed -i "s/jdbc.username=.*$/jdbc.username=${STORE_SR_USERNAME}/g" "${store_conf_file}" +sed -i "s/jdbc.password=.*$/jdbc.password=${STORE_SR_PASSWD}/g" "${store_conf_file}" + +# audit-service config +sed -i "s/mysql.jdbc.url=.*$/mysql.jdbc.url=jdbc:mysql:\/\/${JDBC_URL}\/${AUDIT_DBNAME}/g" "${service_conf_file}" +sed -i "s/mysql.jdbc.username=.*$/mysql.jdbc.username=${USERNAME}/g" "${service_conf_file}" +sed -i "s/mysql.jdbc.password=.*$/mysql.jdbc.password=${PASSWORD}/g" "${service_conf_file}" + # Whether the database table exists. If it does not exist, initialize the database and skip if it exists. if [[ "${JDBC_URL}" =~ (.+):([0-9]+) ]]; then datasource_hostname=${BASH_REMATCH[1]} @@ -96,6 +112,12 @@ fi if [ "${START_MODE}" = "all" ] || [ "${START_MODE}" = "store" ]; then bash +x ./bin/store-start.sh fi + +# start service +if [ "${START_MODE}" = "all" ] || [ "${START_MODE}" = "service" ]; then + bash +x ./bin/service-start.sh +fi + sleep 3 # keep alive tail -F ./logs/info.log diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/JdbcConfig.java similarity index 57% copy from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java copy to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/JdbcConfig.java index 41ab00da8b..e7f3f639cd 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/JdbcConfig.java @@ -17,29 +17,28 @@ package org.apache.inlong.audit.config; -import lombok.Getter; -import lombok.Setter; +import lombok.Data; import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Component; - -@Component -@Getter -@Setter -public class StoreConfig { - - @Value("${audit.config.store.mode:mysql}") - private String store; - - public boolean isMysqlStore() { - return store.contains("mysql"); - } - - public boolean isElasticsearchStore() { - return store.contains("elasticsearch"); - } - - public boolean isClickHouseStore() { - return store.contains("clickhouse"); - } - +import org.springframework.context.annotation.Configuration; + +@Configuration +@Data +public class JdbcConfig { + + @Value("${jdbc.driver:com.mysql.cj.jdbc.Driver}") + private String driver; + @Value("${jdbc.url}") + private String url; + @Value("${jdbc.username}") + private String userName; + @Value("${jdbc.password}") + private String password; + @Value("${jdbc.batchIntervalMs:1000}") + private int batchIntervalMs; + @Value("${jdbc.batchThreshold:500}") + private int batchThreshold; + @Value("${jdbc.processIntervalMs:100}") + private int processIntervalMs; + @Value("${data.queue.size:1000000}") + private int dataQueueSize; } diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java index 41ab00da8b..c4a2db3328 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java @@ -42,4 +42,7 @@ public class StoreConfig { return store.contains("clickhouse"); } + public boolean isJdbc() { + return store.contains("jdbc"); + } } diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/JdbcDataPo.java similarity index 52% copy from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java copy to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/JdbcDataPo.java index 41ab00da8b..ebc42f4a53 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/JdbcDataPo.java @@ -15,31 +15,32 @@ * limitations under the License. */ -package org.apache.inlong.audit.config; - -import lombok.Getter; -import lombok.Setter; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Component; - -@Component -@Getter -@Setter -public class StoreConfig { - - @Value("${audit.config.store.mode:mysql}") - private String store; - - public boolean isMysqlStore() { - return store.contains("mysql"); - } - - public boolean isElasticsearchStore() { - return store.contains("elasticsearch"); - } - - public boolean isClickHouseStore() { - return store.contains("clickhouse"); - } - +package org.apache.inlong.audit.db.entities; + +import lombok.Data; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageId; + +import java.sql.Timestamp; + +@Data +public class JdbcDataPo { + + private String ip; + private String dockerId; + private String threadId; + private Timestamp sdkTs; + private Long packetId; + private Timestamp logTs; + private String inLongGroupId; + private String inLongStreamId; + private String auditId; + private String auditTag; + private long auditVersion; + private Long count; + private Long size; + private Long delay; + private Timestamp updateTime; + private Consumer<byte[]> consumer; + private MessageId messageId; } diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java index 04796a3b4b..2e2f4d9469 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java @@ -18,6 +18,7 @@ package org.apache.inlong.audit.service; import org.apache.inlong.audit.config.ClickHouseConfig; +import org.apache.inlong.audit.config.JdbcConfig; import org.apache.inlong.audit.config.MessageQueueConfig; import org.apache.inlong.audit.config.StoreConfig; import org.apache.inlong.audit.consts.ConfigConstants; @@ -68,16 +69,14 @@ public class AuditMsgConsumerServer implements InitializingBean { private ClickHouseConfig chConfig; // ClickHouseService private ClickHouseService ckService; - + @Autowired + private JdbcConfig jdbcConfig; + private JdbcService jdbcService; private static final String DEFAULT_CONFIG_PROPERTIES = "application.properties"; - // interval time of getting mq config private static final int INTERVAL_MS = 5000; - private final CloseableHttpClient httpClient = HttpClientBuilder.create().build(); - private final Gson gson = new Gson(); - /** * Initializing bean */ @@ -105,13 +104,15 @@ public class AuditMsgConsumerServer implements InitializingBean { if (mqConsume == null) { LOG.error("Unknown MessageQueue {}", mqConfig.getMqType()); } - if (storeConfig.isElasticsearchStore()) { esService.startTimerRoutine(); } if (storeConfig.isClickHouseStore()) { ckService.start(); } + if (storeConfig.isJdbc()) { + jdbcService.start(); + } mqConsume.start(); } @@ -133,6 +134,11 @@ public class AuditMsgConsumerServer implements InitializingBean { ckService = new ClickHouseService(chConfig); insertServiceList.add(ckService); } + if (storeConfig.isJdbc()) { + // create jdbc object + jdbcService = new JdbcService(jdbcConfig); + insertServiceList.add(jdbcService); + } return insertServiceList; } diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java index ad257ba372..47c63aa395 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java @@ -21,6 +21,8 @@ import org.apache.inlong.audit.config.ClickHouseConfig; import org.apache.inlong.audit.db.entities.ClickHouseDataPo; import org.apache.inlong.audit.protocol.AuditData; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -197,6 +199,11 @@ public class ClickHouseService implements InsertData, AutoCloseable { } } + @Override + public void insert(AuditData msgBody, Consumer<byte[]> consumer, MessageId messageId) { + + } + /** * close * diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java index 02a23c3a8c..3de234146f 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java @@ -24,6 +24,8 @@ import org.apache.inlong.audit.protocol.AuditData; import com.google.gson.FieldNamingPolicy; import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageId; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; @@ -278,4 +280,9 @@ public class ElasticsearchService implements InsertData, AutoCloseable { esPo.setPacketId(msgBody.getPacketId()); this.insertData(esPo); } + + @Override + public void insert(AuditData msgBody, Consumer<byte[]> consumer, MessageId messageId) { + + } } diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java index 478920e3a2..7f9bcf8207 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java @@ -19,6 +19,9 @@ package org.apache.inlong.audit.service; import org.apache.inlong.audit.protocol.AuditData; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageId; + /** * Insert Data interface */ @@ -28,4 +31,6 @@ public interface InsertData { * insert audit data to storage. */ void insert(AuditData msgBody); + + void insert(AuditData msgBody, Consumer<byte[]> consumer, MessageId messageId); } diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/JdbcService.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/JdbcService.java new file mode 100644 index 0000000000..7aaac09e64 --- /dev/null +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/JdbcService.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.service; + +import org.apache.inlong.audit.config.JdbcConfig; +import org.apache.inlong.audit.db.entities.JdbcDataPo; +import org.apache.inlong.audit.protocol.AuditData; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * This is a general jdbc sink service. As long as it meets the jdbc protocol, you can use this service. + */ +public class JdbcService implements InsertData, AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcService.class); + private static final String INSERT_SQL = "insert into audit_data (ip, docker_id, thread_id, \r\n" + + " sdk_ts, packet_id, log_ts, \r\n" + + " inlong_group_id, inlong_stream_id, audit_id, audit_tag, audit_version, \r\n" + + " count, size, delay, \r\n" + + " update_time)\r\n" + + " values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; + + private final JdbcConfig jdbcConfig; + + private final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor(); + private LinkedBlockingQueue<JdbcDataPo> receiveQueue; + private long lastCheckTime = System.currentTimeMillis(); + private Connection connection; + private final List<JdbcDataPo> writeDataList = new LinkedList<>(); + + public JdbcService(JdbcConfig jdbcConfig) { + this.jdbcConfig = jdbcConfig; + } + + /** + * start + */ + public void start() { + receiveQueue = new LinkedBlockingQueue<>(jdbcConfig.getDataQueueSize()); + try { + Class.forName(jdbcConfig.getDriver()); + reconnect(); + } catch (Exception e) { + LOG.error("Start failure!", e); + } + timerService.scheduleWithFixedDelay(this::process, + jdbcConfig.getProcessIntervalMs(), + jdbcConfig.getProcessIntervalMs(), TimeUnit.MILLISECONDS); + } + + private void process() { + if (receiveQueue.size() < jdbcConfig.getBatchThreshold() + && (System.currentTimeMillis() - lastCheckTime < jdbcConfig.getBatchIntervalMs())) { + return; + } + lastCheckTime = System.currentTimeMillis(); + + if (writeDataList.size() > 0) { + if (executeBatch(writeDataList)) { + acknowledge(writeDataList); + writeDataList.clear(); + } else { + return; + } + } + + JdbcDataPo data = receiveQueue.poll(); + while (data != null) { + writeDataList.add(data); + if (writeDataList.size() > jdbcConfig.getBatchThreshold()) { + if (executeBatch(writeDataList)) { + acknowledge(writeDataList); + writeDataList.clear(); + } else { + break; + } + } + data = receiveQueue.poll(); + } + } + + private boolean executeBatch(List<JdbcDataPo> dataList) { + boolean result = false; + try (PreparedStatement statement = connection.prepareStatement(INSERT_SQL)) { + for (JdbcDataPo data : dataList) { + statement.setString(1, data.getIp()); + statement.setString(2, data.getDockerId()); + statement.setString(3, data.getThreadId()); + statement.setTimestamp(4, data.getSdkTs()); + statement.setLong(5, data.getPacketId()); + statement.setTimestamp(6, data.getLogTs()); + statement.setString(7, data.getInLongGroupId()); + statement.setString(8, data.getInLongStreamId()); + statement.setString(9, data.getAuditId()); + statement.setString(10, data.getAuditTag()); + statement.setLong(11, data.getAuditVersion()); + statement.setLong(12, data.getCount()); + statement.setLong(13, data.getSize()); + statement.setLong(14, data.getDelay()); + statement.setTimestamp(15, data.getUpdateTime()); + statement.addBatch(); + } + statement.executeBatch(); + connection.commit(); + result = true; + } catch (Exception exception) { + LOG.error("Execute batch has failure!", exception); + try { + reconnect(); + } catch (SQLException sqlException) { + LOG.error("Re-connect has failure!", sqlException); + } + } + return result; + } + + /** + * reconnect + * + * @throws SQLException Exception when creating connection. + */ + private void reconnect() throws SQLException { + if (connection != null) { + try { + connection.close(); + } catch (Exception e) { + LOG.error("Reconnect has exception!", e); + } + connection = null; + } + connection = DriverManager.getConnection(jdbcConfig.getUrl(), jdbcConfig.getUserName(), + jdbcConfig.getPassword()); + connection.setAutoCommit(false); + } + + /** + * insert + * + * @param msgBody audit data reading from Pulsar or other MessageQueue. + */ + @Override + public void insert(AuditData msgBody) { + } + + @Override + public void insert(AuditData msgBody, Consumer<byte[]> consumer, MessageId messageId) { + JdbcDataPo data = new JdbcDataPo(); + data.setConsumer(consumer); + data.setMessageId(messageId); + data.setIp(msgBody.getIp()); + data.setThreadId(msgBody.getThreadId()); + data.setDockerId(msgBody.getDockerId()); + data.setPacketId(msgBody.getPacketId()); + data.setSdkTs(new Timestamp(msgBody.getSdkTs())); + data.setLogTs(new Timestamp(msgBody.getLogTs())); + data.setAuditId(msgBody.getAuditId()); + data.setAuditTag(msgBody.getAuditTag()); + data.setAuditVersion(msgBody.getAuditVersion()); + data.setCount(msgBody.getCount()); + data.setDelay(msgBody.getDelay()); + data.setInLongGroupId(msgBody.getInlongGroupId()); + data.setInLongStreamId(msgBody.getInlongStreamId()); + data.setSize(msgBody.getSize()); + data.setUpdateTime(new Timestamp(System.currentTimeMillis())); + try { + receiveQueue.offer(data, Long.MAX_VALUE, TimeUnit.MILLISECONDS); + } catch (InterruptedException exception) { + LOG.error("Insert data has InterruptedException ", exception); + } + } + + @Override + public void close() throws Exception { + this.connection.close(); + this.timerService.shutdown(); + } + + private void acknowledge(List<JdbcDataPo> dataList) { + Iterator<JdbcDataPo> iterator = dataList.iterator(); + while (iterator.hasNext()) { + JdbcDataPo jdbcData = iterator.next(); + try { + if (jdbcData.getConsumer() != null && jdbcData.getMessageId() != null) { + jdbcData.getConsumer().acknowledge(jdbcData.getMessageId()); + } + iterator.remove(); + } catch (Exception exception) { + LOG.error("Acknowledge has exception!", exception); + } + } + } +} diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/MySqlService.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/MySqlService.java index bedc40cef9..21f471d003 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/MySqlService.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/MySqlService.java @@ -21,6 +21,9 @@ import org.apache.inlong.audit.db.dao.AuditDataDao; import org.apache.inlong.audit.db.entities.AuditDataPo; import org.apache.inlong.audit.protocol.AuditData; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageId; + import java.util.Date; /** @@ -54,4 +57,9 @@ public class MySqlService implements InsertData { dao.insert(po); } + @Override + public void insert(AuditData msgBody, Consumer<byte[]> consumer, MessageId messageId) { + + } + } diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java index 0f8a5b104a..e0fbb7180a 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java @@ -23,6 +23,8 @@ import org.apache.inlong.audit.protocol.AuditData; import org.apache.inlong.audit.service.InsertData; import com.google.gson.Gson; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,5 +64,14 @@ public abstract class BaseConsume { } }); } - + protected void handleMessage(String body, Consumer<byte[]> consumer, MessageId messageId) { + AuditData msgBody = gson.fromJson(body, AuditData.class); + this.insertServiceList.forEach((service) -> { + try { + service.insert(msgBody, consumer, messageId); + } catch (Exception e) { + LOG.error("Handle message has exception!", e); + } + }); + } } diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java index c73fc7319b..7d8efc79ce 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java @@ -129,8 +129,7 @@ public class PulsarConsume extends BaseConsume { public void received(Consumer<byte[]> consumer, Message<byte[]> msg) { try { String body = new String(msg.getData(), StandardCharsets.UTF_8); - handleMessage(body); - consumer.acknowledge(msg); + handleMessage(body, consumer, msg.getMessageId()); } catch (Exception e) { LOG.error("Consumer has exception topic {}, subName {}, ex {}", topic, diff --git a/inlong-audit/conf/application.properties b/inlong-audit/conf/application.properties index c76942da90..d721e375cb 100644 --- a/inlong-audit/conf/application.properties +++ b/inlong-audit/conf/application.properties @@ -96,3 +96,9 @@ clickhouse.batchIntervalMs=1000 clickhouse.batchThreshold=500 clickhouse.processIntervalMs=100 +# Generic jdbc storage +jdbc.driver=com.mysql.cj.jdbc.Driver +jdbc.url=jdbc:mysql://127.0.0.1:9020/apache_inlong_audit?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2b8&rewriteBatchedStatements=true&allowMultiQueries=true&zeroDateTimeBehavior=CONVERT_TO_NULL +jdbc.username=******* +jdbc.password=******** + diff --git a/inlong-audit/conf/audit-service.properties b/inlong-audit/conf/audit-service.properties index 66fa9e0fd5..4d7c144ff1 100644 --- a/inlong-audit/conf/audit-service.properties +++ b/inlong-audit/conf/audit-service.properties @@ -16,25 +16,9 @@ # specific language governing permissions and limitations # under the License. -# clickhouse config -clickhouse.jdbc.url=jdbc:clickhouse://*****:***/db_inlong_audit?socket_timeout=600000 -clickhouse.username=***** -clickhouse.password=***** - # mysql config mysql.jdbc.url=jdbc:mysql://*****:***/apache_inlong_audit?characterEncoding=utf8&useUnicode=true&rewriteBatchedStatements=true mysql.username=***** mysql.password=***** -# summary config -summary.realtime.stat.back.times=6 -summary.daily.stat.back.times=2 -audit.ids=3;4;5;6 - -# api config -api.cache.max.size=50000000 -api.cache.expired.hours=12 -api.real.limiter.qps=1000.0 -api.pool.size=10 -api.backlog.size=100