This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push: new 31c96a0f3 [INLONG-6076][Audit] Bugs in the ClickHouseService.processOutput() (#6077) 31c96a0f3 is described below commit 31c96a0f3217ec2c32aecebbc1803feea126139f Author: Goson Zhang <4675...@qq.com> AuthorDate: Fri Sep 30 19:51:14 2022 +0800 [INLONG-6076][Audit] Bugs in the ClickHouseService.processOutput() (#6077) --- .../inlong/audit/service/ClickHouseService.java | 45 +++++++++++----------- 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java index 24042d5aa..6d98973a2 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java @@ -34,6 +34,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; /** * ClickHouseService @@ -54,7 +55,7 @@ public class ClickHouseService implements InsertData, AutoCloseable { private LinkedBlockingQueue<ClickHouseDataPo> batchQueue; private AtomicBoolean needBatchOutput = new AtomicBoolean(false); private AtomicInteger batchCounter = new AtomicInteger(0); - + private AtomicLong lastCheckTime = new AtomicLong(System.currentTimeMillis()); private Connection conn; /** @@ -77,17 +78,11 @@ public class ClickHouseService implements InsertData, AutoCloseable { Class.forName(chConfig.getDriver()); this.reconnect(); } catch (Exception e) { - LOG.error(e.getMessage(), e); + LOG.error("ClickHouseService start failure!", e); } - // timer - long currentTime = System.currentTimeMillis(); - // batch output interval - timerService.scheduleWithFixedDelay(() -> needBatchOutput.compareAndSet(false, true), - currentTime + chConfig.getBatchIntervalMs(), - chConfig.getBatchIntervalMs(), TimeUnit.MILLISECONDS); - // batch output process - timerService.scheduleWithFixedDelay(() -> processOutput(), - currentTime + chConfig.getProcessIntervalMs(), + // start timer + timerService.scheduleWithFixedDelay(this::processOutput, + chConfig.getProcessIntervalMs(), chConfig.getProcessIntervalMs(), TimeUnit.MILLISECONDS); } @@ -95,14 +90,15 @@ public class ClickHouseService implements InsertData, AutoCloseable { * processOutput */ private void processOutput() { - if (!this.needBatchOutput.get()) { + if (!this.needBatchOutput.get() + && (System.currentTimeMillis() - lastCheckTime.get() < chConfig.getBatchIntervalMs())) { return; } // output try (PreparedStatement pstat = this.conn.prepareStatement(INSERT_SQL)) { - // insert data - ClickHouseDataPo data = this.batchQueue.poll(); int counter = 0; + // output data to clickhouse + ClickHouseDataPo data = this.batchQueue.poll(); while (data != null) { pstat.setString(1, data.getIp()); pstat.setString(2, data.getDockerId()); @@ -124,20 +120,23 @@ public class ClickHouseService implements InsertData, AutoCloseable { this.conn.commit(); counter = 0; } + data = this.batchQueue.poll(); } - this.batchCounter.set(0); - pstat.executeBatch(); - this.conn.commit(); - } catch (Exception e) { - LOG.error(e.getMessage(), e); + if (counter > 0) { + pstat.executeBatch(); + this.conn.commit(); + } + } catch (Exception e1) { + LOG.error("Execute output to clickhouse failure!", e1); + // re-connect clickhouse try { this.reconnect(); - } catch (SQLException e1) { - LOG.error(e1.getMessage(), e1); + } catch (SQLException e2) { + LOG.error("Re-connect clickhouse failure!", e2); } } - - // recover + // recover flag + lastCheckTime.set(System.currentTimeMillis()); this.needBatchOutput.compareAndSet(true, false); }