This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 18b0bf686f [INLONG-9904][Audit] SDK support checkpoint feature for Flink job (#9905) 18b0bf686f is described below commit 18b0bf686fe4339f4c9824c3e0beae202ae73e45 Author: doleyzi <43397300+dole...@users.noreply.github.com> AuthorDate: Mon Apr 1 23:51:11 2024 +0800 [INLONG-9904][Audit] SDK support checkpoint feature for Flink job (#9905) --- .../inlong/agent/metrics/audit/AuditUtils.java | 2 +- .../org/apache/inlong/audit/AuditReporterImpl.java | 296 ++++++++++++++++----- .../apache/inlong/audit/send/SenderManager.java | 24 +- .../apache/inlong/audit/util/AuditDimensions.java | 99 +++++++ .../org/apache/inlong/audit/util/AuditValues.java | 58 ++++ inlong-audit/sql/apache_inlong_audit.sql | 1 + .../inlong/dataproxy/metrics/audit/AuditUtils.java | 2 +- .../sort/standalone/metrics/audit/AuditUtils.java | 2 +- .../inlong/sort/base/metric/SinkMetricData.java | 2 +- .../inlong/sort/base/metric/SourceMetricData.java | 2 +- .../server/broker/stats/audit/AuditUtils.java | 2 +- 11 files changed, 402 insertions(+), 88 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java index fc6f4d90c3..e4cc7691c8 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java @@ -107,6 +107,6 @@ public class AuditUtils { if (!IS_AUDIT) { return; } - AuditOperator.getInstance().send(); + AuditOperator.getInstance().flush(); } } diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java index 10c3913dd2..f0f44207d5 100644 --- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java +++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java @@ -21,6 +21,8 @@ import org.apache.inlong.audit.loader.SocketAddressListLoader; import org.apache.inlong.audit.protocol.AuditApi; import org.apache.inlong.audit.send.SenderManager; import org.apache.inlong.audit.util.AuditConfig; +import org.apache.inlong.audit.util.AuditDimensions; +import org.apache.inlong.audit.util.AuditValues; import org.apache.inlong.audit.util.Config; import org.apache.inlong.audit.util.StatInfo; @@ -33,6 +35,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Calendar; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.StringJoiner; @@ -40,6 +43,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; import static org.apache.inlong.audit.protocol.AuditApi.BaseCommand.Type.AUDIT_REQUEST; @@ -54,20 +58,49 @@ public class AuditReporterImpl implements Serializable { private static final int BATCH_NUM = 100; private final ReentrantLock GLOBAL_LOCK = new ReentrantLock(); private static final int PERIOD = 1000 * 60; - private final ConcurrentHashMap<String, StatInfo> countMap = new ConcurrentHashMap<>(); - private final ConcurrentHashMap<String, StatInfo> threadCountMap = new ConcurrentHashMap<>(); - private final ConcurrentHashMap<String, StatInfo> deleteCountMap = new ConcurrentHashMap<>(); - private final List<String> deleteKeyList = new ArrayList<>(); + private final ConcurrentHashMap<Long, ConcurrentHashMap<String, StatInfo>> preStatMap = + new ConcurrentHashMap<>(); + private final ConcurrentHashMap<Long, ConcurrentHashMap<String, StatInfo>> summaryStatMap = + new ConcurrentHashMap<>(); + private final ConcurrentHashMap<Long, ConcurrentHashMap<String, StatInfo>> expiredStatMap = + new ConcurrentHashMap<>(); + private final ConcurrentHashMap<Long, List<String>> expiredKeyList = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<Long, Long> flushTime = new ConcurrentHashMap<>(); private final Config config = new Config(); private int packageId = 1; private int dataId = 0; private boolean initialized = false; private SenderManager manager; + private AtomicInteger flushStat = new AtomicInteger(0); private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor(); private AuditConfig auditConfig = null; private SocketAddressListLoader loader = null; + // Resource isolation key is used in checkpoint and other scenarios.DEFAULT 0. + private static final long DEFAULT_ISOLATE_KEY = 0; + private int flushStatThreshold = 100; + private boolean autoFlush = true; + + /** + * Set stat threshold + * + * @param flushStatThreshold + */ + public void setFlushStatThreshold(int flushStatThreshold) { + this.flushStatThreshold = flushStatThreshold; + } + + /** + * When the caller needs to isolate resources, please call this method and pass the parameter true. + * For example, in scenarios such as flink checkpoint + * + * @param autoFlush + */ + public void setAutoFlush(boolean autoFlush) { + this.autoFlush = autoFlush; + } + /** * Init */ @@ -82,7 +115,10 @@ public class AuditReporterImpl implements Serializable { public void run() { try { loadIpPortList(); - send(); + if (autoFlush) { + flush(DEFAULT_ISOLATE_KEY); + } + checkFlushTime(); } catch (Exception e) { LOGGER.error(e.getMessage()); } @@ -198,50 +234,208 @@ public class AuditReporterImpl implements Serializable { keyJoiner.add(String.valueOf(auditID)); keyJoiner.add(auditTag); keyJoiner.add(String.valueOf(auditVersion)); - addByKey(keyJoiner.toString(), count, size, delayTime); + addByKey(DEFAULT_ISOLATE_KEY, keyJoiner.toString(), count, size, delayTime); + } + + /** + * When the caller needs to isolate resources, please call this method. + * For example, in scenarios such as flink checkpoint + * + * @param dimensions + * @param values + */ + public void add(AuditDimensions dimensions, AuditValues values) { + StringJoiner keyJoiner = new StringJoiner(FIELD_SEPARATORS); + keyJoiner.add(String.valueOf(dimensions.getLogTime() / PERIOD)); + keyJoiner.add(dimensions.getInlongGroupID()); + keyJoiner.add(dimensions.getInlongStreamID()); + keyJoiner.add(String.valueOf(dimensions.getAuditID())); + keyJoiner.add(dimensions.getAuditTag()); + keyJoiner.add(String.valueOf(dimensions.getAuditVersion())); + addByKey(dimensions.getIsolateKey(), keyJoiner.toString(), values.getCount(), + values.getSize(), values.getDelayTime()); } /** * Add audit info by key. */ - private void addByKey(String key, long count, long size, long delayTime) { - if (countMap.get(key) == null) { - countMap.put(key, new StatInfo(0L, 0L, 0L)); + private void addByKey(long isolateKey, String statKey, long count, long size, long delayTime) { + if (null == this.preStatMap.get(isolateKey)) { + this.preStatMap.putIfAbsent(isolateKey, new ConcurrentHashMap<>()); + } + ConcurrentHashMap<String, StatInfo> statMap = this.preStatMap.get(isolateKey); + if (null == statMap.get(statKey)) { + statMap.putIfAbsent(statKey, new StatInfo(0L, 0L, 0L)); } - countMap.get(key).count.addAndGet(count); - countMap.get(key).size.addAndGet(size); - countMap.get(key).delay.addAndGet(delayTime); + StatInfo stat = statMap.get(statKey); + stat.count.addAndGet(count); + stat.size.addAndGet(size); + stat.delay.addAndGet(delayTime); + } + + /** + * Flush audit data by default audit version + */ + public synchronized void flush() { + flush(DEFAULT_AUDIT_VERSION); } /** - * Send audit data + * Flush audit data */ - public synchronized void send() { + public synchronized void flush(long isolateKey) { + if (flushTime.putIfAbsent(isolateKey, System.currentTimeMillis()) != null + || flushStat.addAndGet(1) > flushStatThreshold) { + return; + } + LOGGER.info("Audit flush isolate key {} ", isolateKey); manager.clearBuffer(); resetStat(); - // Retrieve statistics from the list of objects without statistics to be eliminated - for (Map.Entry<String, StatInfo> entry : this.deleteCountMap.entrySet()) { - this.sumThreadGroup(entry.getKey(), entry.getValue()); + LOGGER.info("pre stat map size {} {} {} {}", this.preStatMap.size(), this.expiredStatMap.size(), + this.summaryStatMap.size(), this.expiredKeyList.size()); + + summaryExpiredStatMap(isolateKey); + + Iterator<Map.Entry<Long, ConcurrentHashMap<String, StatInfo>>> iterator = this.preStatMap.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<Long, ConcurrentHashMap<String, StatInfo>> entry = iterator.next(); + if (entry.getValue().isEmpty()) { + LOGGER.info("Remove the key of pre stat map: {},isolate key: {} ", entry.getKey(), isolateKey); + iterator.remove(); + continue; + } + if (entry.getKey() > isolateKey) { + continue; + } + summaryPreStatMap(entry.getKey(), entry.getValue()); + send(entry.getKey()); + + } + + clearExpiredKey(isolateKey); + + LOGGER.info("Finish report audit data"); + } + + /** + * Send base command + */ + private void sendByBaseCommand(AuditApi.AuditRequest auditRequest) { + AuditApi.BaseCommand.Builder baseCommand = AuditApi.BaseCommand.newBuilder(); + baseCommand.setType(AUDIT_REQUEST).setAuditRequest(auditRequest).build(); + manager.send(baseCommand.build(), auditRequest); + } + + /** + * Summary + */ + private void sumThreadGroup(long isolateKey, String key, StatInfo statInfo) { + long count = statInfo.count.getAndSet(0); + if (0 == count) { + return; + } + ConcurrentHashMap<String, StatInfo> sumMap = + this.summaryStatMap.computeIfAbsent(isolateKey, k -> new ConcurrentHashMap<>()); + StatInfo stat = sumMap.computeIfAbsent(key, k -> new StatInfo(0L, 0L, 0L)); + stat.count.addAndGet(count); + stat.size.addAndGet(statInfo.size.getAndSet(0)); + stat.delay.addAndGet(statInfo.delay.getAndSet(0)); + } + + /** + * Reset statistics + */ + private void resetStat() { + dataId = 0; + packageId = 1; + } + + /** + * Summary expired stat map + */ + private void summaryExpiredStatMap(long isolateKey) { + Iterator<Map.Entry<Long, ConcurrentHashMap<String, StatInfo>>> iterator = + this.expiredStatMap.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<Long, ConcurrentHashMap<String, StatInfo>> entry = iterator.next(); + if (entry.getValue().isEmpty()) { + LOGGER.info("Remove the key of expired stat map: {},isolate key: {} ", entry.getKey(), isolateKey); + iterator.remove(); + continue; + } + if (entry.getKey() > isolateKey) { + continue; + } + for (Map.Entry<String, StatInfo> statInfo : entry.getValue().entrySet()) { + this.sumThreadGroup(isolateKey, statInfo.getKey(), statInfo.getValue()); + } + entry.getValue().clear(); } - this.deleteCountMap.clear(); - for (Map.Entry<String, StatInfo> entry : countMap.entrySet()) { + } + + /** + * Summary pre stat map + */ + private void summaryPreStatMap(long isolateKey, ConcurrentHashMap<String, StatInfo> statInfo) { + List<String> expiredKeys = this.expiredKeyList.computeIfAbsent(isolateKey, k -> new ArrayList<>()); + + for (Map.Entry<String, StatInfo> entry : statInfo.entrySet()) { String key = entry.getKey(); StatInfo value = entry.getValue(); // If there is no data, enter the list to be eliminated if (value.count.get() == 0) { - this.deleteKeyList.add(key); + if (!expiredKeys.contains(key)) { + expiredKeys.add(key); + } continue; } - this.sumThreadGroup(key, value); + sumThreadGroup(isolateKey, key, value); } + } - // Clean up obsolete statistical data objects - for (String key : this.deleteKeyList) { - StatInfo value = this.countMap.remove(key); - this.deleteCountMap.put(key, value); + /** + * Clear expired key + */ + private void clearExpiredKey(long isolateKey) { + Iterator<Map.Entry<Long, List<String>>> iterator = + this.expiredKeyList.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<Long, List<String>> entry = iterator.next(); + if (entry.getValue().isEmpty()) { + LOGGER.info("Remove the key of expired key list: {},isolate key: {}", entry.getKey(), isolateKey); + iterator.remove(); + continue; + } + if (entry.getKey() > isolateKey) { + continue; + } + + ConcurrentHashMap<String, StatInfo> preStatInfo = this.preStatMap.get(entry.getKey()); + if (null == preStatInfo) { + iterator.remove(); + continue; + } + ConcurrentHashMap<String, StatInfo> deleteMap = + this.expiredStatMap.computeIfAbsent(entry.getKey(), k -> new ConcurrentHashMap<>()); + for (String key : entry.getValue()) { + StatInfo value = preStatInfo.remove(key); + deleteMap.put(key, value); + } + entry.getValue().clear(); } - this.deleteKeyList.clear(); + } + /** + * Send Audit data + */ + private void send(long isolateKey) { + if (null == summaryStatMap.get(isolateKey)) { + return; + } + if (summaryStatMap.get(isolateKey).isEmpty()) { + summaryStatMap.remove(isolateKey); + return; + } long sdkTime = Calendar.getInstance().getTimeInMillis(); AuditApi.AuditMessageHeader msgHeader = AuditApi.AuditMessageHeader.newBuilder() .setIp(config.getLocalIP()).setDockerId(config.getDockerId()) @@ -250,9 +444,8 @@ public class AuditReporterImpl implements Serializable { .build(); AuditApi.AuditRequest.Builder requestBuild = AuditApi.AuditRequest.newBuilder(); requestBuild.setMsgHeader(msgHeader).setRequestId(manager.nextRequestId()); - // Process the stat info for all threads - for (Map.Entry<String, StatInfo> entry : threadCountMap.entrySet()) { + for (Map.Entry<String, StatInfo> entry : summaryStatMap.get(isolateKey).entrySet()) { // Entry key order: logTime inlongGroupID inlongStreamID auditID auditTag auditVersion String[] keyArray = entry.getKey().split(FIELD_SEPARATORS); long logTime = Long.parseLong(keyArray[0]) * PERIOD; @@ -282,49 +475,24 @@ public class AuditReporterImpl implements Serializable { requestBuild.clearMsgBody(); } } + if (requestBuild.getMsgBodyCount() > 0) { sendByBaseCommand(requestBuild.build()); requestBuild.clearMsgBody(); } - threadCountMap.clear(); - - LOGGER.info("Finish report audit data"); - } - - /** - * Send base command - */ - private void sendByBaseCommand(AuditApi.AuditRequest auditRequest) { - AuditApi.BaseCommand.Builder baseCommand = AuditApi.BaseCommand.newBuilder(); - baseCommand.setType(AUDIT_REQUEST).setAuditRequest(auditRequest).build(); - manager.send(baseCommand.build(), auditRequest); + summaryStatMap.get(isolateKey).clear(); } /** - * Summary + * Check flush time */ - private void sumThreadGroup(String key, StatInfo statInfo) { - long count = statInfo.count.getAndSet(0); - if (0 == count) { - return; - } - if (threadCountMap.get(key) == null) { - threadCountMap.put(key, new StatInfo(0, 0, 0)); - } - - long size = statInfo.size.getAndSet(0); - long delay = statInfo.delay.getAndSet(0); - - threadCountMap.get(key).count.addAndGet(count); - threadCountMap.get(key).size.addAndGet(size); - threadCountMap.get(key).delay.addAndGet(delay); - } - - /** - * Reset statistics - */ - private void resetStat() { - dataId = 0; - packageId = 1; + private void checkFlushTime() { + flushStat.set(0); + long currentTime = Calendar.getInstance().getTimeInMillis(); + flushTime.forEach((key, value) -> { + if ((currentTime - value) > PERIOD) { + flushTime.remove(key); + } + }); } } diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java index f3a573c4f9..7b886021a1 100644 --- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java +++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java @@ -139,24 +139,7 @@ public class SenderManager { // cache first Long requestId = baseCommand.getAuditRequest().getRequestId(); this.dataMap.putIfAbsent(requestId, data); - requestIdQueue.offer(requestId); this.sendData(data.getDataByte()); - // resend - long newTime = System.currentTimeMillis() - 10000; - if (newTime > lastCheckTime) { - for (int i = 0; i < requestIdQueue.size(); i++) { - Long current = requestIdQueue.poll(); - AuditData auditData = this.dataMap.get(current); - if (auditData == null) { - continue; - } else { - requestIdQueue.offer(current); - if (newTime > auditData.getSendTime()) { - this.sendData(auditData.getDataByte()); - } - } - } - } } /** @@ -179,7 +162,7 @@ public class SenderManager { * Clean up the backlog of unsent message packets */ public void clearBuffer() { - LOG.info("audit failed cache size: {}", this.dataMap.size()); + LOG.info("Audit failed cache size: {}", this.dataMap.size()); for (AuditData data : this.dataMap.values()) { this.sendData(data.getDataByte()); this.sleep(); @@ -286,6 +269,11 @@ public class SenderManager { if (data == null) { LOG.error("Can not find the request id onMessageReceived {},message: {}", requestId, baseCommand.getAuditReply().getMessage()); + if (LOG.isDebugEnabled()) { + for (Map.Entry<Long, AuditData> entry : this.dataMap.entrySet()) { + LOG.debug("Data map key:{},request id:{}", entry.getKey(), requestId); + } + } return; } // check resp diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditDimensions.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditDimensions.java new file mode 100644 index 0000000000..9a8741a6a9 --- /dev/null +++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditDimensions.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.util; + +/** + * Audit dimensions + */ +public class AuditDimensions { + + private int auditID; + private long logTime; + private long auditVersion; + private long isolateKey; + private String auditTag; + private String inlongGroupID; + private String inlongStreamID; + + public AuditDimensions(int auditID, long logTime, long auditVersion, long isolateKey, String auditTag, + String inlongGroupID, String inlongStreamID) { + this.auditID = auditID; + this.logTime = logTime; + this.auditVersion = auditVersion; + this.isolateKey = isolateKey; + this.auditTag = auditTag; + this.inlongGroupID = inlongGroupID; + this.inlongStreamID = inlongStreamID; + } + + public int getAuditID() { + return auditID; + } + + public void setAuditID(int auditID) { + this.auditID = auditID; + } + + public long getLogTime() { + return logTime; + } + + public void setLogTime(long logTime) { + this.logTime = logTime; + } + + public long getAuditVersion() { + return auditVersion; + } + + public void setAuditVersion(long auditVersion) { + this.auditVersion = auditVersion; + } + + public long getIsolateKey() { + return isolateKey; + } + + public void setIsolateKey(long isolateKey) { + this.isolateKey = isolateKey; + } + + public String getAuditTag() { + return auditTag; + } + + public void setAuditTag(String auditTag) { + this.auditTag = auditTag; + } + + public String getInlongGroupID() { + return inlongGroupID; + } + + public void setInlongGroupID(String inlongGroupID) { + this.inlongGroupID = inlongGroupID; + } + + public String getInlongStreamID() { + return inlongStreamID; + } + + public void setInlongStreamID(String inlongStreamID) { + this.inlongStreamID = inlongStreamID; + } +} diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditValues.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditValues.java new file mode 100644 index 0000000000..d5b74856bd --- /dev/null +++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditValues.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.util; + +/** + * Audit values + */ +public class AuditValues { + + private long count; + private long size; + private long delayTime; + + public AuditValues(long count, long size, long delayTime) { + this.count = count; + this.size = size; + this.delayTime = delayTime; + } + + public long getCount() { + return count; + } + + public void setCount(long count) { + this.count = count; + } + + public long getSize() { + return size; + } + + public void setSize(long size) { + this.size = size; + } + + public long getDelayTime() { + return delayTime; + } + + public void setDelayTime(long delayTime) { + this.delayTime = delayTime; + } +} diff --git a/inlong-audit/sql/apache_inlong_audit.sql b/inlong-audit/sql/apache_inlong_audit.sql index 2d10ca697c..a2b007534a 100644 --- a/inlong-audit/sql/apache_inlong_audit.sql +++ b/inlong-audit/sql/apache_inlong_audit.sql @@ -43,6 +43,7 @@ CREATE TABLE IF NOT EXISTS `audit_data` `inlong_stream_id` varchar(100) NOT NULL DEFAULT '' COMMENT 'The target inlong stream id', `audit_id` varchar(100) NOT NULL DEFAULT '' COMMENT 'Audit id', `audit_tag` varchar(100) DEFAULT '' COMMENT 'Audit tag', + `audit_version` BIGINT DEFAULT -1 COMMENT 'Audit version', `count` BIGINT NOT NULL DEFAULT '0' COMMENT 'Message count', `size` BIGINT NOT NULL DEFAULT '0' COMMENT 'Message size', `delay` BIGINT NOT NULL DEFAULT '0' COMMENT 'Message delay count', diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java index 7bd27521c6..371a4de0c9 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java @@ -123,6 +123,6 @@ public class AuditUtils { * Send audit data */ public static void send() { - AuditOperator.getInstance().send(); + AuditOperator.getInstance().flush(); } } diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/audit/AuditUtils.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/audit/AuditUtils.java index 795d505c1f..f5bd2ebbc2 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/audit/AuditUtils.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/audit/AuditUtils.java @@ -104,6 +104,6 @@ public class AuditUtils { * Send audit data */ public static void send() { - AuditOperator.getInstance().send(); + AuditOperator.getInstance().flush(); } } diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java index 1b09343b5c..7270becdb7 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java @@ -285,7 +285,7 @@ public class SinkMetricData implements MetricData, Serializable { */ public void flushAuditData() { if (auditOperator != null) { - auditOperator.send(); + auditOperator.flush(); } } diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java index 91abcf22aa..0568efd7c3 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java @@ -341,7 +341,7 @@ public class SourceMetricData implements MetricData, Serializable { */ public void flushAuditData() { if (auditOperator != null) { - auditOperator.send(); + auditOperator.flush(); } } diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/audit/AuditUtils.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/audit/AuditUtils.java index 33b0d9f165..0e0c287739 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/audit/AuditUtils.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/audit/AuditUtils.java @@ -112,6 +112,6 @@ public class AuditUtils { if (!auditConfig.isAuditEnable()) { return; } - AuditOperator.getInstance().send(); + AuditOperator.getInstance().flush(); } }