This is an automated email from the ASF dual-hosted git repository. luchunliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 50539a5660 [INLONG-9943][Audit] Audit-service add codes of jdbc sink (#9949) 50539a5660 is described below commit 50539a566013f49a70499b57750e3fc27a099ebc Author: doleyzi <43397300+dole...@users.noreply.github.com> AuthorDate: Tue Apr 9 19:12:45 2024 +0800 [INLONG-9943][Audit] Audit-service add codes of jdbc sink (#9949) * Audit-service add codes of jdbc sink for aggregate the data from the data source and store the aggregated data to the target storage * Use lombok.AllArgsConstructor annotation to structure SinkConfig --- .../inlong/audit/config/ConfigConstants.java | 12 +- .../apache/inlong/audit/config/SqlConstants.java | 10 ++ .../apache/inlong/audit/entities/SinkConfig.java | 36 ++++ .../apache/inlong/audit/service/EtlService.java | 196 +++++++++++++++++++++ .../org/apache/inlong/audit/sink/JdbcSink.java | 164 +++++++++++++++++ 5 files changed, 412 insertions(+), 6 deletions(-) diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java index bc4f0f9326..ecf64338e7 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java @@ -25,14 +25,14 @@ public class ConfigConstants { // Source config public static final String KEY_CLICKHOUSE_DRIVER = "clickhouse.driver"; public static final String DEFAULT_CLICKHOUSE_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver"; - public static final String KEY_CLICKHOUSE_URL = "clickhouse.url"; + public static final String KEY_CLICKHOUSE_JDBC_URL = "clickhouse.jdbc.url"; public static final String KEY_CLICKHOUSE_USERNAME = "clickhouse.username"; public static final String KEY_CLICKHOUSE_PASSWORD = "clickhouse.password"; // DB config public static final String KEY_MYSQL_DRIVER = 
"mysql.driver"; public static final String KEY_DEFAULT_MYSQL_DRIVER = "com.mysql.cj.jdbc.Driver"; - public static final String KEY_MYSQL_URL = "mysql.url"; + public static final String KEY_MYSQL_JDBC_URL = "mysql.jdbc.url"; public static final String KEY_MYSQL_USERNAME = "mysql.username"; public static final String KEY_MYSQL_PASSWORD = "mysql.password"; @@ -91,15 +91,15 @@ public class ConfigConstants { public static final String DEFAULT_REALTIME_SUMMARY_SOURCE_TABLE = "audit_data"; public static final String KEY_REALTIME_SUMMARY_SINK_TABLE = "realtime.summary.sink.table"; public static final String DEFAULT_REALTIME_SUMMARY_SINK_TABLE = "audit_data_temp"; - public static final String KEY_REALTIME_SUMMARY_BEFORE_TIMES = "realtime.summary.before.times"; - public static final int DEFAULT_REALTIME_SUMMARY_BEFORE_TIMES = 6; + public static final String KEY_REALTIME_SUMMARY_STAT_BACK_TIMES = "realtime.summary.stat.back.times"; + public static final int DEFAULT_REALTIME_SUMMARY_STAT_BACK_TIMES = 6; public static final String KEY_DAILY_SUMMARY_SOURCE_TABLE = "daily.summary.source.table"; public static final String DEFAULT_DAILY_SUMMARY_SOURCE_TABLE = "audit_data_temp"; public static final String KEY_DAILY_SUMMARY_SINK_TABLE = "daily.summary.sink.table"; public static final String DEFAULT_DAILY_SUMMARY_SINK_TABLE = "audit_data_day"; - public static final String KEY_DAILY_SUMMARY_BEFORE_TIMES = "daily.summary.before.times"; - public static final int DEFAULT_DAILY_SUMMARY_BEFORE_TIMES = 2; + public static final String KEY_DAILY_SUMMARY_STAT_BACK_TIMES = "daily.summary.stat.back.times"; + public static final int DEFAULT_DAILY_SUMMARY_STAT_BACK_TIMES = 2; // HA selector config public static final String KEY_RELEASE_LEADER_INTERVAL = "release.leader.interval"; diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java index 
a64cd2234d..1eee2f7ec9 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java @@ -77,4 +77,14 @@ public class SqlConstants { "AND audit_id = ? \n" + "GROUP BY inlong_group_id, inlong_stream_id, audit_id, audit_tag"; + // Mysql insert sql + public static final String KEY_MYSQL_SINK_INSERT_DAY_SQL = "mysql.sink.insert.day.sql"; + public static final String DEFAULT_MYSQL_SINK_INSERT_DAY_SQL = + "replace into audit_data_day (log_ts,inlong_group_id, inlong_stream_id, audit_id,audit_tag,count, size, delay) " + + " values (?,?,?,?,?,?,?,?)"; + public static final String KEY_MYSQL_SINK_INSERT_TEMP_SQL = "mysql.sink.insert.temp.sql"; + public static final String DEFAULT_MYSQL_SINK_INSERT_TEMP_SQL = + "replace into audit_data_temp (log_ts,inlong_group_id, inlong_stream_id, audit_id,audit_tag,count, size, delay) " + + " values (?,?,?,?,?,?,?,?)"; + } diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SinkConfig.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SinkConfig.java new file mode 100644 index 0000000000..d2e137ec83 --- /dev/null +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SinkConfig.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.entities; + +import lombok.AllArgsConstructor; +import lombok.Data; + +/** + * Sink config + */ +@Data +@AllArgsConstructor +public class SinkConfig { + + private String insertSql; + private final String driverClassName; + private final String jdbcUrl; + private final String username; + private final String password; + +} diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java new file mode 100644 index 0000000000..dada6cb7f2 --- /dev/null +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.audit.service; + +import org.apache.inlong.audit.channel.DataQueue; +import org.apache.inlong.audit.config.Configuration; +import org.apache.inlong.audit.entities.AuditCycle; +import org.apache.inlong.audit.entities.SinkConfig; +import org.apache.inlong.audit.entities.SourceConfig; +import org.apache.inlong.audit.sink.JdbcSink; +import org.apache.inlong.audit.source.JdbcSource; + +import java.util.Objects; + +import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CLICKHOUSE_DRIVER; +import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DAILY_SUMMARY_STAT_BACK_TIMES; +import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATA_QUEUE_SIZE; +import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_REALTIME_SUMMARY_STAT_BACK_TIMES; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_CLICKHOUSE_DRIVER; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_CLICKHOUSE_JDBC_URL; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_CLICKHOUSE_PASSWORD; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_CLICKHOUSE_USERNAME; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_DAILY_SUMMARY_STAT_BACK_TIMES; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATA_QUEUE_SIZE; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_DEFAULT_MYSQL_DRIVER; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_DRIVER; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_JDBC_URL; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_PASSWORD; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_USERNAME; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_REALTIME_SUMMARY_STAT_BACK_TIMES; +import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_CLICKHOUSE_SOURCE_QUERY_SQL; +import static 
org.apache.inlong.audit.config.SqlConstants.DEFAULT_MYSQL_SINK_INSERT_DAY_SQL; +import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_MYSQL_SINK_INSERT_TEMP_SQL; +import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_MYSQL_SOURCE_QUERY_TEMP_SQL; +import static org.apache.inlong.audit.config.SqlConstants.KEY_CLICKHOUSE_SOURCE_QUERY_SQL; +import static org.apache.inlong.audit.config.SqlConstants.KEY_MYSQL_SINK_INSERT_DAY_SQL; +import static org.apache.inlong.audit.config.SqlConstants.KEY_MYSQL_SINK_INSERT_TEMP_SQL; +import static org.apache.inlong.audit.config.SqlConstants.KEY_MYSQL_SOURCE_QUERY_TEMP_SQL; + +/** + * Etl service aggregate the data from the data source and store the aggregated data to the target storage. + */ +public class EtlService { + + private JdbcSource mysqlSourceOfTemp; + private JdbcSink mysqlSinkOfDay; + private JdbcSource clickhouseSource; + private JdbcSink mysqlSinkOfTemp; + private final int queueSize; + + public EtlService() { + queueSize = Configuration.getInstance().get(KEY_DATA_QUEUE_SIZE, + DEFAULT_DATA_QUEUE_SIZE); + } + + /** + * Start the etl service. + */ + public void start() { + clickhouseToMysql(); + mysqlToMysqlOfDay(); + } + + /** + * Aggregate data from mysql data source and store the aggregated data in the target mysql table. + * The audit data cycle is days,and stored in table of day. + */ + private void mysqlToMysqlOfDay() { + DataQueue dataQueue = new DataQueue(queueSize); + + mysqlSourceOfTemp = new JdbcSource(dataQueue, buildMysqlSourceConfig()); + mysqlSourceOfTemp.start(); + + SinkConfig sinkConfig = buildMysqlSinkConfig(Configuration.getInstance().get(KEY_MYSQL_SINK_INSERT_DAY_SQL, + DEFAULT_MYSQL_SINK_INSERT_DAY_SQL)); + mysqlSinkOfDay = new JdbcSink(dataQueue, sinkConfig); + mysqlSinkOfDay.start(); + } + + /** + * Aggregate data from clickhouse data source and store the aggregated data in the target mysql table. + * The default audit data cycle is 5 minutes,and stored in a temporary table. 
+ */ + private void clickhouseToMysql() { + DataQueue dataQueue = new DataQueue(queueSize); + + clickhouseSource = new JdbcSource(dataQueue, buildClickhouseSourceConfig()); + clickhouseSource.start(); + + SinkConfig sinkConfig = buildMysqlSinkConfig(Configuration.getInstance().get(KEY_MYSQL_SINK_INSERT_TEMP_SQL, + DEFAULT_MYSQL_SINK_INSERT_TEMP_SQL)); + mysqlSinkOfTemp = new JdbcSink(dataQueue, sinkConfig); + mysqlSinkOfTemp.start(); + } + + /** + * Build the configurations of mysql sink. + * + * @param insertSql + * @return + */ + private SinkConfig buildMysqlSinkConfig(String insertSql) { + String driver = Configuration.getInstance().get(KEY_MYSQL_DRIVER, KEY_DEFAULT_MYSQL_DRIVER); + String jdbcUrl = Configuration.getInstance().get(KEY_MYSQL_JDBC_URL); + String userName = Configuration.getInstance().get(KEY_MYSQL_USERNAME); + String passWord = Configuration.getInstance().get(KEY_MYSQL_PASSWORD); + assert (Objects.nonNull(driver) + && Objects.nonNull(jdbcUrl) + && Objects.nonNull(userName) + && Objects.nonNull(passWord)); + + return new SinkConfig( + insertSql, + driver, + jdbcUrl, + userName, + passWord); + } + + /** + * Build the configurations of mysql source. 
+ * + * @return + */ + private SourceConfig buildMysqlSourceConfig() { + String driver = Configuration.getInstance().get(KEY_MYSQL_DRIVER, KEY_DEFAULT_MYSQL_DRIVER); + String jdbcUrl = Configuration.getInstance().get(KEY_MYSQL_JDBC_URL); + String userName = Configuration.getInstance().get(KEY_MYSQL_USERNAME); + String passWord = Configuration.getInstance().get(KEY_MYSQL_PASSWORD); + assert (Objects.nonNull(driver) + && Objects.nonNull(jdbcUrl) + && Objects.nonNull(userName)); + + return new SourceConfig(AuditCycle.DAY, + Configuration.getInstance().get(KEY_MYSQL_SOURCE_QUERY_TEMP_SQL, + DEFAULT_MYSQL_SOURCE_QUERY_TEMP_SQL), + Configuration.getInstance().get(KEY_DAILY_SUMMARY_STAT_BACK_TIMES, + DEFAULT_DAILY_SUMMARY_STAT_BACK_TIMES), + driver, + jdbcUrl, + userName, + passWord); + } + + /** + * Build the configurations of clickhouse source. + * + * @return + */ + private SourceConfig buildClickhouseSourceConfig() { + String driver = Configuration.getInstance().get(KEY_CLICKHOUSE_DRIVER, DEFAULT_CLICKHOUSE_DRIVER); + String jdbcUrl = Configuration.getInstance().get(KEY_CLICKHOUSE_JDBC_URL); + String userName = Configuration.getInstance().get(KEY_CLICKHOUSE_USERNAME); + String passWord = Configuration.getInstance().get(KEY_CLICKHOUSE_PASSWORD); + assert (Objects.nonNull(driver) + && Objects.nonNull(jdbcUrl) + && Objects.nonNull(userName) + && Objects.nonNull(passWord)); + + return new SourceConfig(AuditCycle.MINUTE_5, + Configuration.getInstance().get(KEY_CLICKHOUSE_SOURCE_QUERY_SQL, + DEFAULT_CLICKHOUSE_SOURCE_QUERY_SQL), + Configuration.getInstance().get(KEY_REALTIME_SUMMARY_STAT_BACK_TIMES, + DEFAULT_REALTIME_SUMMARY_STAT_BACK_TIMES), + driver, + jdbcUrl, + userName, + passWord); + } + + /** + * Stop the etl service,and destroy related resources. 
+ */ + public void stop() { + mysqlSourceOfTemp.destroy(); + mysqlSinkOfDay.destroy(); + + clickhouseSource.destroy(); + mysqlSinkOfTemp.destroy(); + } +} diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java new file mode 100644 index 0000000000..99a6709826 --- /dev/null +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.audit.sink; + +import org.apache.inlong.audit.channel.DataQueue; +import org.apache.inlong.audit.config.Configuration; +import org.apache.inlong.audit.entities.SinkConfig; +import org.apache.inlong.audit.entities.StatData; + +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.sql.DataSource; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.apache.inlong.audit.config.ConfigConstants.CACHE_PREP_STMTS; +import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CACHE_PREP_STMTS; +import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CONNECTION_TIMEOUT; +import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_POOL_SIZE; +import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SIZE; +import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SQL_LIMIT; +import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_QUEUE_PULL_TIMEOUT; +import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SOURCE_DB_SINK_BATCH; +import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SOURCE_DB_SINK_INTERVAL; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_CACHE_PREP_STMTS; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_CONNECTION_TIMEOUT; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_POOL_SIZE; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SIZE; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SQL_LIMIT; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_QUEUE_PULL_TIMEOUT; +import 
static org.apache.inlong.audit.config.ConfigConstants.KEY_SOURCE_DB_SINK_BATCH; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_SOURCE_DB_SINK_INTERVAL; +import static org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SIZE; +import static org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT; + +/** + * Jdbc sink + */ +public class JdbcSink implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcSink.class); + private final ScheduledExecutorService sinkTimer = Executors.newSingleThreadScheduledExecutor(); + private final DataQueue dataQueue; + private final int insertBatch; + private final int pullTimeOut; + private final SinkConfig sinkConfig; + private DataSource dataSource; + + public JdbcSink(DataQueue dataQueue, SinkConfig sinkConfig) { + this.dataQueue = dataQueue; + this.sinkConfig = sinkConfig; + + insertBatch = Configuration.getInstance().get(KEY_SOURCE_DB_SINK_BATCH, + DEFAULT_SOURCE_DB_SINK_BATCH); + + pullTimeOut = Configuration.getInstance().get(KEY_QUEUE_PULL_TIMEOUT, + DEFAULT_QUEUE_PULL_TIMEOUT); + } + + /** + * start + */ + public void start() { + createDataSource(); + + sinkTimer.scheduleWithFixedDelay(this::process, + 0, + Configuration.getInstance().get(KEY_SOURCE_DB_SINK_INTERVAL, + DEFAULT_SOURCE_DB_SINK_INTERVAL), + TimeUnit.MILLISECONDS); + } + + /** + * Process + */ + private void process() { + try (Connection connection = dataSource.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(sinkConfig.getInsertSql())) { + if (connection.isClosed()) { + createDataSource(); + } + int counter = 0; + StatData data = dataQueue.pull(pullTimeOut, TimeUnit.MICROSECONDS); + while (data != null) { + preparedStatement.setString(1, data.getLogTs()); + preparedStatement.setString(2, data.getInlongGroupId()); + preparedStatement.setString(3, data.getInlongStreamId()); + preparedStatement.setString(4, data.getAuditId()); + 
preparedStatement.setString(5, data.getAuditTag()); + preparedStatement.setLong(6, data.getCount()); + preparedStatement.setLong(7, data.getSize()); + preparedStatement.setLong(8, data.getDelay()); + preparedStatement.addBatch(); + + if (++counter >= insertBatch) { + preparedStatement.executeBatch(); + preparedStatement.clearBatch(); + counter = 0; + } + data = dataQueue.pull(pullTimeOut, TimeUnit.MICROSECONDS); + } + if (counter > 0) { + preparedStatement.executeBatch(); + preparedStatement.clearBatch(); + } + } catch (Exception e) { + LOG.error("Process exception! {}", e.getMessage()); + } + } + + /** + * Create data source + */ + protected void createDataSource() { + HikariConfig config = new HikariConfig(); + config.setDriverClassName(sinkConfig.getDriverClassName()); + config.setJdbcUrl(sinkConfig.getJdbcUrl()); + config.setUsername(sinkConfig.getUsername()); + config.setPassword(sinkConfig.getPassword()); + config.setConnectionTimeout(Configuration.getInstance().get(KEY_DATASOURCE_CONNECTION_TIMEOUT, + DEFAULT_CONNECTION_TIMEOUT)); + config.addDataSourceProperty(CACHE_PREP_STMTS, + Configuration.getInstance().get(KEY_CACHE_PREP_STMTS, DEFAULT_CACHE_PREP_STMTS)); + config.addDataSourceProperty(PREP_STMT_CACHE_SIZE, + Configuration.getInstance().get(KEY_PREP_STMT_CACHE_SIZE, DEFAULT_PREP_STMT_CACHE_SIZE)); + config.addDataSourceProperty(PREP_STMT_CACHE_SQL_LIMIT, + Configuration.getInstance().get(KEY_PREP_STMT_CACHE_SQL_LIMIT, DEFAULT_PREP_STMT_CACHE_SQL_LIMIT)); + config.setMaximumPoolSize( + Configuration.getInstance().get(KEY_DATASOURCE_POOL_SIZE, + DEFAULT_DATASOURCE_POOL_SIZE)); + dataSource = new HikariDataSource(config); + } + + public void destroy() { + sinkTimer.shutdown(); + } + + @Override + public void close() throws Exception { + + } +}