This is an automated email from the ASF dual-hosted git repository.

wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 80ec84e9c7 [INLONG-9957][Audit] Audit-service add local cache for 
openapi (#9958)
80ec84e9c7 is described below

commit 80ec84e9c7c6230bae6ff33ebeba0da26fa7bdb3
Author: doleyzi <43397300+dole...@users.noreply.github.com>
AuthorDate: Wed Apr 10 19:56:52 2024 +0800

    [INLONG-9957][Audit] Audit-service add local cache for openapi (#9958)
    
    * Audit-service add local cache for openapi to improve the concurrency of 
service
    
    * Audit-service add local cache for openapi to improve the concurrency of 
service
---
 .../apache/inlong/audit/cache/AbstractCache.java   | 102 ++++++++++++++
 .../{sink/JdbcSink.java => cache/DayCache.java}    | 156 +++++++++++----------
 .../apache/inlong/audit/cache/HalfHourCache.java   |  44 ++++++
 .../org/apache/inlong/audit/cache/HourCache.java   |  42 ++++++
 .../apache/inlong/audit/cache/TenMinutesCache.java |  43 ++++++
 .../inlong/audit/config/ConfigConstants.java       |   8 ++
 .../apache/inlong/audit/config/SqlConstants.java   |   5 +
 .../apache/inlong/audit/service/EtlService.java    |  77 +++++++++-
 .../org/apache/inlong/audit/sink/CacheSink.java    |  90 ++++++++++++
 .../org/apache/inlong/audit/sink/JdbcSink.java     |   4 +-
 .../org/apache/inlong/audit/source/JdbcSource.java |   2 +-
 .../org/apache/inlong/audit/utils/CacheUtils.java  |  35 +++++
 inlong-audit/pom.xml                               |   5 +
 13 files changed, 531 insertions(+), 82 deletions(-)

diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java
new file mode 100644
index 0000000000..a492a9269b
--- /dev/null
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.cache;
+
+import org.apache.inlong.audit.config.Configuration;
+import org.apache.inlong.audit.entities.AuditCycle;
+import org.apache.inlong.audit.entities.StatData;
+import org.apache.inlong.audit.source.JdbcSource;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_API_CACHE_EXPIRED_HOURS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_API_CACHE_MAX_SIZE;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_API_CACHE_EXPIRED_HOURS;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_API_CACHE_MAX_SIZE;
+
+/**
+ * Base class of the local caches that serve the audit openapi.
+ *
+ * <p>Each instance owns one Caffeine cache that maps a {@code String} key to a {@link StatData}.
+ * The maximum size and the expire-after-write period (in hours) are read from
+ * {@link Configuration}. A single-threaded scheduler logs the estimated cache size at a fixed
+ * delay of {@code DEFAULT_MONITOR_INTERVAL} minutes until {@link #destroy()} is called.
+ */
+public class AbstractCache {
+
+    // Bound to this class so that cache log lines are attributed to the cache, not to JdbcSource.
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractCache.class);
+    private static final int DEFAULT_MONITOR_INTERVAL = 1;
+
+    protected final Cache<String, StatData> cache;
+    protected final ScheduledExecutorService monitorTimer = Executors.newSingleThreadScheduledExecutor();
+    protected AuditCycle auditCycle;
+
+    /**
+     * Builds the cache from the configured limits and starts the size monitor.
+     *
+     * @param auditCycle audit cycle this cache is reported under in the monitor log
+     */
+    protected AbstractCache(AuditCycle auditCycle) {
+        cache = Caffeine.newBuilder()
+                .maximumSize(Configuration.getInstance().get(KEY_API_CACHE_MAX_SIZE,
+                        DEFAULT_API_CACHE_MAX_SIZE))
+                .expireAfterWrite(Configuration.getInstance().get(KEY_API_CACHE_EXPIRED_HOURS,
+                        DEFAULT_API_CACHE_EXPIRED_HOURS), TimeUnit.HOURS)
+                .build();
+        this.auditCycle = auditCycle;
+        // Scheduled last, so the monitor task only ever runs against fully assigned fields.
+        monitorTimer.scheduleWithFixedDelay(this::monitor, 0, DEFAULT_MONITOR_INTERVAL, TimeUnit.MINUTES);
+    }
+
+    /**
+     * Get cache
+     *
+     * @return the underlying Caffeine cache; the same instance on every call
+     */
+    public Cache<String, StatData> getCache() {
+        return cache;
+    }
+
+    /**
+     * Get data
+     *
+     * @param key cache key to look up
+     * @return a one-element list holding the cached value, or an empty list when the key is absent
+     */
+    public List<StatData> getData(String key) {
+        StatData data = cache.getIfPresent(key);
+        if (data == null) {
+            // A miss must not surface as a list that contains a single null element.
+            return java.util.Collections.emptyList();
+        }
+        return Arrays.asList(data);
+    }
+
+    /**
+     * Destroy: triggers the cache's clean-up routine and shuts the monitor timer down.
+     */
+    public void destroy() {
+        cache.cleanUp();
+        monitorTimer.shutdown();
+    }
+
+    /**
+     * Monitor: logs the audit cycle together with the estimated number of cached entries.
+     */
+    private void monitor() {
+        LOG.info("{} api local cache size={}", auditCycle, cache.estimatedSize());
+    }
+}
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/DayCache.java
similarity index 51%
copy from 
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
copy to 
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/DayCache.java
index 99a6709826..cd8ceda479 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/DayCache.java
@@ -15,11 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.sink;
+package org.apache.inlong.audit.cache;
 
-import org.apache.inlong.audit.channel.DataQueue;
 import org.apache.inlong.audit.config.Configuration;
-import org.apache.inlong.audit.entities.SinkConfig;
 import org.apache.inlong.audit.entities.StatData;
 
 import com.zaxxer.hikari.HikariConfig;
@@ -31,9 +29,11 @@ import javax.sql.DataSource;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
 
 import static org.apache.inlong.audit.config.ConfigConstants.CACHE_PREP_STMTS;
 import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CACHE_PREP_STMTS;
@@ -41,104 +41,116 @@ import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CONNECTION_
 import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_POOL_SIZE;
 import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SIZE;
 import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SQL_LIMIT;
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_QUEUE_PULL_TIMEOUT;
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SOURCE_DB_SINK_BATCH;
-import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SOURCE_DB_SINK_INTERVAL;
 import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_CACHE_PREP_STMTS;
 import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_CONNECTION_TIMEOUT;
 import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_POOL_SIZE;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_DEFAULT_MYSQL_DRIVER;
+import static org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_DRIVER;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_JDBC_URL;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_PASSWORD;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_USERNAME;
 import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SIZE;
 import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SQL_LIMIT;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_QUEUE_PULL_TIMEOUT;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_SOURCE_DB_SINK_BATCH;
-import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_SOURCE_DB_SINK_INTERVAL;
 import static 
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SIZE;
 import static 
org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT;
+import static 
org.apache.inlong.audit.config.SqlConstants.DEFAULT_MYSQL_SOURCE_QUERY_DAY_SQL;
+import static 
org.apache.inlong.audit.config.SqlConstants.KEY_MYSQL_SOURCE_QUERY_DAY_SQL;
 
 /**
- * Jdbc sink
+ * Cache Of day ,for day openapi
  */
-public class JdbcSink implements AutoCloseable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(JdbcSink.class);
-    private final ScheduledExecutorService sinkTimer = 
Executors.newSingleThreadScheduledExecutor();
-    private final DataQueue dataQueue;
-    private final int insertBatch;
-    private final int pullTimeOut;
-    private final SinkConfig sinkConfig;
-    private DataSource dataSource;
+public class DayCache implements AutoCloseable {
 
-    public JdbcSink(DataQueue dataQueue, SinkConfig sinkConfig) {
-        this.dataQueue = dataQueue;
-        this.sinkConfig = sinkConfig;
+    private static final Logger LOG = LoggerFactory.getLogger(DayCache.class);
+    private static volatile DayCache dayCache = null;
+    private DataSource dataSource;
 
-        insertBatch = Configuration.getInstance().get(KEY_SOURCE_DB_SINK_BATCH,
-                DEFAULT_SOURCE_DB_SINK_BATCH);
+    private final String querySql;
 
-        pullTimeOut = Configuration.getInstance().get(KEY_QUEUE_PULL_TIMEOUT,
-                DEFAULT_QUEUE_PULL_TIMEOUT);
+    private DayCache() {
+        createDataSource();
+        querySql = 
Configuration.getInstance().get(KEY_MYSQL_SOURCE_QUERY_DAY_SQL,
+                DEFAULT_MYSQL_SOURCE_QUERY_DAY_SQL);
     }
 
     /**
-     * start
+     * Get instance
+     * @return
      */
-    public void start() {
-        createDataSource();
-
-        sinkTimer.scheduleWithFixedDelay(this::process,
-                0,
-                Configuration.getInstance().get(KEY_SOURCE_DB_SINK_INTERVAL,
-                        DEFAULT_SOURCE_DB_SINK_INTERVAL),
-                TimeUnit.MILLISECONDS);
+    public static DayCache getInstance() {
+        if (dayCache == null) {
+            // Lock on this class's own monitor so that creating the singleton does not
+            // contend with unrelated code that synchronizes on Configuration.class.
+            synchronized (DayCache.class) {
+                // Re-check under the lock: another thread may have created the instance.
+                if (dayCache == null) {
+                    dayCache = new DayCache();
+                }
+            }
+        }
+        return dayCache;
     }
 
     /**
-     * Process
+     * Get data
+     * @param startTime
+     * @param endTime
+     * @param inlongGroupId
+     * @param inlongStreamId
+     * @param auditId
+     * @param auditTag
+     * @return
      */
-    private void process() {
+    public List<StatData> getData(String startTime, String endTime, String 
inlongGroupId,
+            String inlongStreamId, String auditId, String auditTag) {
+        List<StatData> result = new LinkedList<>();
         try (Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement = 
connection.prepareStatement(sinkConfig.getInsertSql())) {
+                PreparedStatement pstat = 
connection.prepareStatement(querySql)) {
             if (connection.isClosed()) {
                 createDataSource();
             }
-            int counter = 0;
-            StatData data = dataQueue.pull(pullTimeOut, TimeUnit.MICROSECONDS);
-            while (data != null) {
-                preparedStatement.setString(1, data.getLogTs());
-                preparedStatement.setString(2, data.getInlongGroupId());
-                preparedStatement.setString(3, data.getInlongStreamId());
-                preparedStatement.setString(4, data.getAuditId());
-                preparedStatement.setString(5, data.getAuditTag());
-                preparedStatement.setLong(6, data.getCount());
-                preparedStatement.setLong(7, data.getSize());
-                preparedStatement.setLong(8, data.getDelay());
-                preparedStatement.addBatch();
-
-                if (++counter >= insertBatch) {
-                    preparedStatement.executeBatch();
-                    preparedStatement.clearBatch();
-                    counter = 0;
+            pstat.setString(1, startTime);
+            pstat.setString(2, endTime);
+            pstat.setString(3, inlongGroupId);
+            pstat.setString(4, inlongStreamId);
+            pstat.setString(5, auditId);
+            try (ResultSet resultSet = pstat.executeQuery()) {
+                while (resultSet.next()) {
+                    // Column indexes follow the default day query's select list:
+                    // log_ts, inlong_group_id, inlong_stream_id, audit_id, audit_tag, count, size, delay.
+                    StatData data = new StatData();
+                    // Take log_ts from the row: the query covers a time range, so rows may differ.
+                    data.setLogTs(resultSet.getString(1));
+                    data.setInlongGroupId(resultSet.getString(2));
+                    data.setInlongStreamId(resultSet.getString(3));
+                    data.setAuditId(resultSet.getString(4));
+                    data.setAuditTag(resultSet.getString(5));
+                    data.setCount(resultSet.getLong(6));
+                    data.setSize(resultSet.getLong(7));
+                    data.setDelay(resultSet.getLong(8));
+                    result.add(data);
                 }
-                data = dataQueue.pull(pullTimeOut, TimeUnit.MICROSECONDS);
-            }
-            if (counter > 0) {
-                preparedStatement.executeBatch();
-                preparedStatement.clearBatch();
+            } catch (SQLException sqlException) {
+                LOG.error("Query has SQL exception! ", sqlException);
             }
-        } catch (Exception e) {
-            LOG.error("Process exception! {}", e.getMessage());
+        } catch (Exception exception) {
+            LOG.error("Query has exception! ", exception);
         }
+        return result;
     }
 
     /**
      * Create data source
      */
-    protected void createDataSource() {
+    private void createDataSource() {
+        String driver = Configuration.getInstance().get(KEY_MYSQL_DRIVER, 
KEY_DEFAULT_MYSQL_DRIVER);
+        String jdbcUrl = Configuration.getInstance().get(KEY_MYSQL_JDBC_URL);
+        String userName = Configuration.getInstance().get(KEY_MYSQL_USERNAME);
+        String passWord = Configuration.getInstance().get(KEY_MYSQL_PASSWORD);
+        assert (Objects.nonNull(driver)
+                && Objects.nonNull(jdbcUrl)
+                && Objects.nonNull(userName)
+                && Objects.nonNull(passWord));
+
         HikariConfig config = new HikariConfig();
-        config.setDriverClassName(sinkConfig.getDriverClassName());
-        config.setJdbcUrl(sinkConfig.getJdbcUrl());
-        config.setUsername(sinkConfig.getUsername());
-        config.setPassword(sinkConfig.getPassword());
+        config.setDriverClassName(driver);
+        config.setJdbcUrl(jdbcUrl);
+        config.setUsername(userName);
+        config.setPassword(passWord);
         
config.setConnectionTimeout(Configuration.getInstance().get(KEY_DATASOURCE_CONNECTION_TIMEOUT,
                 DEFAULT_CONNECTION_TIMEOUT));
         config.addDataSourceProperty(CACHE_PREP_STMTS,
@@ -153,10 +165,6 @@ public class JdbcSink implements AutoCloseable {
         dataSource = new HikariDataSource(config);
     }
 
-    public void destroy() {
-        sinkTimer.shutdown();
-    }
-
     @Override
     public void close() throws Exception {
 
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/HalfHourCache.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/HalfHourCache.java
new file mode 100644
index 0000000000..96138f253c
--- /dev/null
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/HalfHourCache.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.cache;
+
+import org.apache.inlong.audit.config.Configuration;
+import org.apache.inlong.audit.entities.AuditCycle;
+
+/**
+ * Cache of the {@link AuditCycle#MINUTE_30} cycle, for the 30-minute openapi.
+ *
+ * <p>Exposed as a lazily created singleton; see {@link #getInstance()}.
+ */
+public class HalfHourCache extends AbstractCache {
+
+    private static volatile HalfHourCache halfHourCache = null;
+
+    private HalfHourCache() {
+        super(AuditCycle.MINUTE_30);
+    }
+
+    /**
+     * Get instance
+     *
+     * @return the shared {@code HalfHourCache}, created on first use
+     */
+    public static HalfHourCache getInstance() {
+        if (halfHourCache == null) {
+            // Lock on this class's own monitor so that creating the singleton does not
+            // contend with unrelated code that synchronizes on Configuration.class.
+            synchronized (HalfHourCache.class) {
+                // Re-check under the lock: another thread may have created the instance.
+                if (halfHourCache == null) {
+                    halfHourCache = new HalfHourCache();
+                }
+            }
+        }
+        return halfHourCache;
+    }
+}
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/HourCache.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/HourCache.java
new file mode 100644
index 0000000000..aea4586bf6
--- /dev/null
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/HourCache.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.cache;
+
+import org.apache.inlong.audit.config.Configuration;
+import org.apache.inlong.audit.entities.AuditCycle;
+
+/**
+ * Cache of the {@link AuditCycle#HOUR} cycle, for the hour openapi.
+ *
+ * <p>Exposed as a lazily created singleton; see {@link #getInstance()}.
+ */
+public class HourCache extends AbstractCache {
+
+    private static volatile HourCache hourCache = null;
+
+    private HourCache() {
+        super(AuditCycle.HOUR);
+    }
+
+    /**
+     * Get instance
+     *
+     * @return the shared {@code HourCache}, created on first use
+     */
+    public static HourCache getInstance() {
+        if (hourCache == null) {
+            // Lock on this class's own monitor so that creating the singleton does not
+            // contend with unrelated code that synchronizes on Configuration.class.
+            synchronized (HourCache.class) {
+                // Re-check under the lock: another thread may have created the instance.
+                if (hourCache == null) {
+                    hourCache = new HourCache();
+                }
+            }
+        }
+        return hourCache;
+    }
+}
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/TenMinutesCache.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/TenMinutesCache.java
new file mode 100644
index 0000000000..00ecb2df54
--- /dev/null
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/TenMinutesCache.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.cache;
+
+import org.apache.inlong.audit.config.Configuration;
+import org.apache.inlong.audit.entities.AuditCycle;
+
+/**
+ * Cache of the {@link AuditCycle#MINUTE_10} cycle, for the 10-minute openapi.
+ *
+ * <p>Exposed as a lazily created singleton; see {@link #getInstance()}.
+ */
+public class TenMinutesCache extends AbstractCache {
+
+    private static volatile TenMinutesCache tenMinutesCache = null;
+
+    private TenMinutesCache() {
+        super(AuditCycle.MINUTE_10);
+    }
+
+    /**
+     * Get instance
+     *
+     * @return the shared {@code TenMinutesCache}, created on first use
+     */
+    public static TenMinutesCache getInstance() {
+        if (tenMinutesCache == null) {
+            // Lock on this class's own monitor so that creating the singleton does not
+            // contend with unrelated code that synchronizes on Configuration.class.
+            synchronized (TenMinutesCache.class) {
+                // Re-check under the lock: another thread may have created the instance.
+                if (tenMinutesCache == null) {
+                    tenMinutesCache = new TenMinutesCache();
+                }
+            }
+        }
+        return tenMinutesCache;
+    }
+}
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
index ecf64338e7..64797df982 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
@@ -123,4 +123,12 @@ public class ConfigConstants {
 
     public static final int MAX_INIT_COUNT = 2;
     public static final int RANDOM_BOUND = 10;
+
+    // Cache config
+    public static final String KEY_API_CACHE_MAX_SIZE = "api.cache.max.size";
+    public static final int DEFAULT_API_CACHE_MAX_SIZE = 50000000;
+
+    public static final String KEY_API_CACHE_EXPIRED_HOURS = 
"api.cache.expired.hours";
+    public static final int DEFAULT_API_CACHE_EXPIRED_HOURS = 12;
+
 }
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
index 1eee2f7ec9..31d73aad81 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java
@@ -77,6 +77,11 @@ public class SqlConstants {
                     "AND audit_id = ? \n" +
                     "GROUP BY inlong_group_id, inlong_stream_id, audit_id, 
audit_tag";
 
+    public static final String KEY_MYSQL_SOURCE_QUERY_DAY_SQL = 
"mysql.query.day.sql";
+    public static final String DEFAULT_MYSQL_SOURCE_QUERY_DAY_SQL =
+            "select 
log_ts,inlong_group_id,inlong_stream_id,audit_id,audit_tag,count,size,delay " +
+                    "from audit_data_day where log_ts between ? and ? and 
inlong_group_id=? and inlong_stream_id=? and audit_id =? ";
+
     // Mysql insert sql
     public static final String KEY_MYSQL_SINK_INSERT_DAY_SQL = 
"mysql.sink.insert.day.sql";
     public static final String DEFAULT_MYSQL_SINK_INSERT_DAY_SQL =
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java
index dada6cb7f2..0574ac3771 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java
@@ -17,14 +17,21 @@
 
 package org.apache.inlong.audit.service;
 
+import org.apache.inlong.audit.cache.HalfHourCache;
+import org.apache.inlong.audit.cache.HourCache;
+import org.apache.inlong.audit.cache.TenMinutesCache;
 import org.apache.inlong.audit.channel.DataQueue;
 import org.apache.inlong.audit.config.Configuration;
 import org.apache.inlong.audit.entities.AuditCycle;
 import org.apache.inlong.audit.entities.SinkConfig;
 import org.apache.inlong.audit.entities.SourceConfig;
+import org.apache.inlong.audit.sink.CacheSink;
 import org.apache.inlong.audit.sink.JdbcSink;
 import org.apache.inlong.audit.source.JdbcSource;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Objects;
 
 import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CLICKHOUSE_DRIVER;
@@ -57,15 +64,25 @@ import static 
org.apache.inlong.audit.config.SqlConstants.KEY_MYSQL_SOURCE_QUERY
  */
 public class EtlService {
 
+    // Bound to EtlService so that its log lines are attributed to this class, not to JdbcSource.
+    private static final Logger LOG = LoggerFactory.getLogger(EtlService.class);
     private JdbcSource mysqlSourceOfTemp;
+    private JdbcSource mysqlSourceOfTenMinutesCache;
+    private JdbcSource mysqlSourceOfHalfHourCache;
+    private JdbcSource mysqlSourceOfHourCache;
     private JdbcSink mysqlSinkOfDay;
     private JdbcSource clickhouseSource;
     private JdbcSink mysqlSinkOfTemp;
+    private CacheSink cacheSinkOfTenMinutesCache;
+    private CacheSink cacheSinkOfHalfHourCache;
+    private CacheSink cacheSinkOfHourCache;
     private final int queueSize;
+    private final int statBackTimes;
 
     public EtlService() {
         queueSize = Configuration.getInstance().get(KEY_DATA_QUEUE_SIZE,
                 DEFAULT_DATA_QUEUE_SIZE);
+        statBackTimes = 
Configuration.getInstance().get(KEY_REALTIME_SUMMARY_STAT_BACK_TIMES,
+                DEFAULT_REALTIME_SUMMARY_STAT_BACK_TIMES);
     }
 
     /**
@@ -74,6 +91,9 @@ public class EtlService {
     public void start() {
         clickhouseToMysql();
         mysqlToMysqlOfDay();
+        mysqlToTenMinutesCache();
+        mysqlToHalfHourCache();
+        mysqlToHourCache();
     }
 
     /**
@@ -83,7 +103,9 @@ public class EtlService {
     private void mysqlToMysqlOfDay() {
         DataQueue dataQueue = new DataQueue(queueSize);
 
-        mysqlSourceOfTemp = new JdbcSource(dataQueue, 
buildMysqlSourceConfig());
+        mysqlSourceOfTemp = new JdbcSource(dataQueue, 
buildMysqlSourceConfig(AuditCycle.DAY,
+                
Configuration.getInstance().get(KEY_DAILY_SUMMARY_STAT_BACK_TIMES,
+                        DEFAULT_DAILY_SUMMARY_STAT_BACK_TIMES)));
         mysqlSourceOfTemp.start();
 
         SinkConfig sinkConfig = 
buildMysqlSinkConfig(Configuration.getInstance().get(KEY_MYSQL_SINK_INSERT_DAY_SQL,
@@ -92,6 +114,44 @@ public class EtlService {
         mysqlSinkOfDay.start();
     }
 
+    /**
+     * Wires the 10-minute openapi cache: starts a mysql {@link JdbcSource} configured for
+     * {@link AuditCycle#MINUTE_10}, then a {@link CacheSink} that drains the same queue
+     * into the cache held by {@link TenMinutesCache}.
+     */
+    private void mysqlToTenMinutesCache() {
+        final DataQueue queue = new DataQueue(queueSize);
+        final SourceConfig sourceConfig = buildMysqlSourceConfig(AuditCycle.MINUTE_10, statBackTimes);
+
+        // Producer side: the source is created and started before the sink exists.
+        mysqlSourceOfTenMinutesCache = new JdbcSource(queue, sourceConfig);
+        mysqlSourceOfTenMinutesCache.start();
+
+        // Consumer side: the sink shares the queue with the source above.
+        cacheSinkOfTenMinutesCache = new CacheSink(queue, TenMinutesCache.getInstance().getCache());
+        cacheSinkOfTenMinutesCache.start();
+    }
+
+    /**
+     * Wires the 30-minute openapi cache: starts a mysql {@link JdbcSource} configured for
+     * {@link AuditCycle#MINUTE_30}, then a {@link CacheSink} that drains the same queue
+     * into the cache held by {@link HalfHourCache}.
+     */
+    private void mysqlToHalfHourCache() {
+        final DataQueue queue = new DataQueue(queueSize);
+        final SourceConfig sourceConfig = buildMysqlSourceConfig(AuditCycle.MINUTE_30, statBackTimes);
+
+        // Producer side: the source is created and started before the sink exists.
+        mysqlSourceOfHalfHourCache = new JdbcSource(queue, sourceConfig);
+        mysqlSourceOfHalfHourCache.start();
+
+        // Consumer side: the sink shares the queue with the source above.
+        cacheSinkOfHalfHourCache = new CacheSink(queue, HalfHourCache.getInstance().getCache());
+        cacheSinkOfHalfHourCache.start();
+    }
+
+    /**
+     * Wires the hour openapi cache: starts a mysql {@link JdbcSource} configured for
+     * {@link AuditCycle#HOUR}, then a {@link CacheSink} that drains the same queue
+     * into the cache held by {@link HourCache}.
+     */
+    private void mysqlToHourCache() {
+        final DataQueue queue = new DataQueue(queueSize);
+        final SourceConfig sourceConfig = buildMysqlSourceConfig(AuditCycle.HOUR, statBackTimes);
+
+        // Producer side: the source is created and started before the sink exists.
+        mysqlSourceOfHourCache = new JdbcSource(queue, sourceConfig);
+        mysqlSourceOfHourCache.start();
+
+        // Consumer side: the sink shares the queue with the source above.
+        cacheSinkOfHourCache = new CacheSink(queue, HourCache.getInstance().getCache());
+        cacheSinkOfHourCache.start();
+    }
+
     /**
      * Aggregate data from clickhouse data source and store the aggregated 
data in the target mysql table.
      * The default audit data cycle is 5 minutes,and stored in a temporary 
table.
@@ -137,7 +197,7 @@ public class EtlService {
      *
      * @return
      */
-    private SourceConfig buildMysqlSourceConfig() {
+    private SourceConfig buildMysqlSourceConfig(AuditCycle auditCycle, int 
statBackTimes) {
         String driver = Configuration.getInstance().get(KEY_MYSQL_DRIVER, 
KEY_DEFAULT_MYSQL_DRIVER);
         String jdbcUrl = Configuration.getInstance().get(KEY_MYSQL_JDBC_URL);
         String userName = Configuration.getInstance().get(KEY_MYSQL_USERNAME);
@@ -146,11 +206,10 @@ public class EtlService {
                 && Objects.nonNull(jdbcUrl)
                 && Objects.nonNull(userName));
 
-        return new SourceConfig(AuditCycle.DAY,
+        return new SourceConfig(auditCycle,
                 
Configuration.getInstance().get(KEY_MYSQL_SOURCE_QUERY_TEMP_SQL,
                         DEFAULT_MYSQL_SOURCE_QUERY_TEMP_SQL),
-                
Configuration.getInstance().get(KEY_DAILY_SUMMARY_STAT_BACK_TIMES,
-                        DEFAULT_DAILY_SUMMARY_STAT_BACK_TIMES),
+                statBackTimes,
                 driver,
                 jdbcUrl,
                 userName,
@@ -192,5 +251,13 @@ public class EtlService {
 
         clickhouseSource.destroy();
         mysqlSinkOfTemp.destroy();
+
+        mysqlSourceOfTenMinutesCache.destroy();
+        mysqlSourceOfHalfHourCache.destroy();
+        mysqlSourceOfHourCache.destroy();
+
+        cacheSinkOfTenMinutesCache.destroy();
+        cacheSinkOfHalfHourCache.destroy();
+        cacheSinkOfHourCache.destroy();
     }
 }
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/CacheSink.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/CacheSink.java
new file mode 100644
index 0000000000..8077817b1d
--- /dev/null
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/CacheSink.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.sink;
+
+import org.apache.inlong.audit.channel.DataQueue;
+import org.apache.inlong.audit.config.Configuration;
+import org.apache.inlong.audit.entities.StatData;
+import org.apache.inlong.audit.utils.CacheUtils;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_QUEUE_PULL_TIMEOUT;
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SOURCE_DB_SINK_INTERVAL;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_QUEUE_PULL_TIMEOUT;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_SOURCE_DB_SINK_INTERVAL;
+
+/**
+ * Sink that moves {@link StatData} records from a {@link DataQueue} into a local Caffeine cache.
+ *
+ * <p>A single-threaded scheduled executor periodically drains the queue; every record is stored
+ * under the key produced by {@link CacheUtils#buildCacheKey}.
+ */
+public class CacheSink {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CacheSink.class);
+    private final ScheduledExecutorService sinkTimer = Executors.newSingleThreadScheduledExecutor();
+    private final DataQueue dataQueue;
+    private final Cache<String, StatData> cache;
+    private final int pullTimeOut;
+
+    /**
+     * Creates a sink bound to the given queue and cache. The queue pull timeout is read from the
+     * configuration ({@code KEY_QUEUE_PULL_TIMEOUT}) and is applied in milliseconds.
+     *
+     * @param dataQueue queue the records are pulled from
+     * @param cache cache the records are written to
+     */
+    public CacheSink(DataQueue dataQueue, Cache<String, StatData> cache) {
+        this.cache = cache;
+        this.dataQueue = dataQueue;
+        this.pullTimeOut = Configuration.getInstance().get(KEY_QUEUE_PULL_TIMEOUT, DEFAULT_QUEUE_PULL_TIMEOUT);
+    }
+
+    /**
+     * Schedules the drain task: first run immediately, then again after the configured
+     * {@code KEY_SOURCE_DB_SINK_INTERVAL} delay (milliseconds) following the end of each run.
+     */
+    public void start() {
+        long sinkInterval = Configuration.getInstance()
+                .get(KEY_SOURCE_DB_SINK_INTERVAL, DEFAULT_SOURCE_DB_SINK_INTERVAL);
+        sinkTimer.scheduleWithFixedDelay(this::process, 0, sinkInterval, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Pulls records until the queue returns {@code null} and puts each one into the cache.
+     * Any exception is logged instead of propagated, because an exception escaping a scheduled
+     * task would suppress all of its subsequent executions.
+     */
+    private void process() {
+        try {
+            for (StatData record = dataQueue.pull(pullTimeOut, TimeUnit.MILLISECONDS);
+                    record != null;
+                    record = dataQueue.pull(pullTimeOut, TimeUnit.MILLISECONDS)) {
+                String key = CacheUtils.buildCacheKey(
+                        record.getLogTs(),
+                        record.getInlongGroupId(),
+                        record.getInlongStreamId(),
+                        record.getAuditId(),
+                        record.getAuditTag());
+                cache.put(key, record);
+            }
+        } catch (Exception exception) {
+            LOG.error("Process exception! ", exception);
+        }
+    }
+
+    /**
+     * Shuts down the scheduling executor so that no further drain runs are started.
+     */
+    public void destroy() {
+        sinkTimer.shutdown();
+    }
+}
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
index 99a6709826..451cbdc50a 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java
@@ -102,7 +102,7 @@ public class JdbcSink implements AutoCloseable {
                 createDataSource();
             }
             int counter = 0;
-            StatData data = dataQueue.pull(pullTimeOut, TimeUnit.MICROSECONDS);
+            StatData data = dataQueue.pull(pullTimeOut, TimeUnit.MILLISECONDS);
             while (data != null) {
                 preparedStatement.setString(1, data.getLogTs());
                 preparedStatement.setString(2, data.getInlongGroupId());
@@ -119,7 +119,7 @@ public class JdbcSink implements AutoCloseable {
                     preparedStatement.clearBatch();
                     counter = 0;
                 }
-                data = dataQueue.pull(pullTimeOut, TimeUnit.MICROSECONDS);
+                data = dataQueue.pull(pullTimeOut, TimeUnit.MILLISECONDS);
             }
             if (counter > 0) {
                 preparedStatement.executeBatch();
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java
index accd22ebbc..e4e8e1dd0d 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java
@@ -110,7 +110,7 @@ public class JdbcSource {
             ScheduledExecutorService timer =
                     statTimers.computeIfAbsent(statBackTime, k -> 
Executors.newSingleThreadScheduledExecutor());
             timer.scheduleWithFixedDelay(new StatServer(statBackTime),
-                    statBackTime,
+                    0,
                     statInterval, TimeUnit.MINUTES);
         }
     }
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/utils/CacheUtils.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/utils/CacheUtils.java
new file mode 100644
index 0000000000..4c9e8ff72a
--- /dev/null
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/utils/CacheUtils.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.utils;
+
+/**
+ * Helpers for building keys of the local audit cache.
+ */
+public class CacheUtils {
+
+    /**
+     * Separator written between key fields. Without it, adjacent fields run together and
+     * different tuples can produce the same key (e.g. group "ab" + stream "c" versus
+     * group "a" + stream "bc"), letting one cache entry overwrite another.
+     * NOTE(review): assumes ids do not themselves contain '|' - confirm against id validation.
+     */
+    private static final String KEY_DELIMITER = "|";
+
+    /**
+     * Builds the cache key for one audit statistic by joining the fields, in the given order,
+     * with {@link #KEY_DELIMITER}. A {@code null} field is rendered as the text "null".
+     *
+     * @param logTs log timestamp of the statistic
+     * @param inlongGroupId inlong group id
+     * @param inlongStreamId inlong stream id
+     * @param auditId audit id
+     * @param auditTag audit tag
+     * @return the composed cache key
+     */
+    public static String buildCacheKey(String logTs, String inlongGroupId, String inlongStreamId,
+            String auditId, String auditTag) {
+        return new StringBuilder()
+                .append(logTs)
+                .append(KEY_DELIMITER)
+                .append(inlongGroupId)
+                .append(KEY_DELIMITER)
+                .append(inlongStreamId)
+                .append(KEY_DELIMITER)
+                .append(auditId)
+                .append(KEY_DELIMITER)
+                .append(auditTag)
+                .toString();
+    }
+}
diff --git a/inlong-audit/pom.xml b/inlong-audit/pom.xml
index 54ebd8fcc2..fa98312e8b 100644
--- a/inlong-audit/pom.xml
+++ b/inlong-audit/pom.xml
@@ -90,6 +90,11 @@
             <version>${HikariCP.version}</version>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+            <version>${caffeine.version}</version>
+        </dependency>
     </dependencies>
 
 </project>


Reply via email to