This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/release-1.3.0 by this push:
     new 31c96a0f3 [INLONG-6076][Audit] Bugs in the 
ClickHouseService.processOutput() (#6077)
31c96a0f3 is described below

commit 31c96a0f3217ec2c32aecebbc1803feea126139f
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Fri Sep 30 19:51:14 2022 +0800

    [INLONG-6076][Audit] Bugs in the ClickHouseService.processOutput() (#6077)
---
 .../inlong/audit/service/ClickHouseService.java    | 45 +++++++++++-----------
 1 file changed, 22 insertions(+), 23 deletions(-)

diff --git 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
index 24042d5aa..6d98973a2 100644
--- 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
+++ 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * ClickHouseService
@@ -54,7 +55,7 @@ public class ClickHouseService implements InsertData, 
AutoCloseable {
     private LinkedBlockingQueue<ClickHouseDataPo> batchQueue;
     private AtomicBoolean needBatchOutput = new AtomicBoolean(false);
     private AtomicInteger batchCounter = new AtomicInteger(0);
-
+    private AtomicLong lastCheckTime = new 
AtomicLong(System.currentTimeMillis());
     private Connection conn;
 
     /**
@@ -77,17 +78,11 @@ public class ClickHouseService implements InsertData, 
AutoCloseable {
             Class.forName(chConfig.getDriver());
             this.reconnect();
         } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
+            LOG.error("ClickHouseService start failure!", e);
         }
-        // timer
-        long currentTime = System.currentTimeMillis();
-        // batch output interval
-        timerService.scheduleWithFixedDelay(() -> 
needBatchOutput.compareAndSet(false, true),
-                currentTime + chConfig.getBatchIntervalMs(),
-                chConfig.getBatchIntervalMs(), TimeUnit.MILLISECONDS);
-        // batch output process
-        timerService.scheduleWithFixedDelay(() -> processOutput(),
-                currentTime + chConfig.getProcessIntervalMs(),
+        // start timer
+        timerService.scheduleWithFixedDelay(this::processOutput,
+                chConfig.getProcessIntervalMs(),
                 chConfig.getProcessIntervalMs(), TimeUnit.MILLISECONDS);
     }
 
@@ -95,14 +90,15 @@ public class ClickHouseService implements InsertData, 
AutoCloseable {
      * processOutput
      */
     private void processOutput() {
-        if (!this.needBatchOutput.get()) {
+        if (!this.needBatchOutput.get()
+                && (System.currentTimeMillis() - lastCheckTime.get() < 
chConfig.getBatchIntervalMs())) {
             return;
         }
         // output
         try (PreparedStatement pstat = this.conn.prepareStatement(INSERT_SQL)) 
{
-            // insert data
-            ClickHouseDataPo data = this.batchQueue.poll();
             int counter = 0;
+            // output data to clickhouse
+            ClickHouseDataPo data = this.batchQueue.poll();
             while (data != null) {
                 pstat.setString(1, data.getIp());
                 pstat.setString(2, data.getDockerId());
@@ -124,20 +120,23 @@ public class ClickHouseService implements InsertData, 
AutoCloseable {
                     this.conn.commit();
                     counter = 0;
                 }
+                data = this.batchQueue.poll();
             }
-            this.batchCounter.set(0);
-            pstat.executeBatch();
-            this.conn.commit();
-        } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
+            if (counter > 0) {
+                pstat.executeBatch();
+                this.conn.commit();
+            }
+        } catch (Exception e1) {
+            LOG.error("Execute output to clickhouse failure!", e1);
+            // re-connect clickhouse
             try {
                 this.reconnect();
-            } catch (SQLException e1) {
-                LOG.error(e1.getMessage(), e1);
+            } catch (SQLException e2) {
+                LOG.error("Re-connect clickhouse failure!", e2);
             }
         }
-
-        // recover
+        // recover flag
+        lastCheckTime.set(System.currentTimeMillis());
         this.needBatchOutput.compareAndSet(true, false);
     }
 

Reply via email to