This is an automated email from the ASF dual-hosted git repository. woofyzhao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 8db88eccc [INLONG-7166][DataProxy] Fix audit data reporting (#7167) 8db88eccc is described below commit 8db88eccce43f549c5db265002c791be38f2d619 Author: woofyzhao <zhaozixuan1...@gmail.com> AuthorDate: Thu Jan 5 23:36:22 2023 +0800 [INLONG-7166][DataProxy] Fix audit data reporting (#7167) * [INLONG-7166][DataProxy] Fix audit data reporting --- .../inlong/dataproxy/config/holder/CommonPropertiesHolder.java | 6 ++++-- .../java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java | 4 ++-- .../java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java | 4 ++-- .../inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java | 6 ++++++ .../java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java | 4 +--- .../apache/inlong/sdk/dataproxy/pb/dispatch/DispatchManager.java | 4 ++-- .../org/apache/inlong/sort/standalone/dispatch/DispatchManager.java | 4 ++-- 7 files changed, 19 insertions(+), 13 deletions(-) diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java index 90a5be47c..1d4fe3990 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java @@ -88,8 +88,10 @@ public class CommonPropertiesHolder { * @return the props */ public static Map<String, String> get() { - if (props != null) { - return props; + synchronized (KEY_COMMON_PROPERTIES) { + if (props != null) { + return props; + } } init(); return props; diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java index e03ed01b4..a1da9f076 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java @@ -164,7 +164,7 @@ public class DispatchManager { if (!needOutputOvertimeData.getAndSet(false)) { return; } - LOG.info("start to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{}", + LOG.debug("start to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{}", profileCache.size(), dispatchQueues.stream().mapToInt(LinkedBlockingQueue::size).sum()); long currentTime = System.currentTimeMillis(); long createThreshold = currentTime - dispatchTimeout; @@ -187,7 +187,7 @@ public class DispatchManager { outCounter.addAndGet(dispatchProfile.getCount()); } }); - LOG.info("end to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{},eventCount:{}," + LOG.debug("end to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{},eventCount:{}," + "inCounter:{},outCounter:{}", profileCache.size(), dispatchQueues.stream().mapToInt(LinkedBlockingQueue::size).sum(), eventCount, inCounter.getAndSet(0), outCounter.getAndSet(0)); diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java index 4c1edf190..d5fae2e04 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java @@ -156,7 +156,7 @@ public class BatchPackManager { if (!needOutputOvertimeData.getAndSet(false)) { return; } - LOG.info("start to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{}", + LOG.debug("start to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{}", profileCache.size(), dispatchQueue.size()); long currentTime = System.currentTimeMillis(); long createThreshold = currentTime - dispatchTimeout; @@ -179,7 +179,7 @@ public class BatchPackManager { outCounter.addAndGet(dispatchProfile.getCount()); } }); - LOG.info("end to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{},eventCount:{}," + LOG.debug("end to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{},eventCount:{}," + "inCounter:{},outCounter:{}", profileCache.size(), dispatchQueue.size(), eventCount, inCounter.getAndSet(0), outCounter.getAndSet(0)); diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java index 2af652de7..cb62fea41 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java @@ -164,6 +164,12 @@ public class MessageQueueZoneSinkContext extends SinkContext { * addSendResultMetric */ public void addSendResultMetric(BatchPackProfile currentRecord, String topic, boolean result, long sendTime) { + if (currentRecord instanceof SimpleBatchPackProfileV0) { + AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, + ((SimpleBatchPackProfileV0) currentRecord).getSimpleProfile()); + return; + } + Map<String, String> dimensions = new HashMap<>(); dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getProxyClusterId()); dimensions.put(DataProxyMetricItem.KEY_SOURCE_ID, "-"); diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java index 8d52abcaf..8907b13a5 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java @@ -287,9 +287,7 @@ public class TubeHandler implements MessageQueueHandler { // sendAsync Message message = new Message(topic, bodyBytes); // add headers - headers.forEach((key, value) -> { - message.setAttrKeyVal(key, value); - }); + headers.forEach(message::setAttrKeyVal); // callback long sendTime = System.currentTimeMillis(); MessageSentCallback callback = new MessageSentCallback() { diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/dispatch/DispatchManager.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/dispatch/DispatchManager.java index 945a4f1dc..9ae6303bb 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/dispatch/DispatchManager.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/dispatch/DispatchManager.java @@ -104,7 +104,7 @@ public class DispatchManager { * @return */ public void outputOvertimeData() { - LOG.info("start to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{}", + LOG.debug("start to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{}", profileCache.size(), dispatchQueue.size()); long currentTime = System.currentTimeMillis(); long createThreshold = currentTime - dispatchTimeout; @@ -122,7 +122,7 @@ public class DispatchManager { removeKeys.forEach((key) -> { dispatchQueue.offer(this.profileCache.remove(key)); }); - LOG.info("end to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{},eventCount:{}", + LOG.debug("end to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{},eventCount:{}", profileCache.size(), dispatchQueue.size(), eventCount); } diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchManager.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchManager.java index cbea8d2f5..e61c05ade 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchManager.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchManager.java @@ -115,7 +115,7 @@ public class DispatchManager { if (!needOutputOvertimeData.getAndSet(false)) { return; } - LOG.info("start to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{}", + LOG.debug("start to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{}", profileCache.size(), dispatchQueue.size()); long currentTime = System.currentTimeMillis(); long createThreshold = currentTime - dispatchTimeout; @@ -139,7 +139,7 @@ public class DispatchManager { outCounter.addAndGet(dispatchProfile.getCount()); } }); - LOG.info("end to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{},eventCount:{}," + LOG.debug("end to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{},eventCount:{}," + "inCounter:{},outCounter:{}", profileCache.size(), dispatchQueue.size(), eventCount, inCounter.getAndSet(0), outCounter.getAndSet(0));