This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bf7df2fe [INLONG-5723][DataProxy] Fix source and sink metrics are 
incorrect when msgType equals 5. (#5728)
0bf7df2fe is described below

commit 0bf7df2fe4aa884f5ef67881404b2e96dfbf51a7
Author: xueyingzhang <[email protected]>
AuthorDate: Mon Aug 29 14:08:10 2022 +0800

    [INLONG-5723][DataProxy] Fix source and sink metrics are incorrect when 
msgType equals 5. (#5728)
---
 .../dataproxy/source/ServerMessageHandler.java     | 44 ++++++++++++----------
 1 file changed, 24 insertions(+), 20 deletions(-)

diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 8c158d3ce..d79143260 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -17,28 +17,14 @@
 
 package org.apache.inlong.dataproxy.source;
 
-import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEPARATOR;
-import static 
org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_DATA;
-import static 
org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_GROUPID;
-import static org.apache.inlong.dataproxy.source.SimpleTcpSource.blacklist;
-
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.group.ChannelGroup;
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Event;
@@ -63,8 +49,21 @@ import org.apache.inlong.dataproxy.utils.NetworkUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEPARATOR;
+import static 
org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_DATA;
+import static 
org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_GROUPID;
+import static org.apache.inlong.dataproxy.source.SimpleTcpSource.blacklist;
 
 /**
  * Server message handler
@@ -400,6 +399,7 @@ public class ServerMessageHandler extends 
ChannelInboundHandlerAdapter {
         } else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
             inLongMsgVer = 4;
         }
+        int recordMsgCnt = 
Integer.parseInt(commonAttrMap.get(AttributeConstants.MESSAGE_COUNT));
 
         for (Map.Entry<String, HashMap<String, List<ProxyMessage>>> topicEntry 
: messageMap.entrySet()) {
             for (Map.Entry<String, List<ProxyMessage>> streamIdEntry : 
topicEntry.getValue().entrySet()) {
@@ -456,6 +456,10 @@ public class ServerMessageHandler extends 
ChannelInboundHandlerAdapter {
                 headers.put(ConfigConstants.REMOTE_IDC_KEY, 
DEFAULT_REMOTE_IDC_VALUE);
                 // every message share the same msg cnt? what if msgType = 5
                 String proxyMetricMsgCnt = 
commonAttrMap.get(AttributeConstants.MESSAGE_COUNT);
+                if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType) || 
MsgType.MSG_MULTI_BODY.equals(msgType)) {
+                    commonAttrMap.put(AttributeConstants.MESSAGE_COUNT, 
String.valueOf(recordMsgCnt));
+                    proxyMetricMsgCnt = 
commonAttrMap.get(AttributeConstants.MESSAGE_COUNT);
+                }
                 headers.put(ConfigConstants.MSG_COUNTER_KEY, 
proxyMetricMsgCnt);
 
                 byte[] data = inLongMsg.buildArray();
@@ -510,7 +514,7 @@ public class ServerMessageHandler extends 
ChannelInboundHandlerAdapter {
                 } catch (Throwable ex) {
                     logger.error("Error writting to channel,data will 
discard.", ex);
                     monitorIndexExt.incrementAndGet("EVENT_DROPPED");
-                    monitorIndex.addAndGet(new String(newbase), 0,0,0,
+                    monitorIndex.addAndGet(new String(newbase), 0, 0, 0,
                             Integer.parseInt(proxyMetricMsgCnt));
                     this.addMetric(false, data.length, event);
                     throw new ChannelException("ProcessEvent error can't write 
event to channel.");
@@ -731,7 +735,7 @@ public class ServerMessageHandler extends 
ChannelInboundHandlerAdapter {
 
     /**
      * addMetric
-     * 
+     *
      * @param result
      * @param size
      * @param event

Reply via email to