This is an automated email from the ASF dual-hosted git repository. wenweihuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 80ec84e9c7 [INLONG-9957][Audit] Audit-service add local cache for openapi (#9958) 80ec84e9c7 is described below commit 80ec84e9c7c6230bae6ff33ebeba0da26fa7bdb3 Author: doleyzi <43397300+dole...@users.noreply.github.com> AuthorDate: Wed Apr 10 19:56:52 2024 +0800 [INLONG-9957][Audit] Audit-service add local cache for openapi (#9958) * Audit-service add local cache for openapi to improve the concurrency of service * Audit-service add local cache for openapi to improve the concurrency of service --- .../apache/inlong/audit/cache/AbstractCache.java | 102 ++++++++++++++ .../{sink/JdbcSink.java => cache/DayCache.java} | 156 +++++++++++---------- .../apache/inlong/audit/cache/HalfHourCache.java | 44 ++++++ .../org/apache/inlong/audit/cache/HourCache.java | 42 ++++++ .../apache/inlong/audit/cache/TenMinutesCache.java | 43 ++++++ .../inlong/audit/config/ConfigConstants.java | 8 ++ .../apache/inlong/audit/config/SqlConstants.java | 5 + .../apache/inlong/audit/service/EtlService.java | 77 +++++++++- .../org/apache/inlong/audit/sink/CacheSink.java | 90 ++++++++++++ .../org/apache/inlong/audit/sink/JdbcSink.java | 4 +- .../org/apache/inlong/audit/source/JdbcSource.java | 2 +- .../org/apache/inlong/audit/utils/CacheUtils.java | 35 +++++ inlong-audit/pom.xml | 5 + 13 files changed, 531 insertions(+), 82 deletions(-) diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java new file mode 100644 index 0000000000..a492a9269b --- /dev/null +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.cache; + +import org.apache.inlong.audit.config.Configuration; +import org.apache.inlong.audit.entities.AuditCycle; +import org.apache.inlong.audit.entities.StatData; +import org.apache.inlong.audit.source.JdbcSource; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_API_CACHE_EXPIRED_HOURS; +import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_API_CACHE_MAX_SIZE; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_API_CACHE_EXPIRED_HOURS; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_API_CACHE_MAX_SIZE; + +/** + * Abstract cache. + */ +public class AbstractCache { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcSource.class); + protected final Cache<String, StatData> cache; + protected final ScheduledExecutorService monitorTimer = Executors.newSingleThreadScheduledExecutor(); + protected AuditCycle auditCycle; + private static final int DEFAULT_MONITOR_INTERVAL = 1; + + protected AbstractCache(AuditCycle auditCycle) { + cache = Caffeine.newBuilder() + .maximumSize(Configuration.getInstance().get(KEY_API_CACHE_MAX_SIZE, + DEFAULT_API_CACHE_MAX_SIZE)) + .expireAfterWrite(Configuration.getInstance().get(KEY_API_CACHE_EXPIRED_HOURS, + DEFAULT_API_CACHE_EXPIRED_HOURS), TimeUnit.HOURS) + .build(); + this.auditCycle = auditCycle; + monitorTimer.scheduleWithFixedDelay(new Runnable() { + + @Override + public void run() { + monitor(); + } + }, 0, DEFAULT_MONITOR_INTERVAL, TimeUnit.MINUTES); + } + + /** + * Get cache + * + * @return + */ + public Cache<String, StatData> getCache() { + return cache; + } + + /** + * Get data + * + * @param key + * @return + */ + public List<StatData> getData(String key) { + return Arrays.asList(cache.getIfPresent(key)); + } + + /** + * Destroy + */ + public void destroy() { + cache.cleanUp(); + monitorTimer.shutdown(); + } + + /** + * Monitor + */ + private void monitor() { + LOG.info("{} api local cache size={}", auditCycle, cache.estimatedSize()); + } +} diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/DayCache.java similarity index 51% copy from inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java copy to inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/DayCache.java index 99a6709826..cd8ceda479 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/DayCache.java @@ -15,11 +15,9 @@ * limitations under the License. */ -package org.apache.inlong.audit.sink; +package org.apache.inlong.audit.cache; -import org.apache.inlong.audit.channel.DataQueue; import org.apache.inlong.audit.config.Configuration; -import org.apache.inlong.audit.entities.SinkConfig; import org.apache.inlong.audit.entities.StatData; import com.zaxxer.hikari.HikariConfig; @@ -31,9 +29,11 @@ import javax.sql.DataSource; import java.sql.Connection; import java.sql.PreparedStatement; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; import static org.apache.inlong.audit.config.ConfigConstants.CACHE_PREP_STMTS; import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CACHE_PREP_STMTS; @@ -41,104 +41,116 @@ import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CONNECTION_ import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_POOL_SIZE; import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SIZE; import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SQL_LIMIT; -import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_QUEUE_PULL_TIMEOUT; -import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SOURCE_DB_SINK_BATCH; -import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SOURCE_DB_SINK_INTERVAL; import static org.apache.inlong.audit.config.ConfigConstants.KEY_CACHE_PREP_STMTS; import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_CONNECTION_TIMEOUT; import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_POOL_SIZE; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_DEFAULT_MYSQL_DRIVER; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_DRIVER; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_JDBC_URL; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_PASSWORD; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_USERNAME; import static org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SIZE; import static org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SQL_LIMIT; -import static org.apache.inlong.audit.config.ConfigConstants.KEY_QUEUE_PULL_TIMEOUT; -import static org.apache.inlong.audit.config.ConfigConstants.KEY_SOURCE_DB_SINK_BATCH; -import static org.apache.inlong.audit.config.ConfigConstants.KEY_SOURCE_DB_SINK_INTERVAL; import static org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SIZE; import static org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT; +import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_MYSQL_SOURCE_QUERY_DAY_SQL; +import static org.apache.inlong.audit.config.SqlConstants.KEY_MYSQL_SOURCE_QUERY_DAY_SQL; /** - * Jdbc sink + * Cache Of day ,for day openapi */ -public class JdbcSink implements AutoCloseable { - - private static final Logger LOG = LoggerFactory.getLogger(JdbcSink.class); - private final ScheduledExecutorService sinkTimer = Executors.newSingleThreadScheduledExecutor(); - private final DataQueue dataQueue; - private final int insertBatch; - private final int pullTimeOut; - private final SinkConfig sinkConfig; - private DataSource dataSource; +public class DayCache implements AutoCloseable { - public JdbcSink(DataQueue dataQueue, SinkConfig sinkConfig) { - this.dataQueue = dataQueue; - this.sinkConfig = sinkConfig; + private static final Logger LOG = LoggerFactory.getLogger(DayCache.class); + private static volatile DayCache dayCache = null; + private DataSource dataSource; - insertBatch = Configuration.getInstance().get(KEY_SOURCE_DB_SINK_BATCH, - DEFAULT_SOURCE_DB_SINK_BATCH); + private final String querySql; - pullTimeOut = Configuration.getInstance().get(KEY_QUEUE_PULL_TIMEOUT, - DEFAULT_QUEUE_PULL_TIMEOUT); + private DayCache() { + createDataSource(); + querySql = Configuration.getInstance().get(KEY_MYSQL_SOURCE_QUERY_DAY_SQL, + DEFAULT_MYSQL_SOURCE_QUERY_DAY_SQL); } /** - * start + * Get instance + * @return */ - public void start() { - createDataSource(); - - sinkTimer.scheduleWithFixedDelay(this::process, - 0, - Configuration.getInstance().get(KEY_SOURCE_DB_SINK_INTERVAL, - DEFAULT_SOURCE_DB_SINK_INTERVAL), - TimeUnit.MILLISECONDS); + public static DayCache getInstance() { + if (dayCache == null) { + synchronized (Configuration.class) { + if (dayCache == null) { + dayCache = new DayCache(); + } + } + } + return dayCache; } /** - * Process + * Get data + * @param startTime + * @param endTime + * @param inlongGroupId + * @param inlongStreamId + * @param auditId + * @param auditTag + * @return */ - private void process() { + public List<StatData> getData(String startTime, String endTime, String inlongGroupId, + String inlongStreamId, String auditId, String auditTag) { + List<StatData> result = new LinkedList<>(); try (Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sinkConfig.getInsertSql())) { + PreparedStatement pstat = connection.prepareStatement(querySql)) { if (connection.isClosed()) { createDataSource(); } - int counter = 0; - StatData data = dataQueue.pull(pullTimeOut, TimeUnit.MICROSECONDS); - while (data != null) { - preparedStatement.setString(1, data.getLogTs()); - preparedStatement.setString(2, data.getInlongGroupId()); - preparedStatement.setString(3, data.getInlongStreamId()); - preparedStatement.setString(4, data.getAuditId()); - preparedStatement.setString(5, data.getAuditTag()); - preparedStatement.setLong(6, data.getCount()); - preparedStatement.setLong(7, data.getSize()); - preparedStatement.setLong(8, data.getDelay()); - preparedStatement.addBatch(); - - if (++counter >= insertBatch) { - preparedStatement.executeBatch(); - preparedStatement.clearBatch(); - counter = 0; + pstat.setString(1, startTime); + pstat.setString(2, endTime); + pstat.setString(3, inlongGroupId); + pstat.setString(4, inlongStreamId); + pstat.setString(5, auditId); + try (ResultSet resultSet = pstat.executeQuery()) { + while (resultSet.next()) { + StatData data = new StatData(); + data.setLogTs(startTime); + data.setInlongGroupId(resultSet.getString(1)); + data.setInlongStreamId(resultSet.getString(2)); + data.setAuditId(resultSet.getString(3)); + data.setAuditTag(resultSet.getString(4)); + data.setCount(resultSet.getLong(5)); + data.setSize(resultSet.getLong(6)); + data.setDelay(resultSet.getLong(7)); + result.add(data); } - data = dataQueue.pull(pullTimeOut, TimeUnit.MICROSECONDS); - } - if (counter > 0) { - preparedStatement.executeBatch(); - preparedStatement.clearBatch(); + } catch (SQLException sqlException) { + LOG.error("Query has SQL exception! ", sqlException); } - } catch (Exception e) { - LOG.error("Process exception! {}", e.getMessage()); + } catch (Exception exception) { + LOG.error("Query has exception! ", exception); } + return result; } /** * Create data source */ - protected void createDataSource() { + private void createDataSource() { + String driver = Configuration.getInstance().get(KEY_MYSQL_DRIVER, KEY_DEFAULT_MYSQL_DRIVER); + String jdbcUrl = Configuration.getInstance().get(KEY_MYSQL_JDBC_URL); + String userName = Configuration.getInstance().get(KEY_MYSQL_USERNAME); + String passWord = Configuration.getInstance().get(KEY_MYSQL_PASSWORD); + assert (Objects.nonNull(driver) + && Objects.nonNull(jdbcUrl) + && Objects.nonNull(userName) + && Objects.nonNull(passWord)); + HikariConfig config = new HikariConfig(); - config.setDriverClassName(sinkConfig.getDriverClassName()); - config.setJdbcUrl(sinkConfig.getJdbcUrl()); - config.setUsername(sinkConfig.getUsername()); - config.setPassword(sinkConfig.getPassword()); + config.setDriverClassName(driver); + config.setJdbcUrl(jdbcUrl); + config.setUsername(userName); + config.setPassword(passWord); config.setConnectionTimeout(Configuration.getInstance().get(KEY_DATASOURCE_CONNECTION_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT)); config.addDataSourceProperty(CACHE_PREP_STMTS, @@ -153,10 +165,6 @@ public class JdbcSink implements AutoCloseable { dataSource = new HikariDataSource(config); } - public void destroy() { - sinkTimer.shutdown(); - } - @Override public void close() throws Exception { diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/HalfHourCache.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/HalfHourCache.java new file mode 100644 index 0000000000..96138f253c --- /dev/null +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/HalfHourCache.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.cache; + +import org.apache.inlong.audit.config.Configuration; +import org.apache.inlong.audit.entities.AuditCycle; + +/** + * Cache Of minute 30 ,for minute 30 openapi + */ +public class HalfHourCache extends AbstractCache { + + private static volatile HalfHourCache halfHourCache = null; + + private HalfHourCache() { + super(AuditCycle.MINUTE_30); + } + + public static HalfHourCache getInstance() { + if (halfHourCache == null) { + synchronized (Configuration.class) { + if (halfHourCache == null) { + halfHourCache = new HalfHourCache(); + } + } + } + return halfHourCache; + } +} diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/HourCache.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/HourCache.java new file mode 100644 index 0000000000..aea4586bf6 --- /dev/null +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/HourCache.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.cache; + +import org.apache.inlong.audit.config.Configuration; +import org.apache.inlong.audit.entities.AuditCycle; + +/** + * Cache Of hour ,for hour openapi + */ +public class HourCache extends AbstractCache { + + private static volatile HourCache hourCache = null; + private HourCache() { + super(AuditCycle.HOUR); + } + public static HourCache getInstance() { + if (hourCache == null) { + synchronized (Configuration.class) { + if (hourCache == null) { + hourCache = new HourCache(); + } + } + } + return hourCache; + } +} diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/TenMinutesCache.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/TenMinutesCache.java new file mode 100644 index 0000000000..00ecb2df54 --- /dev/null +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/TenMinutesCache.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.cache; + +import org.apache.inlong.audit.config.Configuration; +import org.apache.inlong.audit.entities.AuditCycle; + +/** + * Cache Of minute 10 ,for minute 10 openapi + */ +public class TenMinutesCache extends AbstractCache { + + private static volatile TenMinutesCache tenMinutesCache = null; + + private TenMinutesCache() { + super(AuditCycle.MINUTE_10); + } + public static TenMinutesCache getInstance() { + if (tenMinutesCache == null) { + synchronized (Configuration.class) { + if (tenMinutesCache == null) { + tenMinutesCache = new TenMinutesCache(); + } + } + } + return tenMinutesCache; + } +} diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java index ecf64338e7..64797df982 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java @@ -123,4 +123,12 @@ public class ConfigConstants { public static final int MAX_INIT_COUNT = 2; public static final int RANDOM_BOUND = 10; + + // Cache config + public static final String KEY_API_CACHE_MAX_SIZE = "api.cache.max.size"; + public static final int DEFAULT_API_CACHE_MAX_SIZE = 50000000; + + public static final String KEY_API_CACHE_EXPIRED_HOURS = "api.cache.expired.hours"; + public static final int DEFAULT_API_CACHE_EXPIRED_HOURS = 12; + } diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java index 1eee2f7ec9..31d73aad81 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java @@ -77,6 +77,11 @@ public class SqlConstants { "AND audit_id = ? \n" + "GROUP BY inlong_group_id, inlong_stream_id, audit_id, audit_tag"; + public static final String KEY_MYSQL_SOURCE_QUERY_DAY_SQL = "mysql.query.day.sql"; + public static final String DEFAULT_MYSQL_SOURCE_QUERY_DAY_SQL = + "select log_ts,inlong_group_id,inlong_stream_id,audit_id,audit_tag,count,size,delay " + + "from audit_data_day where log_ts between ? and ? and inlong_group_id=? and inlong_stream_id=? and audit_id =? "; + // Mysql insert sql public static final String KEY_MYSQL_SINK_INSERT_DAY_SQL = "mysql.sink.insert.day.sql"; public static final String DEFAULT_MYSQL_SINK_INSERT_DAY_SQL = diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java index dada6cb7f2..0574ac3771 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java @@ -17,14 +17,21 @@ package org.apache.inlong.audit.service; +import org.apache.inlong.audit.cache.HalfHourCache; +import org.apache.inlong.audit.cache.HourCache; +import org.apache.inlong.audit.cache.TenMinutesCache; import org.apache.inlong.audit.channel.DataQueue; import org.apache.inlong.audit.config.Configuration; import org.apache.inlong.audit.entities.AuditCycle; import org.apache.inlong.audit.entities.SinkConfig; import org.apache.inlong.audit.entities.SourceConfig; +import org.apache.inlong.audit.sink.CacheSink; import org.apache.inlong.audit.sink.JdbcSink; import org.apache.inlong.audit.source.JdbcSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Objects; import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CLICKHOUSE_DRIVER; @@ -57,15 +64,25 @@ import static org.apache.inlong.audit.config.SqlConstants.KEY_MYSQL_SOURCE_QUERY */ public class EtlService { + private static final Logger LOG = LoggerFactory.getLogger(JdbcSource.class); private JdbcSource mysqlSourceOfTemp; + private JdbcSource mysqlSourceOfTenMinutesCache; + private JdbcSource mysqlSourceOfHalfHourCache; + private JdbcSource mysqlSourceOfHourCache; private JdbcSink mysqlSinkOfDay; private JdbcSource clickhouseSource; private JdbcSink mysqlSinkOfTemp; + private CacheSink cacheSinkOfTenMinutesCache; + private CacheSink cacheSinkOfHalfHourCache; + private CacheSink cacheSinkOfHourCache; private final int queueSize; + private final int statBackTimes; public EtlService() { queueSize = Configuration.getInstance().get(KEY_DATA_QUEUE_SIZE, DEFAULT_DATA_QUEUE_SIZE); + statBackTimes = Configuration.getInstance().get(KEY_REALTIME_SUMMARY_STAT_BACK_TIMES, + DEFAULT_REALTIME_SUMMARY_STAT_BACK_TIMES); } /** @@ -74,6 +91,9 @@ public class EtlService { public void start() { clickhouseToMysql(); mysqlToMysqlOfDay(); + mysqlToTenMinutesCache(); + mysqlToHalfHourCache(); + mysqlToHourCache(); } /** @@ -83,7 +103,9 @@ public class EtlService { private void mysqlToMysqlOfDay() { DataQueue dataQueue = new DataQueue(queueSize); - mysqlSourceOfTemp = new JdbcSource(dataQueue, buildMysqlSourceConfig()); + mysqlSourceOfTemp = new JdbcSource(dataQueue, buildMysqlSourceConfig(AuditCycle.DAY, + Configuration.getInstance().get(KEY_DAILY_SUMMARY_STAT_BACK_TIMES, + DEFAULT_DAILY_SUMMARY_STAT_BACK_TIMES))); mysqlSourceOfTemp.start(); SinkConfig sinkConfig = buildMysqlSinkConfig(Configuration.getInstance().get(KEY_MYSQL_SINK_INSERT_DAY_SQL, @@ -92,6 +114,44 @@ public class EtlService { mysqlSinkOfDay.start(); } + /** + * Aggregate data from mysql data source and store in local cache for openapi. + */ + private void mysqlToTenMinutesCache() { + DataQueue dataQueue = new DataQueue(queueSize); + mysqlSourceOfTenMinutesCache = + new JdbcSource(dataQueue, buildMysqlSourceConfig(AuditCycle.MINUTE_10, statBackTimes)); + mysqlSourceOfTenMinutesCache.start(); + + cacheSinkOfTenMinutesCache = new CacheSink(dataQueue, TenMinutesCache.getInstance().getCache()); + cacheSinkOfTenMinutesCache.start(); + } + + /** + * Aggregate data from mysql data source and store in local cache for openapi. + */ + private void mysqlToHalfHourCache() { + DataQueue dataQueue = new DataQueue(queueSize); + mysqlSourceOfHalfHourCache = + new JdbcSource(dataQueue, buildMysqlSourceConfig(AuditCycle.MINUTE_30, statBackTimes)); + mysqlSourceOfHalfHourCache.start(); + + cacheSinkOfHalfHourCache = new CacheSink(dataQueue, HalfHourCache.getInstance().getCache()); + cacheSinkOfHalfHourCache.start(); + } + + /** + * Aggregate data from mysql data source and store in local cache for openapi. + */ + private void mysqlToHourCache() { + DataQueue dataQueue = new DataQueue(queueSize); + mysqlSourceOfHourCache = new JdbcSource(dataQueue, buildMysqlSourceConfig(AuditCycle.HOUR, statBackTimes)); + mysqlSourceOfHourCache.start(); + + cacheSinkOfHourCache = new CacheSink(dataQueue, HourCache.getInstance().getCache()); + cacheSinkOfHourCache.start(); + } + /** * Aggregate data from clickhouse data source and store the aggregated data in the target mysql table. * The default audit data cycle is 5 minutes,and stored in a temporary table. @@ -137,7 +197,7 @@ public class EtlService { * * @return */ - private SourceConfig buildMysqlSourceConfig() { + private SourceConfig buildMysqlSourceConfig(AuditCycle auditCycle, int statBackTimes) { String driver = Configuration.getInstance().get(KEY_MYSQL_DRIVER, KEY_DEFAULT_MYSQL_DRIVER); String jdbcUrl = Configuration.getInstance().get(KEY_MYSQL_JDBC_URL); String userName = Configuration.getInstance().get(KEY_MYSQL_USERNAME); @@ -146,11 +206,10 @@ public class EtlService { && Objects.nonNull(jdbcUrl) && Objects.nonNull(userName)); - return new SourceConfig(AuditCycle.DAY, + return new SourceConfig(auditCycle, Configuration.getInstance().get(KEY_MYSQL_SOURCE_QUERY_TEMP_SQL, DEFAULT_MYSQL_SOURCE_QUERY_TEMP_SQL), - Configuration.getInstance().get(KEY_DAILY_SUMMARY_STAT_BACK_TIMES, - DEFAULT_DAILY_SUMMARY_STAT_BACK_TIMES), + statBackTimes, driver, jdbcUrl, userName, @@ -192,5 +251,13 @@ public class EtlService { clickhouseSource.destroy(); mysqlSinkOfTemp.destroy(); + + mysqlSourceOfTenMinutesCache.destroy(); + mysqlSourceOfHalfHourCache.destroy(); + mysqlSourceOfHourCache.destroy(); + + cacheSinkOfTenMinutesCache.destroy(); + cacheSinkOfHalfHourCache.destroy(); + cacheSinkOfHourCache.destroy(); } } diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/CacheSink.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/CacheSink.java new file mode 100644 index 0000000000..8077817b1d --- /dev/null +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/CacheSink.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.sink; + +import org.apache.inlong.audit.channel.DataQueue; +import org.apache.inlong.audit.config.Configuration; +import org.apache.inlong.audit.entities.StatData; +import org.apache.inlong.audit.utils.CacheUtils; + +import com.github.benmanes.caffeine.cache.Cache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_QUEUE_PULL_TIMEOUT; +import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SOURCE_DB_SINK_INTERVAL; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_QUEUE_PULL_TIMEOUT; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_SOURCE_DB_SINK_INTERVAL; + +/** + * Cache sink + */ +public class CacheSink { + + private static final Logger LOG = LoggerFactory.getLogger(CacheSink.class); + private final ScheduledExecutorService sinkTimer = Executors.newSingleThreadScheduledExecutor(); + private final DataQueue dataQueue; + private final Cache<String, StatData> cache; + private final int pullTimeOut; + + public CacheSink(DataQueue dataQueue, Cache<String, StatData> cache) { + this.dataQueue = dataQueue; + this.cache = cache; + pullTimeOut = Configuration.getInstance().get(KEY_QUEUE_PULL_TIMEOUT, + DEFAULT_QUEUE_PULL_TIMEOUT); + } + + /** + * start + */ + public void start() { + sinkTimer.scheduleWithFixedDelay(this::process, + 0, + Configuration.getInstance().get(KEY_SOURCE_DB_SINK_INTERVAL, + DEFAULT_SOURCE_DB_SINK_INTERVAL), + TimeUnit.MILLISECONDS); + } + + /** + * Process + */ + private void process() { + try { + StatData data = dataQueue.pull(pullTimeOut, TimeUnit.MILLISECONDS); + while (data != null) { + String cacheKey = CacheUtils.buildCacheKey(data.getLogTs(), data.getInlongGroupId(), + data.getInlongStreamId(), data.getAuditId(), data.getAuditTag()); + cache.put(cacheKey, data); + data = dataQueue.pull(pullTimeOut, TimeUnit.MILLISECONDS); + } + } catch (Exception exception) { + LOG.error("Process exception! ", exception); + } + } + + /** + * Destroy + */ + public void destroy() { + sinkTimer.shutdown(); + } +} diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java index 99a6709826..451cbdc50a 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java @@ -102,7 +102,7 @@ public class JdbcSink implements AutoCloseable { createDataSource(); } int counter = 0; - StatData data = dataQueue.pull(pullTimeOut, TimeUnit.MICROSECONDS); + StatData data = dataQueue.pull(pullTimeOut, TimeUnit.MILLISECONDS); while (data != null) { preparedStatement.setString(1, data.getLogTs()); preparedStatement.setString(2, data.getInlongGroupId()); @@ -119,7 +119,7 @@ public class JdbcSink implements AutoCloseable { preparedStatement.clearBatch(); counter = 0; } - data = dataQueue.pull(pullTimeOut, TimeUnit.MICROSECONDS); + data = dataQueue.pull(pullTimeOut, TimeUnit.MILLISECONDS); } if (counter > 0) { preparedStatement.executeBatch(); diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java index accd22ebbc..e4e8e1dd0d 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java @@ -110,7 +110,7 @@ public class JdbcSource { ScheduledExecutorService timer = statTimers.computeIfAbsent(statBackTime, k -> Executors.newSingleThreadScheduledExecutor()); timer.scheduleWithFixedDelay(new StatServer(statBackTime), - statBackTime, + 0, statInterval, TimeUnit.MINUTES); } } diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/utils/CacheUtils.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/utils/CacheUtils.java new file mode 100644 index 0000000000..4c9e8ff72a --- /dev/null +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/utils/CacheUtils.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.utils; + +/** + * Cache utils + */ +public class CacheUtils { + + public static String buildCacheKey(String logTs, String inlongGroupId, String inlongStreamId, + String auditId, String auditTag) { + return new StringBuilder() + .append(logTs) + .append(inlongGroupId) + .append(inlongStreamId) + .append(auditId) + .append(auditTag) + .toString(); + } +} diff --git a/inlong-audit/pom.xml b/inlong-audit/pom.xml index 54ebd8fcc2..fa98312e8b 100644 --- a/inlong-audit/pom.xml +++ b/inlong-audit/pom.xml @@ -90,6 +90,11 @@ <version>${HikariCP.version}</version> <scope>compile</scope> </dependency> + <dependency> + <groupId>com.github.ben-manes.caffeine</groupId> + <artifactId>caffeine</artifactId> + <version>${caffeine.version}</version> + </dependency> </dependencies> </project>