This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0bf7df2fe [INLONG-5723][DataProxy] Fix source and sink metrics are
incorrect when msgType equals 5. (#5728)
0bf7df2fe is described below
commit 0bf7df2fe4aa884f5ef67881404b2e96dfbf51a7
Author: xueyingzhang <[email protected]>
AuthorDate: Mon Aug 29 14:08:10 2022 +0800
[INLONG-5723][DataProxy] Fix source and sink metrics are incorrect when
msgType equals 5. (#5728)
---
.../dataproxy/source/ServerMessageHandler.java | 44 ++++++++++++----------
1 file changed, 24 insertions(+), 20 deletions(-)
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 8c158d3ce..d79143260 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -17,28 +17,14 @@
package org.apache.inlong.dataproxy.source;
-import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEPARATOR;
-import static
org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_DATA;
-import static
org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_GROUPID;
-import static org.apache.inlong.dataproxy.source.SimpleTcpSource.blacklist;
-
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.ChannelException;
import org.apache.flume.Event;
@@ -63,8 +49,21 @@ import org.apache.inlong.dataproxy.utils.NetworkUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEPARATOR;
+import static
org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_DATA;
+import static
org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_GROUPID;
+import static org.apache.inlong.dataproxy.source.SimpleTcpSource.blacklist;
/**
* Server message handler
@@ -400,6 +399,7 @@ public class ServerMessageHandler extends
ChannelInboundHandlerAdapter {
} else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
inLongMsgVer = 4;
}
+ int recordMsgCnt =
Integer.parseInt(commonAttrMap.get(AttributeConstants.MESSAGE_COUNT));
for (Map.Entry<String, HashMap<String, List<ProxyMessage>>> topicEntry
: messageMap.entrySet()) {
for (Map.Entry<String, List<ProxyMessage>> streamIdEntry :
topicEntry.getValue().entrySet()) {
@@ -456,6 +456,10 @@ public class ServerMessageHandler extends
ChannelInboundHandlerAdapter {
headers.put(ConfigConstants.REMOTE_IDC_KEY,
DEFAULT_REMOTE_IDC_VALUE);
// every message share the same msg cnt? what if msgType = 5
String proxyMetricMsgCnt =
commonAttrMap.get(AttributeConstants.MESSAGE_COUNT);
+ if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType) ||
MsgType.MSG_MULTI_BODY.equals(msgType)) {
+ commonAttrMap.put(AttributeConstants.MESSAGE_COUNT,
String.valueOf(recordMsgCnt));
+ proxyMetricMsgCnt =
commonAttrMap.get(AttributeConstants.MESSAGE_COUNT);
+ }
headers.put(ConfigConstants.MSG_COUNTER_KEY,
proxyMetricMsgCnt);
byte[] data = inLongMsg.buildArray();
@@ -510,7 +514,7 @@ public class ServerMessageHandler extends
ChannelInboundHandlerAdapter {
} catch (Throwable ex) {
logger.error("Error writting to channel,data will
discard.", ex);
monitorIndexExt.incrementAndGet("EVENT_DROPPED");
- monitorIndex.addAndGet(new String(newbase), 0,0,0,
+ monitorIndex.addAndGet(new String(newbase), 0, 0, 0,
Integer.parseInt(proxyMetricMsgCnt));
this.addMetric(false, data.length, event);
throw new ChannelException("ProcessEvent error can't write
event to channel.");
@@ -731,7 +735,7 @@ public class ServerMessageHandler extends
ChannelInboundHandlerAdapter {
/**
* addMetric
- *
+ *
* @param result
* @param size
* @param event