This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 2ea39a2765 [INLONG-9809][Audit] SDK supports both singleton and non-singleton usage (#9812) 2ea39a2765 is described below commit 2ea39a276556eb4319c588ab02440b8c0de3609c Author: doleyzi <43397300+dole...@users.noreply.github.com> AuthorDate: Wed Mar 13 16:53:26 2024 +0800 [INLONG-9809][Audit] SDK supports both singleton and non-singleton usage (#9812) --- .../org/apache/inlong/audit/AuditOperator.java | 319 +-------------------- .../{AuditOperator.java => AuditReporterImpl.java} | 44 +-- .../DefaultISocketAddressListLoader.java | 2 +- .../{ => loader}/DnsSocketAddressListLoader.java | 2 +- .../{ => loader}/SocketAddressListLoader.java | 2 +- 5 files changed, 23 insertions(+), 346 deletions(-) diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java index 0c6f573f9b..85d6201a94 100644 --- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java +++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java @@ -17,332 +17,27 @@ package org.apache.inlong.audit; -import org.apache.inlong.audit.protocol.AuditApi; -import org.apache.inlong.audit.send.SenderManager; -import org.apache.inlong.audit.util.AuditConfig; -import org.apache.inlong.audit.util.Config; -import org.apache.inlong.audit.util.StatInfo; - -import org.apache.commons.lang3.ClassUtils; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Calendar; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.StringJoiner; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import 
java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; - -import static org.apache.inlong.audit.protocol.AuditApi.BaseCommand.Type.AUDIT_REQUEST; - /** * Audit operator, which is singleton. + * Creating objects through deserialization is not supported. */ -public class AuditOperator implements Serializable { - - private static final long serialVersionUID = 1L; - private static final Logger LOGGER = LoggerFactory.getLogger(AuditOperator.class); - private static final String FIELD_SEPARATORS = ":"; - private static final String DEFAULT_AUDIT_TAG = "-1"; - private static final long DEFAULT_AUDIT_VERSION = -1; - private static final int BATCH_NUM = 100; - private static final AuditOperator AUDIT_OPERATOR = new AuditOperator(); - private static final ReentrantLock GLOBAL_LOCK = new ReentrantLock(); - private static final int PERIOD = 1000 * 60; - private final ConcurrentHashMap<String, StatInfo> countMap = new ConcurrentHashMap<>(); - private final HashMap<String, StatInfo> threadCountMap = new HashMap<>(); - private final ConcurrentHashMap<String, StatInfo> deleteCountMap = new ConcurrentHashMap<>(); - private final List<String> deleteKeyList = new ArrayList<>(); - private final Config config = new Config(); - private int packageId = 1; - private int dataId = 0; - private boolean initialized = false; - private SenderManager manager; - - private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor(); - private AuditConfig auditConfig = null; - private SocketAddressListLoader loader = null; +public final class AuditOperator extends AuditReporterImpl { /** * Not support create from outer. */ private AuditOperator() { - - } - - /** - * Get AuditOperator instance. 
- */ - public static AuditOperator getInstance() { - return AUDIT_OPERATOR; } - /** - * init - */ - private void init() { - if (initialized) { - return; - } - config.init(); - timeoutExecutor.scheduleWithFixedDelay(new Runnable() { - - @Override - public void run() { - try { - loadIpPortList(); - send(); - } catch (Exception e) { - LOGGER.error(e.getMessage()); - } - } - - }, PERIOD, PERIOD, TimeUnit.MILLISECONDS); - if (auditConfig == null) { - auditConfig = new AuditConfig(); - } - this.manager = new SenderManager(auditConfig); - } + private static class AuditOperatorHandler { - private void loadIpPortList() { - if (loader == null) { - return; - } - try { - List<String> ipPortList = loader.loadSocketAddressList(); - if (ipPortList != null && ipPortList.size() > 0) { - HashSet<String> ipPortSet = new HashSet<>(); - ipPortSet.addAll(ipPortList); - this.setAuditProxy(ipPortSet); - } - } catch (Exception e) { - LOGGER.error(e.getMessage()); - } + private static final AuditOperator INSTANCE = new AuditOperator(); } /** - * set loader - * - * @param loader the loader to set - */ - public void setLoader(SocketAddressListLoader loader) { - this.loader = loader; - } - - /** - * setLoaderClass - * - * @param loaderClassName - */ - public void setLoaderClass(String loaderClassName) { - if (StringUtils.isEmpty(loaderClassName)) { - return; - } - try { - Class<?> loaderClass = ClassUtils.getClass(loaderClassName); - Object loaderObject = loaderClass.getDeclaredConstructor().newInstance(); - if (loaderObject instanceof SocketAddressListLoader) { - SocketAddressListLoader loader = (SocketAddressListLoader) loaderObject; - this.loader = loader; - LOGGER.info("audit IpPortListLoader loaderClass:{}", loaderClassName); - } - } catch (Throwable t) { - LOGGER.error("Fail to init IpPortListLoader,loaderClass:{},error:{}", - loaderClassName, t.getMessage(), t); - } - } - - /** - * Set AuditProxy from the ip - */ - public void setAuditProxy(HashSet<String> ipPortList) { - try { - 
GLOBAL_LOCK.lockInterruptibly(); - if (!initialized) { - init(); - initialized = true; - } - this.manager.setAuditProxy(ipPortList); - } catch (InterruptedException e) { - LOGGER.error(e.getMessage()); - } finally { - GLOBAL_LOCK.unlock(); - } - } - - /** - * set audit config - */ - public void setAuditConfig(AuditConfig config) { - auditConfig = config; - manager.setAuditConfig(config); - } - - /** - * Add audit data - */ - public void add(int auditID, String inlongGroupID, String inlongStreamID, Long logTime, long count, long size) { - add(auditID, DEFAULT_AUDIT_TAG, inlongGroupID, inlongStreamID, logTime, count, size); - } - - public void add(int auditID, String auditTag, String inlongGroupID, String inlongStreamID, Long logTime, - long count, long size) { - long delayTime = System.currentTimeMillis() - logTime; - add(auditID, auditTag, inlongGroupID, inlongStreamID, logTime, count, size, - delayTime * count, DEFAULT_AUDIT_VERSION); - } - - public void add(int auditID, String inlongGroupID, String inlongStreamID, Long logTime, long count, long size, - long delayTime) { - add(auditID, DEFAULT_AUDIT_TAG, inlongGroupID, inlongStreamID, logTime, count, size, - delayTime, DEFAULT_AUDIT_VERSION); - } - - public void add(int auditID, String auditTag, String inlongGroupID, String inlongStreamID, Long logTime, - long count, long size, long delayTime, long auditVersion) { - StringJoiner keyJoiner = new StringJoiner(FIELD_SEPARATORS); - keyJoiner.add(String.valueOf(logTime / PERIOD)); - keyJoiner.add(inlongGroupID); - keyJoiner.add(inlongStreamID); - keyJoiner.add(String.valueOf(auditID)); - keyJoiner.add(auditTag); - keyJoiner.add(String.valueOf(auditVersion)); - addByKey(keyJoiner.toString(), count, size, delayTime); - } - - /** - * Add audit info by key. 
- */ - private void addByKey(String key, long count, long size, long delayTime) { - if (countMap.get(key) == null) { - countMap.put(key, new StatInfo(0L, 0L, 0L)); - } - countMap.get(key).count.addAndGet(count); - countMap.get(key).size.addAndGet(size); - countMap.get(key).delay.addAndGet(delayTime); - } - - /** - * Send audit data - */ - public synchronized void send() { - manager.clearBuffer(); - resetStat(); - // Retrieve statistics from the list of objects without statistics to be eliminated - for (Map.Entry<String, StatInfo> entry : this.deleteCountMap.entrySet()) { - this.sumThreadGroup(entry.getKey(), entry.getValue()); - } - this.deleteCountMap.clear(); - for (Map.Entry<String, StatInfo> entry : countMap.entrySet()) { - String key = entry.getKey(); - StatInfo value = entry.getValue(); - // If there is no data, enter the list to be eliminated - if (value.count.get() == 0) { - this.deleteKeyList.add(key); - continue; - } - this.sumThreadGroup(key, value); - } - - // Clean up obsolete statistical data objects - for (String key : this.deleteKeyList) { - StatInfo value = this.countMap.remove(key); - this.deleteCountMap.put(key, value); - } - this.deleteKeyList.clear(); - - long sdkTime = Calendar.getInstance().getTimeInMillis(); - AuditApi.AuditMessageHeader msgHeader = AuditApi.AuditMessageHeader.newBuilder() - .setIp(config.getLocalIP()).setDockerId(config.getDockerId()) - .setThreadId(String.valueOf(Thread.currentThread().getId())) - .setSdkTs(sdkTime).setPacketId(packageId) - .build(); - AuditApi.AuditRequest.Builder requestBuild = AuditApi.AuditRequest.newBuilder(); - requestBuild.setMsgHeader(msgHeader).setRequestId(manager.nextRequestId()); - - // process the stat info for all threads - for (Map.Entry<String, StatInfo> entry : threadCountMap.entrySet()) { - // Entry key order: logTime inlongGroupID inlongStreamID auditID auditTag auditVersion - String[] keyArray = entry.getKey().split(FIELD_SEPARATORS); - long logTime = Long.parseLong(keyArray[0]) * 
PERIOD; - String inlongGroupID = keyArray[1]; - String inlongStreamID = keyArray[2]; - String auditID = keyArray[3]; - String auditTag = keyArray[4]; - long auditVersion = Long.parseLong(keyArray[5]); - StatInfo value = entry.getValue(); - AuditApi.AuditMessageBody msgBody = AuditApi.AuditMessageBody.newBuilder() - .setLogTs(logTime) - .setInlongGroupId(inlongGroupID) - .setInlongStreamId(inlongStreamID) - .setAuditId(auditID) - .setAuditTag(auditTag) - .setCount(value.count.get()) - .setSize(value.size.get()) - .setDelay(value.delay.get()) - .setAuditVersion(auditVersion) - .build(); - requestBuild.addMsgBody(msgBody); - - if (dataId++ >= BATCH_NUM) { - dataId = 0; - packageId++; - sendByBaseCommand(requestBuild.build()); - requestBuild.clearMsgBody(); - } - } - if (requestBuild.getMsgBodyCount() > 0) { - sendByBaseCommand(requestBuild.build()); - requestBuild.clearMsgBody(); - } - threadCountMap.clear(); - - LOGGER.info("finish report audit data"); - } - - /** - * Send base command - */ - private void sendByBaseCommand(AuditApi.AuditRequest auditRequest) { - AuditApi.BaseCommand.Builder baseCommand = AuditApi.BaseCommand.newBuilder(); - baseCommand.setType(AUDIT_REQUEST).setAuditRequest(auditRequest).build(); - manager.send(baseCommand.build(), auditRequest); - } - - /** - * Summary - */ - private void sumThreadGroup(String key, StatInfo statInfo) { - long count = statInfo.count.getAndSet(0); - if (0 == count) { - return; - } - if (threadCountMap.get(key) == null) { - threadCountMap.put(key, new StatInfo(0, 0, 0)); - } - - long size = statInfo.size.getAndSet(0); - long delay = statInfo.delay.getAndSet(0); - - threadCountMap.get(key).count.addAndGet(count); - threadCountMap.get(key).size.addAndGet(size); - threadCountMap.get(key).delay.addAndGet(delay); - } - - /** - * Reset statistics + * Get AuditOperator instance. 
*/ - private void resetStat() { - dataId = 0; - packageId = 1; + public static AuditOperator getInstance() { + return AuditOperatorHandler.INSTANCE; } } diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java similarity index 92% copy from inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java copy to inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java index 0c6f573f9b..10c3913dd2 100644 --- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java +++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java @@ -17,6 +17,7 @@ package org.apache.inlong.audit; +import org.apache.inlong.audit.loader.SocketAddressListLoader; import org.apache.inlong.audit.protocol.AuditApi; import org.apache.inlong.audit.send.SenderManager; import org.apache.inlong.audit.util.AuditConfig; @@ -31,7 +32,6 @@ import org.slf4j.LoggerFactory; import java.io.Serializable; import java.util.ArrayList; import java.util.Calendar; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -44,22 +44,18 @@ import java.util.concurrent.locks.ReentrantLock; import static org.apache.inlong.audit.protocol.AuditApi.BaseCommand.Type.AUDIT_REQUEST; -/** - * Audit operator, which is singleton. 
- */ -public class AuditOperator implements Serializable { +public class AuditReporterImpl implements Serializable { private static final long serialVersionUID = 1L; - private static final Logger LOGGER = LoggerFactory.getLogger(AuditOperator.class); + private static final Logger LOGGER = LoggerFactory.getLogger(AuditReporterImpl.class); private static final String FIELD_SEPARATORS = ":"; private static final String DEFAULT_AUDIT_TAG = "-1"; private static final long DEFAULT_AUDIT_VERSION = -1; private static final int BATCH_NUM = 100; - private static final AuditOperator AUDIT_OPERATOR = new AuditOperator(); - private static final ReentrantLock GLOBAL_LOCK = new ReentrantLock(); + private final ReentrantLock GLOBAL_LOCK = new ReentrantLock(); private static final int PERIOD = 1000 * 60; private final ConcurrentHashMap<String, StatInfo> countMap = new ConcurrentHashMap<>(); - private final HashMap<String, StatInfo> threadCountMap = new HashMap<>(); + private final ConcurrentHashMap<String, StatInfo> threadCountMap = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, StatInfo> deleteCountMap = new ConcurrentHashMap<>(); private final List<String> deleteKeyList = new ArrayList<>(); private final Config config = new Config(); @@ -73,21 +69,7 @@ public class AuditOperator implements Serializable { private SocketAddressListLoader loader = null; /** - * Not support create from outer. - */ - private AuditOperator() { - - } - - /** - * Get AuditOperator instance. 
- */ - public static AuditOperator getInstance() { - return AUDIT_OPERATOR; - } - - /** - * init + * Init */ private void init() { if (initialized) { @@ -130,7 +112,7 @@ public class AuditOperator implements Serializable { } /** - * set loader + * Set loader * * @param loader the loader to set */ @@ -139,7 +121,7 @@ public class AuditOperator implements Serializable { } /** - * setLoaderClass + * SetLoaderClass * * @param loaderClassName */ @@ -153,10 +135,10 @@ public class AuditOperator implements Serializable { if (loaderObject instanceof SocketAddressListLoader) { SocketAddressListLoader loader = (SocketAddressListLoader) loaderObject; this.loader = loader; - LOGGER.info("audit IpPortListLoader loaderClass:{}", loaderClassName); + LOGGER.info("Audit list loader class:{}", loaderClassName); } } catch (Throwable t) { - LOGGER.error("Fail to init IpPortListLoader,loaderClass:{},error:{}", + LOGGER.error("Fail to init list loader class:{},error:{}", loaderClassName, t.getMessage(), t); } } @@ -180,7 +162,7 @@ public class AuditOperator implements Serializable { } /** - * set audit config + * Set audit config */ public void setAuditConfig(AuditConfig config) { auditConfig = config; @@ -269,7 +251,7 @@ public class AuditOperator implements Serializable { AuditApi.AuditRequest.Builder requestBuild = AuditApi.AuditRequest.newBuilder(); requestBuild.setMsgHeader(msgHeader).setRequestId(manager.nextRequestId()); - // process the stat info for all threads + // Process the stat info for all threads for (Map.Entry<String, StatInfo> entry : threadCountMap.entrySet()) { // Entry key order: logTime inlongGroupID inlongStreamID auditID auditTag auditVersion String[] keyArray = entry.getKey().split(FIELD_SEPARATORS); @@ -306,7 +288,7 @@ public class AuditOperator implements Serializable { } threadCountMap.clear(); - LOGGER.info("finish report audit data"); + LOGGER.info("Finish report audit data"); } /** diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/DefaultISocketAddressListLoader.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/loader/DefaultISocketAddressListLoader.java similarity index 98% rename from inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/DefaultISocketAddressListLoader.java rename to inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/loader/DefaultISocketAddressListLoader.java index c6629e6de2..6a8b6a3b29 100644 --- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/DefaultISocketAddressListLoader.java +++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/loader/DefaultISocketAddressListLoader.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.audit; +package org.apache.inlong.audit.loader; import org.apache.commons.lang3.StringUtils; diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/DnsSocketAddressListLoader.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/loader/DnsSocketAddressListLoader.java similarity index 98% rename from inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/DnsSocketAddressListLoader.java rename to inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/loader/DnsSocketAddressListLoader.java index 95c9d969d3..fa0f289c41 100644 --- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/DnsSocketAddressListLoader.java +++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/loader/DnsSocketAddressListLoader.java @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.inlong.audit; +package org.apache.inlong.audit.loader; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/SocketAddressListLoader.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/loader/SocketAddressListLoader.java similarity index 96% rename from inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/SocketAddressListLoader.java rename to inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/loader/SocketAddressListLoader.java index 0902964a7d..067edb8a05 100644 --- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/SocketAddressListLoader.java +++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/loader/SocketAddressListLoader.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.audit; +package org.apache.inlong.audit.loader; import java.util.List; import java.util.Map;