This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 1f310a1e5 [INLONG-6031][Audit] Clean code for InLong Audit (#6032) 1f310a1e5 is described below commit 1f310a1e587b0e69d44019f9f66eef42f14b7af3 Author: ciscozhou <45899072+ciscoz...@users.noreply.github.com> AuthorDate: Tue Sep 27 20:19:07 2022 +0800 [INLONG-6031][Audit] Clean code for InLong Audit (#6032) Co-authored-by: healchow <healc...@gmail.com> --- .../inlong/agent/metrics/audit/AuditUtils.java | 31 +- .../org/apache/inlong/agent/core/AgentMain.java | 22 +- .../org/apache/inlong/audit/protocol/Commands.java | 19 +- .../audit-common/src/main/proto/AuditApi.proto | 30 +- .../inlong/audit/source/ServerMessageHandler.java | 180 ++++---- .../audit/{AuditImp.java => AuditOperator.java} | 174 ++++---- .../apache/inlong/audit/send/SenderHandler.java | 19 +- .../apache/inlong/audit/send/SenderManager.java | 115 +++--- .../org/apache/inlong/audit/util/AuditData.java | 20 +- .../apache/inlong/audit/util/AuditDataTest.java | 16 +- .../inlong/dataproxy/metrics/audit/AuditUtils.java | 62 ++- .../apache/inlong/dataproxy/node/Application.java | 458 ++++++++++----------- ...ovider.java => ManagerPropsConfigProvider.java} | 38 +- .../sort/standalone/SortStandaloneApplication.java | 19 +- .../sort/standalone/metrics/audit/AuditUtils.java | 50 +-- .../inlong/sort/base/metric/SinkMetricData.java | 26 +- .../inlong/sort/base/metric/SourceMetricData.java | 18 +- .../server/broker/stats/audit/AuditUtils.java | 42 +- 18 files changed, 605 insertions(+), 734 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java index d3ad42538..ff6518ae5 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java @@ -19,9 +19,10 @@ package org.apache.inlong.agent.metrics.audit; import org.apache.commons.lang3.StringUtils; import org.apache.inlong.agent.conf.AgentConfiguration; -import org.apache.inlong.audit.AuditImp; +import org.apache.inlong.audit.AuditOperator; import org.apache.inlong.audit.util.AuditConfig; +import java.util.Collections; import java.util.HashSet; import static org.apache.inlong.agent.constant.AgentConstants.AUDIT_ENABLE; @@ -44,49 +45,47 @@ public class AuditUtils { private static boolean IS_AUDIT = true; /** - * initAudit + * Init audit config */ public static void initAudit() { AgentConfiguration conf = AgentConfiguration.getAgentConf(); - // IS_AUDIT IS_AUDIT = conf.getBoolean(AUDIT_ENABLE, DEFAULT_AUDIT_ENABLE); if (IS_AUDIT) { // AuditProxy String strIpPorts = conf.get(AUDIT_KEY_PROXYS, DEFAULT_AUDIT_PROXYS); - HashSet<String> proxys = new HashSet<>(); + HashSet<String> proxySet = new HashSet<>(); if (!StringUtils.isBlank(strIpPorts)) { String[] ipPorts = strIpPorts.split("\\s+"); - for (String ipPort : ipPorts) { - proxys.add(ipPort); - } + Collections.addAll(proxySet, ipPorts); } - AuditImp.getInstance().setAuditProxy(proxys); + AuditOperator.getInstance().setAuditProxy(proxySet); + // AuditConfig String filePath = conf.get(AUDIT_KEY_FILE_PATH, AUDIT_DEFAULT_FILE_PATH); int maxCacheRow = conf.getInt(AUDIT_KEY_MAX_CACHE_ROWS, AUDIT_DEFAULT_MAX_CACHE_ROWS); AuditConfig auditConfig = new AuditConfig(filePath, maxCacheRow); - AuditImp.getInstance().setAuditConfig(auditConfig); + AuditOperator.getInstance().setAuditConfig(auditConfig); } } /** - * add audit metric + * Add audit metric */ - public static void add(int auditID, String inlongGroupId, String inlongStreamId, long logTime, int count, - long size) { + public static void add(int auditID, String inlongGroupId, String inlongStreamId, + long logTime, int count, long size) { if (!IS_AUDIT) { return; } - AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, count, size); + AuditOperator.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, count, size); } /** - * sendReport + * Send audit data */ - public static void sendReport() { + public static void send() { if (!IS_AUDIT) { return; } - AuditImp.getInstance().sendReport(); + AuditOperator.getInstance().send(); } } diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentMain.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentMain.java index ff48cf47f..6816f57f3 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentMain.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentMain.java @@ -85,44 +85,42 @@ public class AgentMain { } /** - * Stopping agent gracefully if get killed. + * Stopping agent manager gracefully if it was killed. * - * @param manager agent manager + * @param agentManager agent manager */ - private static void stopManagerIfKilled(AgentManager manager) { + private static void stopAgentIfKilled(AgentManager agentManager) { Runtime.getRuntime().addShutdownHook(new Thread(() -> { try { LOGGER.info("stopping agent gracefully"); - manager.stop(); + agentManager.stop(); } catch (Exception ex) { - LOGGER.error("exception while stopping threads", ex); + LOGGER.error("stop agent manager error: ", ex); } })); } /** * Main entrance. - * - * @param args arguments - * @throws Exception exceptions */ public static void main(String[] args) throws Exception { CommandLine cl = initOptions(args); assert cl != null; initAgentConf(cl); AuditUtils.initAudit(); + AgentManager manager = new AgentManager(); try { manager.start(); - stopManagerIfKilled(manager); - //metrics + stopAgentIfKilled(manager); + // metrics MetricObserver.init(AgentConfiguration.getAgentConf().getConfigProperties()); manager.join(); } catch (Exception ex) { - LOGGER.error("exception caught", ex); + LOGGER.error("agent running exception: ", ex); } finally { manager.stop(); - AuditUtils.sendReport(); + AuditUtils.send(); } } } diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/Commands.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/Commands.java index fc833bf73..7ae674b77 100644 --- a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/Commands.java +++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/Commands.java @@ -25,6 +25,9 @@ import org.apache.inlong.audit.protocol.AuditApi.BaseCommand; import org.apache.inlong.audit.protocol.AuditApi.BaseCommand.Type; import org.apache.inlong.audit.protocol.AuditApi.Pong; +/** + * Audit commands, used to get various of ByteBuf. + */ public class Commands { public static int HEAD_LENGTH = 4; @@ -45,24 +48,24 @@ public class Commands { public static ByteBuf getAuditRequestBuffer(AuditRequest auditRequest) { BaseCommand cmdAuditRequest = BaseCommand.newBuilder() - .setType(Type.AUDITREQUEST) + .setType(Type.AUDIT_REQUEST) .setAuditRequest(auditRequest).build(); return getChannelBuffer(cmdAuditRequest.toByteArray()); } - public static ByteBuf getAuditReplylBuffer(AuditReply auditReply) { + public static ByteBuf getAuditReplyBuffer(AuditReply auditReply) { BaseCommand cmdAuditReply = BaseCommand.newBuilder() - .setType(Type.AUDITREPLY) + .setType(Type.AUDIT_REPLY) .setAuditReply(auditReply).build(); return getChannelBuffer(cmdAuditReply.toByteArray()); } private static ByteBuf getChannelBuffer(byte[] body) { - /* [totalSize] | [body]*/ + // [totalSize] | [body] int totalLength = body.length; - ByteBuf cmdPingBuffer = ByteBufAllocator.DEFAULT.buffer(HEAD_LENGTH + totalLength); - cmdPingBuffer.writeInt(totalLength); - cmdPingBuffer.writeBytes(body); - return cmdPingBuffer; + ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(HEAD_LENGTH + totalLength); + buffer.writeInt(totalLength); + buffer.writeBytes(body); + return buffer; } } diff --git a/inlong-audit/audit-common/src/main/proto/AuditApi.proto b/inlong-audit/audit-common/src/main/proto/AuditApi.proto index ce98fcdcf..ce89b112b 100644 --- a/inlong-audit/audit-common/src/main/proto/AuditApi.proto +++ b/inlong-audit/audit-common/src/main/proto/AuditApi.proto @@ -20,17 +20,17 @@ syntax = "proto3"; package org.apache.inlong.audit.protocol; message BaseCommand { - enum Type { - PING = 0; - PONG = 1; - AUDITREQUEST = 2; - AUDITREPLY = 3; - } - Type type = 1; - AuditRequest audit_request = 2; - AuditReply audit_reply = 3; - Ping ping = 4; - Pong pong = 5; + enum Type { + PING = 0; + PONG = 1; + AUDIT_REQUEST = 2; + AUDIT_REPLY = 3; + } + Type type = 1; + AuditRequest audit_request = 2; + AuditReply audit_reply = 3; + Ping ping = 4; + Pong pong = 5; } message Ping { @@ -55,8 +55,8 @@ message AuditMessageHeader { message AuditMessageBody { uint64 log_ts = 1; - string inlong_group_id= 2; - string inlong_stream_id= 3; + string inlong_group_id = 2; + string inlong_stream_id = 3; string audit_id = 4; uint64 count = 5; uint64 size = 6; @@ -65,8 +65,8 @@ message AuditMessageBody { message AuditReply { enum RSP_CODE { - SUCCESS = 0; - FAILED = 1; + SUCCESS = 0; + FAILED = 1; DISASTER = 2; } uint64 request_id = 1; diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java index 483698e79..4f4c86345 100644 --- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java +++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java @@ -17,22 +17,16 @@ package org.apache.inlong.audit.source; -import static com.google.common.base.Preconditions.checkArgument; - import com.google.gson.Gson; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.group.ChannelGroup; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.util.List; import org.apache.flume.Event; import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.event.EventBuilder; import org.apache.flume.source.AbstractSource; - import org.apache.inlong.audit.protocol.AuditApi.AuditMessageBody; import org.apache.inlong.audit.protocol.AuditApi.AuditReply; import org.apache.inlong.audit.protocol.AuditApi.AuditReply.RSP_CODE; @@ -43,43 +37,40 @@ import org.apache.inlong.audit.protocol.Commands; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; + /** * Server message handler - * */ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { - private static final Logger logger = LoggerFactory.getLogger(ServerMessageHandler.class); - - private static final String DEFAULT_REMOTE_IP_VALUE = "0.0.0.0"; + private static final Logger LOGGER = LoggerFactory.getLogger(ServerMessageHandler.class); + private static final Gson GSON = new Gson(); - private AbstractSource source; private final ChannelGroup allChannels; - private int maxConnections = Integer.MAX_VALUE; - private final ChannelProcessor processor; private final ServiceDecoder serviceDecoder; - - private final Gson gson = new Gson(); + private final int maxConnections; public ServerMessageHandler(AbstractSource source, ServiceDecoder serviceDecoder, - ChannelGroup allChannels, Integer maxCons) { - this.source = source; + ChannelGroup allChannels, Integer maxCons) { this.processor = source.getChannelProcessor(); this.serviceDecoder = serviceDecoder; this.allChannels = allChannels; this.maxConnections = maxCons; - } @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { + public void channelActive(ChannelHandlerContext ctx) { if (allChannels.size() - 1 >= maxConnections) { - logger.warn("refuse to connect , and connections=" + (allChannels.size() - 1) - + ", maxConnections=" - + maxConnections + ",channel is " + ctx.channel()); ctx.channel().disconnect(); ctx.channel().close(); + LOGGER.warn("refuse to connect to channel: {}, connections={}, maxConnections={}", + ctx.channel(), allChannels.size() - 1, maxConnections); } allChannels.add(ctx.channel()); ctx.fireChannelActive(); @@ -93,126 +84,115 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - logger.debug("message received"); if (msg == null) { - logger.warn("get null message event, just skip"); + LOGGER.warn("get null message event, just skip"); return; } - ByteBuf cb = (ByteBuf) msg; - int len = cb.readableBytes(); + ByteBuf buf = (ByteBuf) msg; + int len = buf.readableBytes(); if (len == 0) { - logger.warn("receive message skip empty msg."); - cb.clear(); + LOGGER.warn("receive message skip empty msg"); + buf.clear(); return; } Channel remoteChannel = ctx.channel(); - BaseCommand cmd = null; + BaseCommand cmd; try { - cmd = serviceDecoder.extractData(cb, remoteChannel); + cmd = serviceDecoder.extractData(buf, remoteChannel); } catch (Exception ex) { - logger.error("extractData has error e {}", ex); - throw new IOException(ex.getCause()); + LOGGER.error("extract data error: ", ex); + throw new IOException(ex); } - if (cmd == null) { - logger.warn("receive message extractData is null"); + LOGGER.warn("extract data from received msg is null"); return; } + ByteBuf channelBuffer = null; switch (cmd.getType()) { case PING: checkArgument(cmd.hasPing()); - channelBuffer = Commands.getPongChannelBuffer(); + channelBuffer = Commands.getPongChannelBuffer(); break; case PONG: checkArgument(cmd.hasPong()); - channelBuffer = Commands.getPingChannelBuffer(); + channelBuffer = Commands.getPingChannelBuffer(); break; - case AUDITREQUEST: + case AUDIT_REQUEST: checkArgument(cmd.hasAuditRequest()); AuditReply auditReply = handleRequest(cmd.getAuditRequest()); - channelBuffer = Commands.getAuditReplylBuffer(auditReply); + channelBuffer = Commands.getAuditReplyBuffer(auditReply); break; - case AUDITREPLY: + case AUDIT_REPLY: checkArgument(cmd.hasAuditReply()); break; default: - channelBuffer = null; } if (channelBuffer != null) { writeResponse(remoteChannel, channelBuffer); } } - private AuditReply handleRequest(AuditRequest auditRequest) { - AuditReply reply = null; - if (auditRequest != null) { - List<AuditMessageBody> bodyList = auditRequest.getMsgBodyList(); - if (bodyList != null) { - int errorMsgBody = 0; - for (AuditMessageBody auditMessageBody : bodyList) { - AuditData auditData = new AuditData(); - auditData.setIp(auditRequest.getMsgHeader().getIp()); - auditData.setThreadId(auditRequest.getMsgHeader().getThreadId()); - auditData.setDockerId(auditRequest.getMsgHeader().getDockerId()); - auditData.setPacketId(auditRequest.getMsgHeader().getPacketId()); - auditData.setSdkTs(auditRequest.getMsgHeader().getSdkTs()); - - auditData.setLogTs(auditMessageBody.getLogTs()); - auditData.setAuditId(auditMessageBody.getAuditId()); - auditData.setCount(auditMessageBody.getCount()); - auditData.setDelay(auditMessageBody.getDelay()); - auditData.setInlongGroupId(auditMessageBody.getInlongGroupId()); - auditData.setInlongStreamId(auditMessageBody.getInlongStreamId()); - auditData.setSize(auditMessageBody.getSize()); - - byte[] body = null; - try { - body = gson.toJson(auditData).getBytes("UTF-8"); - } catch (UnsupportedEncodingException e) { - logger.error("UnsupportedEncodingException = {}", e); - } - if (body != null) { - Event event = null; - try { - event = EventBuilder.withBody(body, null); - processor.processEvent(event); - } catch (Throwable ex) { - logger.error("Error writing to controller,data will discard.", ex); - errorMsgBody++; - } - } - } - if (errorMsgBody != 0) { - reply = AuditReply.newBuilder().setRequestId(auditRequest.getRequestId()) - .setMessage("Error writing to controller,data " - + "will discard. error body num = " - + errorMsgBody).setRspCode(RSP_CODE.FAILED).build(); - } + private AuditReply handleRequest(AuditRequest auditRequest) throws Exception { + if (auditRequest == null) { + throw new Exception("audit request cannot be null"); + } + AuditReply reply = AuditReply.newBuilder() + .setRequestId(auditRequest.getRequestId()) + .setRspCode(RSP_CODE.SUCCESS) + .build(); + List<AuditMessageBody> bodyList = auditRequest.getMsgBodyList(); + int errorMsgBody = 0; + for (AuditMessageBody auditMessageBody : bodyList) { + AuditData auditData = new AuditData(); + auditData.setIp(auditRequest.getMsgHeader().getIp()); + auditData.setThreadId(auditRequest.getMsgHeader().getThreadId()); + auditData.setDockerId(auditRequest.getMsgHeader().getDockerId()); + auditData.setPacketId(auditRequest.getMsgHeader().getPacketId()); + auditData.setSdkTs(auditRequest.getMsgHeader().getSdkTs()); + + auditData.setLogTs(auditMessageBody.getLogTs()); + auditData.setAuditId(auditMessageBody.getAuditId()); + auditData.setCount(auditMessageBody.getCount()); + auditData.setDelay(auditMessageBody.getDelay()); + auditData.setInlongGroupId(auditMessageBody.getInlongGroupId()); + auditData.setInlongStreamId(auditMessageBody.getInlongStreamId()); + auditData.setSize(auditMessageBody.getSize()); + + try { + byte[] body = GSON.toJson(auditData).getBytes(StandardCharsets.UTF_8); + Event event = EventBuilder.withBody(body, null); + processor.processEvent(event); + } catch (Throwable ex) { + LOGGER.error("writing data error, discard it: ", ex); + errorMsgBody++; } } - if (reply == null) { - reply = AuditReply.newBuilder().setRequestId(auditRequest.getRequestId()) - .setRspCode(RSP_CODE.SUCCESS).build(); + + if (errorMsgBody != 0) { + reply = reply.toBuilder() + .setMessage("writing data error, discard it, error body count=" + errorMsgBody) + .setRspCode(RSP_CODE.FAILED) + .build(); } + return reply; } @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - logger.error("exception caught", cause); + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + LOGGER.error("exception caught", cause); } - private void writeResponse(Channel remoteChannel, ByteBuf buffer) throws Exception { - if (remoteChannel.isWritable()) { - remoteChannel.writeAndFlush(buffer); - } else { - logger.warn( - "the send buffer2 is full, so disconnect it!please check remote client" - + "; Connection info:" + remoteChannel); - throw new Exception(new Throwable( - "the send buffer2 is full,so disconnect it!please check remote client, Connection info:" - + remoteChannel)); + private void writeResponse(Channel channel, ByteBuf buffer) throws Exception { + if (channel.isWritable()) { + channel.writeAndFlush(buffer); + return; } + + String msg = String.format("remote channel=%s is not writable, please check remote client!", channel); + LOGGER.warn(msg); + throw new Exception(msg); } + } diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditImp.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java similarity index 60% rename from inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditImp.java rename to inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java index 2ef56cac9..ddbafd916 100644 --- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditImp.java +++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java @@ -36,47 +36,61 @@ import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; -import static org.apache.inlong.audit.protocol.AuditApi.BaseCommand.Type.AUDITREQUEST; +import static org.apache.inlong.audit.protocol.AuditApi.BaseCommand.Type.AUDIT_REQUEST; -public class AuditImp { - private static final Logger logger = LoggerFactory.getLogger(AuditImp.class); - private static AuditImp auditImp = new AuditImp(); +/** + * Audit operator, which is singleton. + */ +public class AuditOperator { + + private static final Logger LOGGER = LoggerFactory.getLogger(AuditOperator.class); private static final String FIELD_SEPARATORS = ":"; - private ConcurrentHashMap<String, StatInfo> countMap = new ConcurrentHashMap<String, StatInfo>(); - private HashMap<String, StatInfo> threadSumMap = new HashMap<String, StatInfo>(); - private ConcurrentHashMap<String, StatInfo> deleteCountMap = new ConcurrentHashMap<String, StatInfo>(); - private List<String> deleteKeyList = new ArrayList<String>(); - private AuditConfig auditConfig = null; - private Config config = new Config(); - private Long sdkTime; + private static final int BATCH_NUM = 100; + private static final AuditOperator AUDIT_OPERATOR = new AuditOperator(); + private static final ReentrantLock GLOBAL_LOCK = new ReentrantLock(); + private static final int PERIOD = 1000 * 60; + private final ConcurrentHashMap<String, StatInfo> countMap = new ConcurrentHashMap<>(); + private final HashMap<String, StatInfo> threadCountMap = new HashMap<>(); + private final ConcurrentHashMap<String, StatInfo> deleteCountMap = new ConcurrentHashMap<>(); + private final List<String> deleteKeyList = new ArrayList<>(); + private final Config config = new Config(); + private final Timer timer = new Timer(); private int packageId = 1; private int dataId = 0; - private static final int BATCH_NUM = 100; - boolean inited = false; + private boolean initialized = false; private SenderManager manager; - private static ReentrantLock globalLock = new ReentrantLock(); - private static int PERIOD = 1000 * 60; - private Timer timer = new Timer(); - private TimerTask timerTask = new TimerTask() { + + private final TimerTask timerTask = new TimerTask() { @Override public void run() { try { - sendReport(); + send(); } catch (Exception e) { - logger.error(e.getMessage()); + LOGGER.error(e.getMessage()); } } }; + private AuditConfig auditConfig = null; + + /** + * Not support create from outer. + */ + private AuditOperator() { - public static AuditImp getInstance() { - return auditImp; + } + + /** + * Get AuditOperator instance. + */ + public static AuditOperator getInstance() { + return AUDIT_OPERATOR; } /** * init */ private void init() { - if (inited) { + if (initialized) { return; } config.init(); @@ -88,29 +102,25 @@ public class AuditImp { } /** - * setAuditProxy - * - * @param ipPortList + * Set AuditProxy from the ip */ public void setAuditProxy(HashSet<String> ipPortList) { try { - globalLock.lockInterruptibly(); - if (!inited) { + GLOBAL_LOCK.lockInterruptibly(); + if (!initialized) { init(); - inited = true; + initialized = true; } this.manager.setAuditProxy(ipPortList); } catch (InterruptedException e) { - logger.error(e.getMessage()); + LOGGER.error(e.getMessage()); } finally { - globalLock.unlock(); + GLOBAL_LOCK.unlock(); } } /** * set audit config - * - * @param config */ public void setAuditConfig(AuditConfig config) { auditConfig = config; @@ -118,14 +128,7 @@ public class AuditImp { } /** - * api - * - * @param auditID - * @param inlongGroupID - * @param inlongStreamID - * @param logTime - * @param count - * @param size + * Add audit data */ public void add(int auditID, String inlongGroupID, String inlongStreamID, Long logTime, long count, long size) { long delayTime = System.currentTimeMillis() - logTime; @@ -135,30 +138,21 @@ public class AuditImp { } /** - * add by key - * - * @param key - * @param count - * @param size - * @param delayTime + * Add audit info by key. */ private void addByKey(String key, long count, long size, long delayTime) { - try { - if (countMap.get(key) == null) { - countMap.put(key, new StatInfo(0L, 0L, 0L)); - } - countMap.get(key).count.addAndGet(count); - countMap.get(key).size.addAndGet(size); - countMap.get(key).delay.addAndGet(delayTime * count); - } catch (Exception e) { - return; + if (countMap.get(key) == null) { + countMap.put(key, new StatInfo(0L, 0L, 0L)); } + countMap.get(key).count.addAndGet(count); + countMap.get(key).size.addAndGet(size); + countMap.get(key).delay.addAndGet(delayTime * count); } /** - * Report audit data + * Send audit data */ - public synchronized void sendReport() { + public synchronized void send() { manager.clearBuffer(); resetStat(); // Retrieve statistics from the list of objects without statistics to be eliminated @@ -183,76 +177,78 @@ public class AuditImp { this.deleteCountMap.put(key, value); } this.deleteKeyList.clear(); - sdkTime = Calendar.getInstance().getTimeInMillis(); - AuditApi.AuditMessageHeader mssageHeader = AuditApi.AuditMessageHeader.newBuilder() + + long sdkTime = Calendar.getInstance().getTimeInMillis(); + AuditApi.AuditMessageHeader msgHeader = AuditApi.AuditMessageHeader.newBuilder() .setIp(config.getLocalIP()).setDockerId(config.getDockerId()) .setThreadId(String.valueOf(Thread.currentThread().getId())) .setSdkTs(sdkTime).setPacketId(packageId) .build(); - AuditApi.AuditRequest.Builder requestBulid = AuditApi.AuditRequest.newBuilder(); - requestBulid.setMsgHeader(mssageHeader).setRequestId(manager.nextRequestId()); - for (Map.Entry<String, StatInfo> entry : threadSumMap.entrySet()) { + AuditApi.AuditRequest.Builder requestBuild = AuditApi.AuditRequest.newBuilder(); + requestBuild.setMsgHeader(msgHeader).setRequestId(manager.nextRequestId()); + + // process the stat info for all threads + for (Map.Entry<String, StatInfo> entry : threadCountMap.entrySet()) { String[] keyArray = entry.getKey().split(FIELD_SEPARATORS); long logTime = Long.parseLong(keyArray[0]) * PERIOD; String inlongGroupID = keyArray[1]; String inlongStreamID = keyArray[2]; String auditID = keyArray[3]; StatInfo value = entry.getValue(); - AuditApi.AuditMessageBody mssageBody = AuditApi.AuditMessageBody.newBuilder() - .setLogTs(logTime).setInlongGroupId(inlongGroupID) - .setInlongStreamId(inlongStreamID).setAuditId(auditID) - .setCount(value.count.get()).setSize(value.size.get()) + AuditApi.AuditMessageBody msgBody = AuditApi.AuditMessageBody.newBuilder() + .setLogTs(logTime) + .setInlongGroupId(inlongGroupID) + .setInlongStreamId(inlongStreamID) + .setAuditId(auditID) + .setCount(value.count.get()) + .setSize(value.size.get()) .setDelay(value.delay.get()) .build(); - requestBulid.addMsgBody(mssageBody); + requestBuild.addMsgBody(msgBody); + if (dataId++ >= BATCH_NUM) { dataId = 0; packageId++; - sendByBaseCommand(sdkTime, requestBulid.build()); - requestBulid.clearMsgBody(); + sendByBaseCommand(requestBuild.build()); + requestBuild.clearMsgBody(); } } - if (requestBulid.getMsgBodyCount() > 0) { - sendByBaseCommand(sdkTime, requestBulid.build()); - requestBulid.clearMsgBody(); + if (requestBuild.getMsgBodyCount() > 0) { + sendByBaseCommand(requestBuild.build()); + requestBuild.clearMsgBody(); } - threadSumMap.clear(); - logger.info("finish send report."); + threadCountMap.clear(); + + LOGGER.info("finish report audit data"); } /** - * send base command - * - * @param sdkTime - * @param auditRequest + * Send base command */ - private void sendByBaseCommand(long sdkTime, AuditApi.AuditRequest auditRequest) { + private void sendByBaseCommand(AuditApi.AuditRequest auditRequest) { AuditApi.BaseCommand.Builder baseCommand = AuditApi.BaseCommand.newBuilder(); - baseCommand.setType(AUDITREQUEST).setAuditRequest(auditRequest).build(); - manager.send(sdkTime, baseCommand.build()); + baseCommand.setType(AUDIT_REQUEST).setAuditRequest(auditRequest).build(); + manager.send(baseCommand.build()); } /** * Summary - * - * @param key - * @param statInfo */ private void sumThreadGroup(String key, StatInfo statInfo) { long count = statInfo.count.getAndSet(0); if (0 == count) { return; } - if (threadSumMap.get(key) == null) { - threadSumMap.put(key, new StatInfo(0, 0, 0)); + if (threadCountMap.get(key) == null) { + threadCountMap.put(key, new StatInfo(0, 0, 0)); } long size = statInfo.size.getAndSet(0); long delay = statInfo.delay.getAndSet(0); - threadSumMap.get(key).count.addAndGet(count); - threadSumMap.get(key).size.addAndGet(size); - threadSumMap.get(key).delay.addAndGet(delay); + threadCountMap.get(key).count.addAndGet(count); + threadCountMap.get(key).size.addAndGet(size); + threadCountMap.get(key).delay.addAndGet(delay); } /** diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderHandler.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderHandler.java index dd8ff1a3a..e2f8fc5b9 100644 --- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderHandler.java +++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderHandler.java @@ -23,13 +23,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SenderHandler extends SimpleChannelInboundHandler<byte[]> { - private static final Logger logger = LoggerFactory.getLogger(SenderHandler.class); - private SenderManager manager; + + private static final Logger LOGGER = LoggerFactory.getLogger(SenderHandler.class); + private final SenderManager manager; /** * Constructor - * - * @param manager */ public SenderHandler(SenderManager manager) { this.manager = manager; @@ -39,11 +38,11 @@ public class SenderHandler extends SimpleChannelInboundHandler<byte[]> { * Message Received */ @Override - public void channelRead0(io.netty.channel.ChannelHandlerContext ctx, byte[] e) { + public void channelRead0(io.netty.channel.ChannelHandlerContext ctx, byte[] e) { try { manager.onMessageReceived(ctx, e); } catch (Throwable ex) { - logger.error(ex.getMessage()); + LOGGER.error("channelRead0 error: ", ex); } } @@ -55,7 +54,7 @@ public class SenderHandler extends SimpleChannelInboundHandler<byte[]> { try { manager.onExceptionCaught(ctx, e); } catch (Throwable ex) { - logger.error(ex.getMessage()); + LOGGER.error("caught exception: ", ex); } } @@ -63,11 +62,11 @@ public class SenderHandler extends SimpleChannelInboundHandler<byte[]> { * Disconnected channel */ @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { + public void channelInactive(ChannelHandlerContext ctx) { try { super.channelInactive(ctx); } catch (Throwable ex) { - logger.error(ex.getMessage()); + LOGGER.error("channelInactive error: ", ex); } } @@ -79,7 +78,7 @@ public class SenderHandler extends SimpleChannelInboundHandler<byte[]> { try { super.channelUnregistered(ctx); } catch (Throwable ex) { - logger.error(ex.getMessage()); + LOGGER.error("channelUnregistered error: ", ex); } } } diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java index b841014d5..9d317a238 100644 --- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java +++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java @@ -42,28 +42,27 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; /** - * sender manager + * Audit sender manager */ public class SenderManager { - private static final Logger logger = LoggerFactory.getLogger(SenderManager.class); + public static final Long MAX_REQUEST_ID = 1000000000L; - private static final int SEND_INTERVAL_MS = 20; public static final int ALL_CONNECT_CHANNEL = -1; public static final int DEFAULT_CONNECT_CHANNEL = 2; - public static final String LF = "\n"; + private static final Logger logger = LoggerFactory.getLogger(SenderManager.class); + private static final int SEND_INTERVAL_MS = 20; + private final SecureRandom sRandom = new SecureRandom(Long.toString(System.currentTimeMillis()).getBytes()); + private final AtomicLong requestIdSeq = new AtomicLong(0L); + private final ConcurrentHashMap<Long, AuditData> dataMap = new ConcurrentHashMap<>(); + private SenderGroup sender; private int maxConnectChannels = ALL_CONNECT_CHANNEL; - private SecureRandom sRandom = new SecureRandom(Long.toString(System.currentTimeMillis()).getBytes()); // IPList - private HashSet<String> currentIpPorts = new HashSet<String>(); - private AtomicLong requestIdSeq = new AtomicLong(0L); - private ConcurrentHashMap<Long, AuditData> dataMap = new ConcurrentHashMap<>(); + private HashSet<String> currentIpPorts = new HashSet<>(); private AuditConfig auditConfig; /** * Constructor - * - * @param config */ public SenderManager(AuditConfig config) { this(config, DEFAULT_CONNECT_CHANNEL); @@ -71,9 +70,6 @@ public class SenderManager { /** * Constructor - * - * @param config - * @param maxConnectChannels */ public SenderManager(AuditConfig config, int maxConnectChannels) { try { @@ -95,15 +91,15 @@ public class SenderManager { this.sender.setHasSendError(false); this.currentIpPorts = ipPortList; int ipSize = ipPortList.size(); - int needNewSize = 0; + int needNewSize; if (this.maxConnectChannels == ALL_CONNECT_CHANNEL || this.maxConnectChannels >= ipSize) { needNewSize = ipSize; } else { needNewSize = maxConnectChannels; } + HashSet<String> updateConfigIpLists = new HashSet<>(); - List<String> availableIpLists = new ArrayList<String>(); - availableIpLists.addAll(ipPortList); + List<String> availableIpLists = new ArrayList<>(ipPortList); for (int i = 0; i < needNewSize; i++) { int availableIpSize = availableIpLists.size(); int newIpPortIndex = this.sRandom.nextInt(availableIpSize); @@ -116,12 +112,10 @@ public class SenderManager { } /** - * next requestid - * - * @return + * next request id */ public Long nextRequestId() { - Long requestId = requestIdSeq.getAndIncrement(); + long requestId = requestIdSeq.getAndIncrement(); if (requestId > MAX_REQUEST_ID) { requestId = 0L; requestIdSeq.set(requestId); @@ -130,22 +124,17 @@ public class SenderManager { } /** - * send data - * - * @param sdkTime - * @param baseCommand + * Send data with command */ - public void send(long sdkTime, AuditApi.BaseCommand baseCommand) { - AuditData data = new AuditData(sdkTime, baseCommand); - // Cache first + public void send(AuditApi.BaseCommand baseCommand) { + AuditData data = new AuditData(baseCommand); + // cache first this.dataMap.putIfAbsent(baseCommand.getAuditRequest().getRequestId(), data); this.sendData(data.getDataByte()); } /** - * send data - * - * @param data + * Send data byte array */ private void sendData(byte[] data) { if (data == null || data.length <= 0) { @@ -167,7 +156,7 @@ public class SenderManager { logger.info("audit failed cache size: {}", this.dataMap.size()); for (AuditData data : this.dataMap.values()) { this.sendData(data.getDataByte()); - sleep(SEND_INTERVAL_MS); + this.sleep(); } if (this.dataMap.size() == 0) { checkAuditFile(); @@ -190,10 +179,10 @@ public class SenderManager { File file = new File(auditConfig.getDisasterFile()); if (!file.exists()) { if (!file.createNewFile()) { - logger.error("create {} {}", auditConfig.getDisasterFile(), " failed"); + logger.error("create file {} failed", auditConfig.getDisasterFile()); return; } - logger.info("create {}", auditConfig.getDisasterFile()); + logger.info("create file {} success", auditConfig.getDisasterFile()); } if (file.length() > auditConfig.getMaxFileSize()) { file.delete(); @@ -204,15 +193,13 @@ public class SenderManager { objectOutputStream.writeObject(dataMap); objectOutputStream.close(); fos.close(); - } catch (IOException ioException) { - logger.error(ioException.getMessage()); + } catch (IOException e) { + logger.error("write local file error: ", e); } } /** * check file path - * - * @return */ private boolean checkFilePath() { File file = new File(auditConfig.getFilePath()); @@ -220,7 +207,7 @@ public class SenderManager { if (!file.mkdirs()) { return false; } - logger.info("create {}", auditConfig.getFilePath()); + logger.info("create file {} success", auditConfig.getFilePath()); } return true; } @@ -235,26 +222,26 @@ public class SenderManager { return; } FileInputStream inputStream = new FileInputStream(auditConfig.getDisasterFile()); - ObjectInputStream objectInputStream = new ObjectInputStream(inputStream); + ObjectInputStream objectStream = new ObjectInputStream(inputStream); ConcurrentHashMap<Long, AuditData> fileData = - (ConcurrentHashMap<Long, AuditData>) objectInputStream.readObject(); + (ConcurrentHashMap<Long, AuditData>) objectStream.readObject(); for (Map.Entry<Long, AuditData> entry : fileData.entrySet()) { if (this.dataMap.size() < (auditConfig.getMaxCacheRow() / 2)) { this.dataMap.putIfAbsent(entry.getKey(), entry.getValue()); } this.sendData(entry.getValue().getDataByte()); - sleep(SEND_INTERVAL_MS); + this.sleep(); } - objectInputStream.close(); + objectStream.close(); inputStream.close(); file.delete(); - } catch (IOException | ClassNotFoundException ioException) { - logger.error(ioException.getMessage()); + } catch (IOException | ClassNotFoundException e) { + logger.error("check audit file error: ", e); } } /** - * get data map szie + * get data map size */ public int getDataMapSize() { return this.dataMap.size(); @@ -262,68 +249,60 @@ public class SenderManager { /** * processing return package - * - * @param ctx ctx - * @param msg msg */ public void onMessageReceived(ChannelHandlerContext ctx, byte[] msg) { try { - //Analyze abnormal events - byte[] readBytes = msg; - AuditApi.BaseCommand baseCommand = AuditApi.BaseCommand.parseFrom(readBytes); + // Analyze abnormal events + AuditApi.BaseCommand baseCommand = AuditApi.BaseCommand.parseFrom(msg); // Parse request id Long requestId = baseCommand.getAuditReply().getRequestId(); AuditData data = this.dataMap.get(requestId); if (data == null) { - logger.error("can not find the requestid onMessageReceived:" + requestId); + logger.error("can not find the request id onMessageReceived: " + requestId); return; } - logger.info("audit-proxy response code: {}", baseCommand.getAuditReply().getRspCode().toString()); + + logger.info("audit-proxy response code: {}", baseCommand.getAuditReply().getRspCode()); if (AuditApi.AuditReply.RSP_CODE.SUCCESS.equals(baseCommand.getAuditReply().getRspCode())) { this.dataMap.remove(requestId); return; } + int resendTimes = data.increaseResendTimes(); - if (resendTimes < org.apache.inlong.audit.send.SenderGroup.MAX_SEND_TIMES) { + if (resendTimes < SenderGroup.MAX_SEND_TIMES) { this.sendData(data.getDataByte()); } } catch (Throwable ex) { - logger.error(ex.getMessage()); + logger.error("onMessageReceived exception: ", ex); this.sender.setHasSendError(true); } } /** * Handle the packet return exception - * - * @param ctx - * @param e */ public void onExceptionCaught(ChannelHandlerContext ctx, Throwable e) { - logger.error(e.getCause().getMessage()); + logger.error("channel context " + ctx + " occurred exception: ", e); try { this.sender.setHasSendError(true); } catch (Throwable ex) { - logger.error(ex.getMessage()); + logger.error("setHasSendError error: ", ex); } } /** - * sleep - * - * @param millisecond + * sleep SEND_INTERVAL_MS */ - private void sleep(int millisecond) { + private void sleep() { try { - Thread.sleep(millisecond); - } catch (Throwable e) { - logger.error(e.getMessage()); + Thread.sleep(SEND_INTERVAL_MS); + } catch (Throwable ex) { + logger.error("sleep error: ", ex); } } /*** * set audit config - * @param config */ public void setAuditConfig(AuditConfig config) { auditConfig = config; diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditData.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditData.java index 4dde7e118..a7064b996 100644 --- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditData.java +++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditData.java @@ -24,33 +24,27 @@ import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicInteger; public class AuditData implements Serializable { + public static int HEAD_LENGTH = 4; - private final long sdkTime; private final AuditApi.BaseCommand content; - private AtomicInteger resendTimes = new AtomicInteger(0); + private final AtomicInteger resendTimes = new AtomicInteger(0); /** * Constructor - * - * @param sdkTime - * @param content */ - public AuditData(long sdkTime, AuditApi.BaseCommand content) { - this.sdkTime = sdkTime; + public AuditData(AuditApi.BaseCommand content) { this.content = content; } /** - * set resendTimes + * Increase and get resend times */ public int increaseResendTimes() { return this.resendTimes.incrementAndGet(); } /** - * getDataByte - * - * @return + * Get data byte array */ public byte[] getDataByte() { return addBytes(ByteBuffer.allocate(HEAD_LENGTH).putInt(content.toByteArray().length).array(), @@ -60,9 +54,7 @@ public class AuditData implements Serializable { /** * Concatenated byte array * - * @param data1 - * @param data2 - * @return data1 and data2 combined package result + * @return data1 and data2 combined package result */ public byte[] addBytes(byte[] data1, byte[] data2) { byte[] data3 = new byte[data1.length + data2.length]; diff --git a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditDataTest.java b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditDataTest.java index 82babece5..4de0413ff 100644 --- a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditDataTest.java +++ b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditDataTest.java @@ -20,19 +20,20 @@ package org.apache.inlong.audit.util; import org.apache.inlong.audit.protocol.AuditApi; import org.junit.Test; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; public class AuditDataTest { + @Test public void increaseResendTimes() { - AuditApi.BaseCommand content = null; - AuditData test = new AuditData(System.currentTimeMillis(), content); + AuditData test = new AuditData(null); int resendTimes = test.increaseResendTimes(); - assertTrue(resendTimes == 1); + assertEquals(1, resendTimes); resendTimes = test.increaseResendTimes(); - assertTrue(resendTimes == 2); + assertEquals(2, resendTimes); resendTimes = test.increaseResendTimes(); - assertTrue(resendTimes == 3); + assertEquals(3, resendTimes); } @Test @@ -54,12 +55,9 @@ public class AuditDataTest { AuditApi.AuditRequest request = AuditApi.AuditRequest.newBuilder().setMsgHeader(headerBuilder.build()) .addMsgBody(bodyBuilder.build()).build(); AuditApi.BaseCommand baseCommand = AuditApi.BaseCommand.newBuilder().setAuditRequest(request).build(); - AuditData test = new AuditData(System.currentTimeMillis(), baseCommand); + AuditData test = new AuditData(baseCommand); byte[] data = test.getDataByte(); assertTrue(data.length > 0); } - @Test - public void addBytes() { - } } \ No newline at end of file diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java index 24c251626..331620af9 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java @@ -1,10 +1,10 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with + * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -17,13 +17,11 @@ package org.apache.inlong.dataproxy.metrics.audit; -import java.util.HashSet; -import java.util.Map; import org.apache.commons.lang3.BooleanUtils; -import org.apache.commons.lang3.math.NumberUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; import org.apache.flume.Event; -import org.apache.inlong.audit.AuditImp; +import org.apache.inlong.audit.AuditOperator; import org.apache.inlong.audit.util.AuditConfig; import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder; import org.apache.inlong.dataproxy.consts.AttributeConstants; @@ -32,9 +30,12 @@ import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem; import org.apache.inlong.dataproxy.utils.Constants; import org.apache.inlong.dataproxy.utils.InLongMsgVer; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; + /** - * - * AuditUtils + * Audit utils */ public class AuditUtils { @@ -51,7 +52,7 @@ public class AuditUtils { private static boolean IS_AUDIT = true; /** - * initAudit + * Init audit */ public static void initAudit() { // IS_AUDIT @@ -62,26 +63,21 @@ public class AuditUtils { HashSet<String> proxys = new HashSet<>(); if (!StringUtils.isBlank(strIpPorts)) { String[] ipPorts = strIpPorts.split("\\s+"); - for (String ipPort : ipPorts) { - proxys.add(ipPort); - } + Collections.addAll(proxys, ipPorts); } - AuditImp.getInstance().setAuditProxy(proxys); + AuditOperator.getInstance().setAuditProxy(proxys); // AuditConfig String filePath = CommonPropertiesHolder.getString(AUDIT_KEY_FILE_PATH, AUDIT_DEFAULT_FILE_PATH); int maxCacheRow = NumberUtils.toInt( CommonPropertiesHolder.getString(AUDIT_KEY_MAX_CACHE_ROWS), AUDIT_DEFAULT_MAX_CACHE_ROWS); AuditConfig auditConfig = new AuditConfig(filePath, maxCacheRow); - AuditImp.getInstance().setAuditConfig(auditConfig); + AuditOperator.getInstance().setAuditConfig(auditConfig); } } /** - * add - * - * @param auditID - * @param event + * Add audit data */ public static void add(int auditID, Event event) { if (!IS_AUDIT || event == null) { @@ -97,23 +93,20 @@ public class AuditUtils { if (event.getHeaders().containsKey(ConfigConstants.MSG_COUNTER_KEY)) { msgCount = Long.parseLong(event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY)); } - AuditImp.getInstance().add(auditID, inlongGroupId, + AuditOperator.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, msgCount, event.getBody().length); } else { String groupId = headers.get(AttributeConstants.GROUP_ID); String streamId = headers.get(AttributeConstants.STREAM_ID); long dataTime = NumberUtils.toLong(headers.get(AttributeConstants.DATA_TIME)); long msgCount = NumberUtils.toLong(headers.get(ConfigConstants.MSG_COUNTER_KEY)); - AuditImp.getInstance().add(auditID, groupId, + AuditOperator.getInstance().add(auditID, groupId, streamId, dataTime, msgCount, event.getBody().length); } } /** - * getLogTime - * - * @param headers - * @return + * Get LogTime from headers */ public static long getLogTime(Map<String, String> headers) { String strLogTime = headers.get(Constants.HEADER_KEY_MSG_TIME); @@ -131,10 +124,7 @@ public class AuditUtils { } /** - * getLogTime - * - * @param event - * @return + * Get LogTime from event */ public static long getLogTime(Event event) { if (event != null) { @@ -145,20 +135,16 @@ public class AuditUtils { } /** - * getAuditFormatTime - * - * @param msgTime - * @return + * Get AuditFormatTime */ public static long getAuditFormatTime(long msgTime) { - long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval(); - return auditFormatTime; + return msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval(); } /** - * sendReport + * Send audit data */ - public static void sendReport() { - AuditImp.getInstance().sendReport(); + public static void send() { + AuditOperator.getInstance().send(); } } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java index 836ba0ce2..881e2b0b2 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java @@ -1,10 +1,10 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with + * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -66,35 +66,30 @@ import java.util.Set; import java.util.concurrent.locks.ReentrantLock; /** - * Application + * DataProxy application */ public class Application { - private static final Logger logger = LoggerFactory - .getLogger(Application.class); - - public static final String CONF_MONITOR_CLASS = "flume.monitoring.type"; - public static final String CONF_MONITOR_PREFIX = "flume.monitoring."; + private static final Logger LOGGER = LoggerFactory.getLogger(Application.class); + private static final String CONF_MONITOR_CLASS = "flume.monitoring.type"; + private static final String CONF_MONITOR_PREFIX = "flume.monitoring."; private final List<LifecycleAware> components; private final LifecycleSupervisor supervisor; + private final ReentrantLock lifecycleLock = new ReentrantLock(); private MaterializedConfiguration materializedConfiguration; private MonitorService monitorServer; - private final ReentrantLock lifecycleLock = new ReentrantLock(); private AdminTask adminTask; - private HeartbeatManager heartbeatManager; /** * Constructor */ public Application() { - this(new ArrayList<LifecycleAware>(0)); + this(new ArrayList<>(0)); } /** * Constructor - * - * @param components */ public Application(List<LifecycleAware> components) { this.components = components; @@ -102,7 +97,161 @@ public class Application { } /** - * start + * Main entrance + */ + public static void main(String[] args) { + try { + SSLUtil.initGlobalSSLParameters(); + Options options = new Options(); + + Option option = new Option("n", "name", true, "the name of this agent"); + option.setRequired(true); + options.addOption(option); + + option = new Option("f", "conf-file", true, + "specify a config file (required if -z missing)"); + option.setRequired(false); + options.addOption(option); + + option = new Option(null, "no-reload-conf", false, + "do not reload config file if changed"); + options.addOption(option); + + // Options for Zookeeper + option = new Option("z", "zkConnString", true, + "specify the ZooKeeper connection to use (required if -f missing)"); + option.setRequired(false); + options.addOption(option); + + option = new Option("p", "zkBasePath", true, + "specify the base path in ZooKeeper for agent configs"); + option.setRequired(false); + options.addOption(option); + + option = new Option("h", "help", false, "display help text"); + options.addOption(option); + + // load configuration data from manager + option = new Option(null, "load-conf-from-manager", false, + "load configuration data from manager"); + option.setRequired(false); + options.addOption(option); + + CommandLineParser parser = new GnuParser(); + CommandLine commandLine = parser.parse(options, args); + + if (commandLine.hasOption('h')) { + new HelpFormatter().printHelp("flume-ng agent", options, true); + return; + } + + // start by manager configuration + if (commandLine.hasOption("load-conf-from-manager")) { + startByManagerConf(commandLine); + return; + } + + String agentName = commandLine.getOptionValue('n'); + boolean reload = !commandLine.hasOption("no-reload-conf"); + + boolean isZkConfigured = commandLine.hasOption('z') || commandLine.hasOption("zkConnString"); + + Application application; + if (isZkConfigured) { + // get options + String zkConnectionStr = commandLine.getOptionValue('z'); + String baseZkPath = commandLine.getOptionValue('p'); + + if (reload) { + EventBus eventBus = new EventBus(agentName + "-event-bus"); + List<LifecycleAware> components = Lists.newArrayList(); + PollingZooKeeperConfigurationProvider zProvider = new PollingZooKeeperConfigurationProvider( + agentName, zkConnectionStr, baseZkPath, eventBus); + components.add(zProvider); + application = new Application(components); + eventBus.register(application); + } else { + StaticZooKeeperConfigurationProvider zProvider = new StaticZooKeeperConfigurationProvider( + agentName, zkConnectionStr, baseZkPath); + application = new Application(); + application.handleConfigurationEvent(zProvider.getConfiguration()); + } + } else { + File configurationFile = new File(commandLine.getOptionValue('f')); + // The following is to ensure that by default the agent will fail on startup + // if the file does not exist. + if (!configurationFile.exists()) { + // If command line invocation, then need to fail fast + if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) == null) { + String path = configurationFile.getPath(); + try { + path = configurationFile.getCanonicalPath(); + } catch (IOException ex) { + LOGGER.error("failed to read canonical path for file: " + path, ex); + } + throw new ParseException("configuration file does not exist: " + path); + } + } + + List<LifecycleAware> components = Lists.newArrayList(); + if (reload) { + EventBus eventBus = new EventBus(agentName + "-event-bus"); + PollingPropertiesFileConfigurationProvider configurationProvider; + configurationProvider = new PollingPropertiesFileConfigurationProvider( + agentName, configurationFile, eventBus, 30); + components.add(configurationProvider); + application = new Application(components); + eventBus.register(application); + } else { + PropertiesFileConfigurationProvider configurationProvider; + configurationProvider = new PropertiesFileConfigurationProvider(agentName, configurationFile); + application = new Application(); + application.handleConfigurationEvent(configurationProvider.getConfiguration()); + } + } + // metrics + MetricObserver.init(CommonPropertiesHolder.get()); + // audit + AuditUtils.initAudit(); + + final Application appReference = application; + Runtime.getRuntime().addShutdownHook(new Thread("data-proxy-shutdown-hook") { + @Override + public void run() { + AuditUtils.send(); + appReference.stop(); + } + }); + + // start application + application.start(); + Thread.sleep(5000); + } catch (Exception e) { + LOGGER.error("fatal error occurred while running data-proxy: ", e); + } + } + + /** + * Start by Manager config + */ + private static void startByManagerConf(CommandLine commandLine) { + String proxyName = CommonPropertiesHolder.getString(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME); + ManagerPropsConfigProvider configurationProvider = new ManagerPropsConfigProvider(proxyName); + Application application = new Application(); + application.handleConfigurationEvent(configurationProvider.getConfiguration()); + application.start(); + + final Application appReference = application; + Runtime.getRuntime().addShutdownHook(new Thread("data-proxy-shutdown-hook") { + @Override + public void run() { + appReference.stop(); + } + }); + } + + /** + * Start all components */ public void start() { lifecycleLock.lock(); @@ -110,27 +259,23 @@ public class Application { for (LifecycleAware component : components) { // update dataproxy config if (component instanceof IDataProxyConfigHolder) { - ((IDataProxyConfigHolder) component) - .setDataProxyConfig( - RemoteConfigManager.getInstance().getCurrentClusterConfigRef()); + ((IDataProxyConfigHolder) component).setDataProxyConfig( + RemoteConfigManager.getInstance().getCurrentClusterConfigRef()); } - supervisor.supervise(component, - new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); + supervisor.supervise(component, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } // start admin task this.adminTask = new AdminTask(new Context(CommonPropertiesHolder.get())); this.adminTask.start(); - this.heartbeatManager = new HeartbeatManager(); - this.heartbeatManager.start(); + HeartbeatManager heartbeatManager = new HeartbeatManager(); + heartbeatManager.start(); } finally { lifecycleLock.unlock(); } } /** - * handleConfigurationEvent - * - * @param conf + * Handle the configuration event */ @Subscribe public void handleConfigurationEvent(MaterializedConfiguration conf) { @@ -139,8 +284,7 @@ public class Application { stopAllComponents(); startAllComponents(conf); } catch (InterruptedException e) { - logger.info("Interrupted while trying to handle configuration event"); - return; + LOGGER.info("interrupted while handle the configuration event"); } finally { // If interrupted while trying to lock, we don't own the lock, so must not attempt to unlock if (lifecycleLock.isHeldByCurrentThread()) { @@ -150,7 +294,7 @@ public class Application { } /** - * stop + * Stop the application */ public void stop() { lifecycleLock.lock(); @@ -170,102 +314,93 @@ public class Application { } /** - * stopAllComponents + * Stop all components */ private void stopAllComponents() { - if (this.materializedConfiguration != null) { - logger.info("Shutting down configuration: {}", this.materializedConfiguration); - for (Entry<String, SourceRunner> entry : this.materializedConfiguration - .getSourceRunners().entrySet()) { + LOGGER.info("shutting down configuration: {}", materializedConfiguration); + if (materializedConfiguration != null) { + for (Entry<String, SourceRunner> entry : materializedConfiguration.getSourceRunners().entrySet()) { try { - logger.info("Stopping Source " + entry.getKey()); + LOGGER.info("stopping source " + entry.getKey()); supervisor.unsupervise(entry.getValue()); } catch (Exception e) { - logger.error("Error while stopping {}", entry.getValue(), e); + LOGGER.error("error while stopping source " + entry.getValue(), e); } } - for (Entry<String, SinkRunner> entry : this.materializedConfiguration.getSinkRunners() - .entrySet()) { + for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners().entrySet()) { try { - logger.info("Stopping Sink " + entry.getKey()); + LOGGER.info("stopping sink " + entry.getKey()); supervisor.unsupervise(entry.getValue()); } catch (Exception e) { - logger.error("Error while stopping {}", entry.getValue(), e); + LOGGER.error("error while stopping sink " + entry.getValue(), e); } } - for (Entry<String, Channel> entry : this.materializedConfiguration.getChannels() - .entrySet()) { + for (Entry<String, Channel> entry : materializedConfiguration.getChannels().entrySet()) { try { - logger.info("Stopping Channel " + entry.getKey()); + LOGGER.info("stopping channel " + entry.getKey()); supervisor.unsupervise(entry.getValue()); } catch (Exception e) { - logger.error("Error while stopping {}", entry.getValue(), e); + LOGGER.error("error while stopping channel " + entry.getValue(), e); } } } + + LOGGER.info("shutting down monitor server: {}", monitorServer); if (monitorServer != null) { monitorServer.stop(); } } /** - * startAllComponents - * - * @param materializedConfiguration + * Start all components */ private void startAllComponents(MaterializedConfiguration materializedConfiguration) { - logger.info("Starting new configuration:{}", materializedConfiguration); + LOGGER.info("Starting new configuration:{}", materializedConfiguration); this.materializedConfiguration = materializedConfiguration; - for (Entry<String, Channel> entry : materializedConfiguration.getChannels().entrySet()) { try { - logger.info("Starting Channel " + entry.getKey()); - supervisor.supervise(entry.getValue(), - new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); + LOGGER.info("starting channel " + entry.getKey()); + supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(), + LifecycleState.START); } catch (Exception e) { - logger.error("Error while starting {}", entry.getValue(), e); + LOGGER.error("error while starting channel " + entry.getValue(), e); } } - /* - * Wait for all channels to start. - */ + // Wait for all channels to start. for (Channel ch : materializedConfiguration.getChannels().values()) { while (ch.getLifecycleState() != LifecycleState.START && !supervisor.isComponentInErrorState(ch)) { try { - logger.info("Waiting for channel: " + ch.getName() - + " to start. Sleeping for 500 ms"); + LOGGER.info("sleeping for 500 ms to wait for channel: {} to start", ch.getName()); Thread.sleep(500); } catch (InterruptedException e) { - logger.error("Interrupted while waiting for channel to start.", e); + LOGGER.error("interrupted while waiting for channel to start: ", e); Throwables.propagate(e); } } } - for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners() - .entrySet()) { + for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners().entrySet()) { try { - logger.info("Starting Sink " + entry.getKey()); - supervisor.supervise(entry.getValue(), - new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); + LOGGER.info("starting sink " + entry.getKey()); + supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(), + LifecycleState.START); } catch (Exception e) { - logger.error("Error while starting {}", entry.getValue(), e); + LOGGER.error("error while starting sink: " + entry.getValue(), e); } } - for (Entry<String, SourceRunner> entry : materializedConfiguration.getSourceRunners() - .entrySet()) { + for (Entry<String, SourceRunner> entry : materializedConfiguration.getSourceRunners().entrySet()) { try { - logger.info("Starting Source " + entry.getKey()); - supervisor.supervise(entry.getValue(), - new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); + LOGGER.info("starting source " + entry.getKey()); + supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(), + LifecycleState.START); } catch (Exception e) { - logger.error("Error while starting {}", entry.getValue(), e); + LOGGER.error("error while starting source: " + entry.getValue(), e); } } @@ -273,7 +408,7 @@ public class Application { } /** - * loadMonitoring + * Load monitoring */ @SuppressWarnings("unchecked") private void loadMonitoring() { @@ -285,8 +420,7 @@ public class Application { Class<? extends MonitorService> klass; try { // Is it a known type? - klass = MonitoringType.valueOf( - monitorType.toUpperCase(Locale.ENGLISH)).getMonitorClass(); + klass = MonitoringType.valueOf(monitorType.toUpperCase(Locale.ENGLISH)).getMonitorClass(); } catch (Exception e) { // Not a known type, use FQCN klass = (Class<? extends MonitorService>) Class.forName(monitorType); @@ -295,187 +429,15 @@ public class Application { Context context = new Context(); for (String key : keys) { if (key.startsWith(CONF_MONITOR_PREFIX)) { - context.put(key.substring(CONF_MONITOR_PREFIX.length()), - systemProps.getProperty(key)); + context.put(key.substring(CONF_MONITOR_PREFIX.length()), systemProps.getProperty(key)); } } monitorServer.configure(context); monitorServer.start(); } } catch (Exception e) { - logger.warn("Error starting monitoring. " - + "Monitoring might not be available.", e); - } - - } - - /** - * main - * - * @param args - */ - public static void main(String[] args) { - - try { - SSLUtil.initGlobalSSLParameters(); - - Options options = new Options(); - - Option option = new Option("n", "name", true, "the name of this agent"); - option.setRequired(true); - options.addOption(option); - - option = new Option("f", "conf-file", true, - "specify a config file (required if -z missing)"); - option.setRequired(false); - options.addOption(option); - - option = new Option(null, "no-reload-conf", false, - "do not reload config file if changed"); - options.addOption(option); - - // Options for Zookeeper - option = new Option("z", "zkConnString", true, - "specify the ZooKeeper connection to use (required if -f missing)"); - option.setRequired(false); - options.addOption(option); - - option = new Option("p", "zkBasePath", true, - "specify the base path in ZooKeeper for agent configs"); - option.setRequired(false); - options.addOption(option); - - option = new Option("h", "help", false, "display help text"); - options.addOption(option); - - // load configuration data from manager - option = new Option(null, "load-conf-from-manager", false, - "load configuration data from manager"); - option.setRequired(false); - options.addOption(option); - - CommandLineParser parser = new GnuParser(); - CommandLine commandLine = parser.parse(options, args); - - if (commandLine.hasOption('h')) { - new HelpFormatter().printHelp("flume-ng agent", options, true); - return; - } - - // start by manager configuation - if (commandLine.hasOption("load-conf-from-manager")) { - startByManagerConf(commandLine); - return; - } - - String agentName = commandLine.getOptionValue('n'); - boolean reload = !commandLine.hasOption("no-reload-conf"); - - boolean isZkConfigured = false; - if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) { - isZkConfigured = true; - } - - Application application; - if (isZkConfigured) { - // get options - String zkConnectionStr = commandLine.getOptionValue('z'); - String baseZkPath = commandLine.getOptionValue('p'); - - if (reload) { - EventBus eventBus = new EventBus(agentName + "-event-bus"); - List<LifecycleAware> components = Lists.newArrayList(); - PollingZooKeeperConfigurationProvider zProvider = new PollingZooKeeperConfigurationProvider( - agentName, zkConnectionStr, baseZkPath, eventBus); - components.add(zProvider); - application = new Application(components); - eventBus.register(application); - } else { - StaticZooKeeperConfigurationProvider zProvider = new StaticZooKeeperConfigurationProvider( - agentName, zkConnectionStr, baseZkPath); - application = new Application(); - application.handleConfigurationEvent(zProvider.getConfiguration()); - } - } else { - File configurationFile = new File(commandLine.getOptionValue('f')); - - // The following is to ensure that by default the agent will fail on startup - // if the file does not exist. - if (!configurationFile.exists()) { - // If command line invocation, then need to fail fast - if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) == null) { - String path = configurationFile.getPath(); - try { - path = configurationFile.getCanonicalPath(); - } catch (IOException ex) { - logger.error("Failed to read canonical path for file: " + path, - ex); - } - throw new ParseException( - "The specified configuration file does not exist: " + path); - } - } - List<LifecycleAware> components = Lists.newArrayList(); - - if (reload) { - EventBus eventBus = new EventBus(agentName + "-event-bus"); - PollingPropertiesFileConfigurationProvider configurationProvider; - configurationProvider = new PollingPropertiesFileConfigurationProvider( - agentName, configurationFile, eventBus, 30); - components.add(configurationProvider); - application = new Application(components); - eventBus.register(application); - } else { - PropertiesFileConfigurationProvider configurationProvider; - configurationProvider = new PropertiesFileConfigurationProvider( - agentName, configurationFile); - application = new Application(); - application.handleConfigurationEvent(configurationProvider.getConfiguration()); - } - } - // metrics - MetricObserver.init(CommonPropertiesHolder.get()); - // audit - AuditUtils.initAudit(); - - final Application appReference = application; - Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") { - - @Override - public void run() { - AuditUtils.sendReport(); - appReference.stop(); - } - }); - - // start application - application.start(); - Thread.sleep(5000); - } catch (Exception e) { - logger.error("A fatal error occurred while running. Exception follows.", e); + LOGGER.warn("starting monitoring error, the monitoring might not be available: ", e); } } - /** - * startByManagerConf - * - * @param commandLine - */ - private static void startByManagerConf(CommandLine commandLine) { - String proxyName = CommonPropertiesHolder.getString(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME); - ManagerPropertiesConfigurationProvider configurationProvider = new ManagerPropertiesConfigurationProvider( - proxyName); - Application application = new Application(); - application.handleConfigurationEvent(configurationProvider.getConfiguration()); - application.start(); - - final Application appReference = application; - Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") { - - @Override - public void run() { - appReference.stop(); - } - }); - } -} \ No newline at end of file +} diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/ManagerPropertiesConfigurationProvider.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/ManagerPropsConfigProvider.java similarity index 62% rename from inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/ManagerPropertiesConfigurationProvider.java rename to inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/ManagerPropsConfigProvider.java index d1451ef28..04adf218b 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/ManagerPropertiesConfigurationProvider.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/ManagerPropsConfigProvider.java @@ -1,10 +1,10 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with + * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -17,48 +17,38 @@ package org.apache.inlong.dataproxy.node; -import java.util.HashMap; -import java.util.Map; - import org.apache.flume.conf.FlumeConfiguration; import org.apache.flume.node.AbstractConfigurationProvider; import org.apache.inlong.dataproxy.config.RemoteConfigManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; +import java.util.Map; + /** - * - * ManagerPropertiesConfigurationProvider + * Manager properties configuration provider */ -public class ManagerPropertiesConfigurationProvider extends - AbstractConfigurationProvider { +public class ManagerPropsConfigProvider extends AbstractConfigurationProvider { - private static final Logger LOGGER = LoggerFactory - .getLogger(ManagerPropertiesConfigurationProvider.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ManagerPropsConfigProvider.class); - /** - * ManagerPropertiJesConfigurationProvider - * - * @param agentName - */ - public ManagerPropertiesConfigurationProvider(String agentName) { + public ManagerPropsConfigProvider(String agentName) { super(agentName); } /** - * getFlumeConfiguration - * - * @return + * Get Flume configuration */ @Override public FlumeConfiguration getFlumeConfiguration() { try { Map<String, String> flumeProperties = RemoteConfigManager.getInstance().getFlumeProperties(); - LOGGER.info("flumeProperties:{}", flumeProperties); + LOGGER.info("all flume props: {}", flumeProperties); return new FlumeConfiguration(flumeProperties); } catch (Exception e) { - LOGGER.error("exception catch:" + e.getMessage(), e); + LOGGER.error("get flume props error: ", e); } - return new FlumeConfiguration(new HashMap<String, String>()); + return new FlumeConfiguration(new HashMap<>()); } } \ No newline at end of file diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java index 50d6f69ce..e0094a3ce 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java @@ -25,38 +25,35 @@ import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory; import org.slf4j.Logger; /** - * - * SortStandaloneApplication + * Sort Standalone Application */ public class SortStandaloneApplication { - public static final Logger LOG = InlongLoggerFactory.getLogger(Application.class); + public static final Logger LOGGER = InlongLoggerFactory.getLogger(Application.class); /** - * main - * - * @param args + * Main entrance */ public static void main(String[] args) { - LOG.info("start to sort-standalone"); + LOGGER.info("start to sort-standalone"); try { SortCluster cluster = new SortCluster(); - Runtime.getRuntime().addShutdownHook(new Thread("sortstandalone-shutdown-hook") { + Runtime.getRuntime().addShutdownHook(new Thread("sort-standalone-shutdown-hook") { @Override public void run() { - AuditUtils.sendReport(); + AuditUtils.send(); cluster.close(); } }); - // + // start the cluster cluster.start(); // metrics MetricObserver.init(CommonPropertiesHolder.get()); AuditUtils.initAudit(); Thread.sleep(5000); } catch (Exception e) { - LOG.error("A fatal error occurred while running. Exception follows.", e); + LOGGER.error("fatal error occurred while running sort-standalone: ", e); } } } \ No newline at end of file diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/audit/AuditUtils.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/audit/AuditUtils.java index bc0d219b8..6a7a8900c 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/audit/AuditUtils.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/audit/AuditUtils.java @@ -1,10 +1,10 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with + * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -17,22 +17,22 @@ package org.apache.inlong.sort.standalone.metrics.audit; -import java.util.HashSet; -import java.util.Map; - import org.apache.commons.lang3.BooleanUtils; -import org.apache.commons.lang3.math.NumberUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; import org.apache.flume.Event; -import org.apache.inlong.audit.AuditImp; +import org.apache.inlong.audit.AuditOperator; import org.apache.inlong.audit.util.AuditConfig; import org.apache.inlong.sort.standalone.channel.ProfileEvent; import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder; import org.apache.inlong.sort.standalone.metrics.SortMetricItem; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; + /** - * - * AuditUtils + * Audit utils */ public class AuditUtils { @@ -49,7 +49,7 @@ public class AuditUtils { private static boolean IS_AUDIT = true; /** - * initAudit + * Init audit */ public static void initAudit() { // IS_AUDIT @@ -60,11 +60,9 @@ public class AuditUtils { HashSet<String> proxys = new HashSet<>(); if (!StringUtils.isBlank(strIpPorts)) { String[] ipPorts = strIpPorts.split("\\s+"); - for (String ipPort : ipPorts) { - proxys.add(ipPort); - } + Collections.addAll(proxys, ipPorts); } - AuditImp.getInstance().setAuditProxy(proxys); + AuditOperator.getInstance().setAuditProxy(proxys); // AuditConfig String filePath = CommonPropertiesHolder.getString(AUDIT_KEY_FILE_PATH, AUDIT_DEFAULT_FILE_PATH); @@ -72,30 +70,24 @@ public class AuditUtils { CommonPropertiesHolder.getString(AUDIT_KEY_MAX_CACHE_ROWS), AUDIT_DEFAULT_MAX_CACHE_ROWS); AuditConfig auditConfig = new AuditConfig(filePath, maxCacheRow); - AuditImp.getInstance().setAuditConfig(auditConfig); + AuditOperator.getInstance().setAuditConfig(auditConfig); } } /** - * add - * - * @param auditID - * @param event + * Add audit data */ public static void add(int auditID, ProfileEvent event) { if (IS_AUDIT && event != null) { String inlongGroupId = event.getInlongGroupId(); String inlongStreamId = event.getInlongStreamId(); long logTime = event.getRawLogTime(); - AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, 1, event.getBody().length); + AuditOperator.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, 1, event.getBody().length); } } /** - * add - * - * @param auditID - * @param event + * Add audit data */ public static void add(int auditID, Event event) { if (IS_AUDIT && event != null) { @@ -103,14 +95,14 @@ public class AuditUtils { String inlongGroupId = SortMetricItem.getInlongGroupId(headers); String inlongStreamId = SortMetricItem.getInlongStreamId(headers); long logTime = SortMetricItem.getLogTime(headers); - AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, 1, event.getBody().length); + AuditOperator.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, 1, event.getBody().length); } } /** - * sendReport + * Send audit data */ - public static void sendReport() { - AuditImp.getInstance().sendReport(); + public static void send() { + AuditOperator.getInstance().send(); } } diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java index 45e33c3c2..a117a3f27 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java @@ -22,7 +22,7 @@ import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.SimpleCounter; -import org.apache.inlong.audit.AuditImp; +import org.apache.inlong.audit.AuditOperator; import org.apache.inlong.sort.base.Constants; import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; @@ -43,9 +43,10 @@ import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_PER_SECOND; */ public class SinkMetricData implements MetricData { - private MetricGroup metricGroup; - private Map<String, String> labels; - private AuditImp auditImp; + private final MetricGroup metricGroup; + private final Map<String, String> labels; + private final RegisteredMetric registeredMetric; + private AuditOperator auditOperator; private Counter numRecordsOut; private Counter numBytesOut; private Counter numRecordsOutForMeter; @@ -54,7 +55,6 @@ public class SinkMetricData implements MetricData { private Counter dirtyBytes; private Meter numRecordsOutPerSecond; private Meter numBytesOutPerSecond; - private RegisteredMetric registeredMetric; public SinkMetricData(MetricOption option, MetricGroup metricGroup) { this.metricGroup = metricGroup; @@ -94,8 +94,8 @@ public class SinkMetricData implements MetricData { } if (option.getIpPorts().isPresent()) { - AuditImp.getInstance().setAuditProxy(option.getIpPortList()); - this.auditImp = AuditImp.getInstance(); + AuditOperator.getInstance().setAuditProxy(option.getIpPortList()); + this.auditOperator = AuditOperator.getInstance(); } } @@ -271,8 +271,8 @@ public class SinkMetricData implements MetricData { numBytesOutForMeter.inc(rowSize); } - if (auditImp != null) { - auditImp.add( + if (auditOperator != null) { + auditOperator.add( Constants.AUDIT_SORT_OUTPUT, getGroupId(), getStreamId(), @@ -296,10 +296,10 @@ public class SinkMetricData implements MetricData { public String toString() { switch (registeredMetric) { case DIRTY: - return "SinkMetricData{" + return "SinkMetricData{" + "metricGroup=" + metricGroup + ", labels=" + labels - + ", auditImp=" + auditImp + + ", auditOperator=" + auditOperator + ", dirtyRecords=" + dirtyRecords.getCount() + ", dirtyBytes=" + dirtyBytes.getCount() + '}'; @@ -307,7 +307,7 @@ public class SinkMetricData implements MetricData { return "SinkMetricData{" + "metricGroup=" + metricGroup + ", labels=" + labels - + ", auditImp=" + auditImp + + ", auditOperator=" + auditOperator + ", numRecordsOut=" + numRecordsOut.getCount() + ", numBytesOut=" + numBytesOut.getCount() + ", numRecordsOutForMeter=" + numRecordsOutForMeter.getCount() @@ -319,7 +319,7 @@ public class SinkMetricData implements MetricData { return "SinkMetricData{" + "metricGroup=" + metricGroup + ", labels=" + labels - + ", auditImp=" + auditImp + + ", auditOperator=" + auditOperator + ", numRecordsOut=" + numRecordsOut.getCount() + ", numBytesOut=" + numBytesOut.getCount() + ", numRecordsOutForMeter=" + numRecordsOutForMeter.getCount() diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java index 3ac6a96f8..b93c5bb53 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java @@ -22,7 +22,7 @@ import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.SimpleCounter; -import org.apache.inlong.audit.AuditImp; +import org.apache.inlong.audit.AuditOperator; import org.apache.inlong.sort.base.Constants; import java.nio.charset.StandardCharsets; @@ -40,15 +40,15 @@ import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND; */ public class SourceMetricData implements MetricData { - private MetricGroup metricGroup; - private Map<String, String> labels; + private final MetricGroup metricGroup; + private final Map<String, String> labels; private Counter numRecordsIn; private Counter numBytesIn; private Counter numRecordsInForMeter; private Counter numBytesInForMeter; private Meter numRecordsInPerSecond; private Meter numBytesInPerSecond; - private AuditImp auditImp; + private AuditOperator auditOperator; public SourceMetricData(MetricOption option, MetricGroup metricGroup) { this.metricGroup = metricGroup; @@ -70,8 +70,8 @@ public class SourceMetricData implements MetricData { } if (option.getIpPorts().isPresent()) { - AuditImp.getInstance().setAuditProxy(option.getIpPortList()); - this.auditImp = AuditImp.getInstance(); + AuditOperator.getInstance().setAuditProxy(option.getIpPortList()); + this.auditOperator = AuditOperator.getInstance(); } } @@ -211,8 +211,8 @@ public class SourceMetricData implements MetricData { this.numBytesInForMeter.inc(rowDataSize); } - if (auditImp != null) { - auditImp.add( + if (auditOperator != null) { + auditOperator.add( Constants.AUDIT_SORT_INPUT, getGroupId(), getStreamId(), @@ -233,7 +233,7 @@ public class SourceMetricData implements MetricData { + ", numBytesInForMeter=" + numBytesInForMeter.getCount() + ", numRecordsInPerSecond=" + numRecordsInPerSecond.getRate() + ", numBytesInPerSecond=" + numBytesInPerSecond.getRate() - + ", auditImp=" + auditImp + + ", auditOperator=" + auditOperator + '}'; } } diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/audit/AuditUtils.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/audit/AuditUtils.java index b4fbbb9da..dea86d46a 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/audit/AuditUtils.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/audit/AuditUtils.java @@ -1,10 +1,10 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with + * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -17,8 +17,7 @@ package org.apache.inlong.tubemq.server.broker.stats.audit; -import java.util.Map; -import org.apache.inlong.audit.AuditImp; +import org.apache.inlong.audit.AuditOperator; import org.apache.inlong.audit.util.AuditConfig; import org.apache.inlong.tubemq.corebase.TokenConstants; import org.apache.inlong.tubemq.corebase.utils.DateTimeConvertUtils; @@ -26,18 +25,21 @@ import org.apache.inlong.tubemq.corebase.utils.TStringUtils; import org.apache.inlong.tubemq.server.broker.stats.TrafficInfo; import org.apache.inlong.tubemq.server.common.fileconfig.ADConfig; +import java.util.Map; + /** * AuditUtils * * A wrapper class for Audit report operations */ public class AuditUtils { + private static ADConfig auditConfig = new ADConfig(); /** * init audit instance * - * @param adConfig the initial configure + * @param adConfig the initial configure */ public static void initAudit(ADConfig adConfig) { // check whether enable audit @@ -48,28 +50,27 @@ public class AuditUtils { auditConfig = adConfig; // initial audit instance - AuditImp.getInstance().setAuditProxy(adConfig.getAuditProxyAddrSet()); + AuditOperator.getInstance().setAuditProxy(adConfig.getAuditProxyAddrSet()); AuditConfig auditConfig = new AuditConfig(adConfig.getAuditCacheFilePath(), adConfig.getAuditCacheMaxRows()); - AuditImp.getInstance().setAuditConfig(auditConfig); + AuditOperator.getInstance().setAuditConfig(auditConfig); } /** * add produce record * - * @param groupId the group id - * @param streamId the stream id - * @param logTime the record time - * @param count the record count - * @param size the record size + * @param groupId the group id + * @param streamId the stream id + * @param logTime the record time + * @param count the record count + * @param size the record size */ - public static void addProduceRecord(String groupId, String streamId, - String logTime, long count, long size) { + public static void addProduceRecord(String groupId, String streamId, String logTime, long count, long size) { if (!auditConfig.isAuditEnable()) { return; } - AuditImp.getInstance().add(auditConfig.getAuditIdProduce(), + AuditOperator.getInstance().add(auditConfig.getAuditIdProduce(), groupId, streamId, DateTimeConvertUtils.yyyyMMddHHmm2ms(logTime), count, size); } @@ -79,8 +80,7 @@ public class AuditUtils { * @param trafficInfos the consumed traffic information */ public static void addConsumeRecord(Map<String, TrafficInfo> trafficInfos) { - if (!auditConfig.isAuditEnable() - || trafficInfos == null || trafficInfos.isEmpty()) { + if (!auditConfig.isAuditEnable() || trafficInfos == null || trafficInfos.isEmpty()) { return; } for (Map.Entry<String, TrafficInfo> entry : trafficInfos.entrySet()) { @@ -99,19 +99,19 @@ public class AuditUtils { // #127.0.0.1#32677#test_consume#2#202207041219 // topicName, brokerIP, clientId, // clientIP, client processId, consume group, partitionId, msgTime - AuditImp.getInstance().add(auditConfig.getAuditIdConsume(), + AuditOperator.getInstance().add(auditConfig.getAuditIdConsume(), statKeyItems[0], statKeyItems[5], DateTimeConvertUtils.yyyyMMddHHmm2ms(statKeyItems[7]), entry.getValue().getMsgCount(), entry.getValue().getMsgSize()); } } /** - * close audit report + * Close audit, if it was enabled, send its data first. */ public static void closeAudit() { if (!auditConfig.isAuditEnable()) { return; } - AuditImp.getInstance().sendReport(); + AuditOperator.getInstance().send(); } }