This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 28340ca90 [INLONG-6022][Agent] Fix lost read and send count (#6023) 28340ca90 is described below commit 28340ca905107f91777a01e17f3285d325bc3cda Author: Lucas <100204617+lucaspeng12...@users.noreply.github.com> AuthorDate: Tue Sep 27 20:08:53 2022 +0800 [INLONG-6022][Agent] Fix lost read and send count (#6023) --- .../inlong/agent/metrics/AgentMetricItem.java | 7 ++- .../inlong/agent/plugin/channel/MemoryChannel.java | 70 ++++++++++++++-------- .../inlong/agent/plugin/sinks/SenderManager.java | 6 ++ .../agent/plugin/sources/reader/BinlogReader.java | 3 +- .../agent/plugin/sources/reader/KafkaReader.java | 1 + .../agent/plugin/sources/reader/MongoDBReader.java | 4 +- .../plugin/sources/reader/PostgreSQLReader.java | 3 +- .../plugin/sources/reader/SQLServerReader.java | 2 + .../agent/plugin/sources/reader/SqlReader.java | 2 + .../sources/reader/file/FileReaderOperator.java | 1 + .../agent/plugin/sources/TestSQLServerReader.java | 4 ++ 11 files changed, 70 insertions(+), 33 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentMetricItem.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentMetricItem.java index dbec354ff..96d35f97c 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentMetricItem.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentMetricItem.java @@ -19,6 +19,7 @@ package org.apache.inlong.agent.metrics; import org.apache.inlong.common.metric.CountMetric; import org.apache.inlong.common.metric.Dimension; +import org.apache.inlong.common.metric.GaugeMetric; import org.apache.inlong.common.metric.MetricDomain; import org.apache.inlong.common.metric.MetricItem; @@ -61,13 +62,13 @@ public class AgentMetricItem extends MetricItem { @Dimension public String inlongStreamId; - @CountMetric + @GaugeMetric public AtomicLong jobRunningCount = new AtomicLong(0); @CountMetric public AtomicLong jobFatalCount = new AtomicLong(0); - @CountMetric + @GaugeMetric public AtomicLong taskRunningCount = new AtomicLong(0); - @CountMetric + @GaugeMetric public AtomicLong taskRetryingCount = new AtomicLong(0); @CountMetric public AtomicLong taskFatalCount = new AtomicLong(0); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/channel/MemoryChannel.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/channel/MemoryChannel.java index 9125b5852..086d45b8b 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/channel/MemoryChannel.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/channel/MemoryChannel.java @@ -19,7 +19,6 @@ package org.apache.inlong.agent.plugin.channel; import org.apache.inlong.agent.conf.JobProfile; import org.apache.inlong.agent.constant.AgentConstants; -import org.apache.inlong.agent.message.ProxyMessage; import org.apache.inlong.agent.metrics.AgentMetricItem; import org.apache.inlong.agent.metrics.AgentMetricItemSet; import org.apache.inlong.agent.plugin.Channel; @@ -35,7 +34,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID; +import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID; +import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID; +import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID; import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID; +import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID; import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID; /** @@ -49,38 +52,31 @@ public class MemoryChannel implements Channel { //metric private AgentMetricItemSet metricItemSet; private static final AtomicLong METRIC_INDEX = new AtomicLong(0); + private String inlongGroupId; + private String inlongStreamId; public MemoryChannel() { } @Override public void push(Message message) { - String groupId = DEFAULT_PROXY_INLONG_GROUP_ID; try { if (message != null) { - if (message instanceof ProxyMessage) { - groupId = ((ProxyMessage) message).getInlongGroupId(); - } - AgentMetricItem metricItem = getMetricItem(KEY_INLONG_GROUP_ID, groupId); + AgentMetricItem metricItem = getMetricItem(new HashMap<String, String>()); metricItem.pluginReadCount.incrementAndGet(); queue.put(message); metricItem.pluginReadSuccessCount.incrementAndGet(); } } catch (InterruptedException ex) { - getMetricItem(KEY_INLONG_GROUP_ID, groupId).pluginReadFailCount.incrementAndGet(); - Thread.currentThread().interrupt(); + this.metricItemReadFailed(); } } @Override public boolean push(Message message, long timeout, TimeUnit unit) { - String groupId = DEFAULT_PROXY_INLONG_GROUP_ID; try { if (message != null) { - if (message instanceof ProxyMessage) { - groupId = ((ProxyMessage) message).getInlongGroupId(); - } - AgentMetricItem metricItem = getMetricItem(KEY_INLONG_GROUP_ID, groupId); + AgentMetricItem metricItem = getMetricItem(new HashMap<String, String>()); metricItem.pluginReadCount.incrementAndGet(); boolean result = queue.offer(message, timeout, unit); if (result) { @@ -91,36 +87,31 @@ public class MemoryChannel implements Channel { return result; } } catch (InterruptedException ex) { - AgentMetricItem metricItem = getMetricItem(KEY_INLONG_GROUP_ID, groupId); - metricItem.pluginReadFailCount.incrementAndGet(); - Thread.currentThread().interrupt(); + this.metricItemReadFailed(); } return false; } @Override public Message pull(long timeout, TimeUnit unit) { - String groupId = DEFAULT_PROXY_INLONG_GROUP_ID; try { Message message = queue.poll(timeout, unit); if (message != null) { - if (message instanceof ProxyMessage) { - groupId = ((ProxyMessage) message).getInlongGroupId(); - } - AgentMetricItem metricItem = getMetricItem(KEY_INLONG_GROUP_ID, groupId); + AgentMetricItem metricItem = getMetricItem(new HashMap<String, String>()); metricItem.pluginSendSuccessCount.incrementAndGet(); + metricItem.pluginSendCount.incrementAndGet(); } return message; } catch (InterruptedException ex) { - AgentMetricItem metricItem = getMetricItem(KEY_INLONG_GROUP_ID, groupId); - metricItem.pluginSendFailCount.incrementAndGet(); - Thread.currentThread().interrupt(); + this.metricItemSendFailed(); throw new IllegalStateException(ex); } } @Override public void init(JobProfile jobConf) { + inlongGroupId = jobConf.get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID); + inlongStreamId = jobConf.get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID); queue = new LinkedBlockingQueue<>( jobConf.getInt(AgentConstants.CHANNEL_MEMORY_CAPACITY, AgentConstants.DEFAULT_CHANNEL_MEMORY_CAPACITY)); @@ -138,10 +129,37 @@ public class MemoryChannel implements Channel { LOGGER.info("destroy channel, show memory channel metric:"); } - private AgentMetricItem getMetricItem(String otherKey, String value) { + private AgentMetricItem getMetricItem(Map<String, String> dimens) { Map<String, String> dimensions = new HashMap<>(); dimensions.put(KEY_PLUGIN_ID, this.getClass().getSimpleName()); - dimensions.put(otherKey, value); + dimensions.put(KEY_INLONG_GROUP_ID, inlongGroupId); + dimensions.put(KEY_INLONG_STREAM_ID, inlongStreamId); + dimens.forEach((key, value) -> { + dimensions.put(key, value); + }); return this.metricItemSet.findMetricItem(dimensions); } + + private void metricItemReadFailed() { + Map<String, String> dimensions = new HashMap<String, String>(); + dimensions.put(KEY_INLONG_GROUP_ID, inlongGroupId); + dimensions.put(KEY_INLONG_STREAM_ID, inlongStreamId); + AgentMetricItem metricItem = getMetricItem(dimensions); + metricItem.pluginReadFailCount.incrementAndGet(); + LOGGER.debug("plugin read failed:{}", dimensions.toString()); + Thread.currentThread().interrupt(); + return; + } + + private void metricItemSendFailed() { + Map<String, String> dimensions = new HashMap<String, String>(); + dimensions.put(KEY_INLONG_GROUP_ID, inlongGroupId); + dimensions.put(KEY_INLONG_STREAM_ID, inlongStreamId); + AgentMetricItem metricItem = getMetricItem(dimensions); + metricItem.pluginSendFailCount.incrementAndGet(); + metricItem.pluginSendCount.incrementAndGet(); + LOGGER.debug("plugin send failed:{}", dimensions.toString()); + Thread.currentThread().interrupt(); + return; + } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java index dd164847d..9cc10d604 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java @@ -256,6 +256,7 @@ public class SenderManager { AgentUtils.silenceSleepInMs(retrySleepTime); } Map<String, String> dims = new HashMap<>(); + dims.put(KEY_PLUGIN_ID, this.getClass().getSimpleName()); dims.put(KEY_INLONG_GROUP_ID, groupId); dims.put(KEY_INLONG_STREAM_ID, streamId); try { @@ -264,11 +265,13 @@ public class SenderManager { if (result == SendResult.OK) { semaphore.release(bodyList.size()); getMetricItem(dims).pluginSendSuccessCount.addAndGet(bodyList.size()); + getMetricItem(dims).pluginSendCount.addAndGet(bodyList.size()); long totalSize = bodyList.stream().mapToLong(body -> body.length).sum(); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId, dataTime, bodyList.size(), totalSize); } else { getMetricItem(dims).pluginSendFailCount.addAndGet(bodyList.size()); + getMetricItem(dims).pluginSendCount.addAndGet(bodyList.size()); LOGGER.warn("send data to dataproxy error {}", result.toString()); sendBatchSync(groupId, streamId, bodyList, retry + 1, dataTime, extraMap); } @@ -279,6 +282,7 @@ public class SenderManager { try { TimeUnit.SECONDS.sleep(1); getMetricItem(dims).pluginSendFailCount.addAndGet(bodyList.size()); + getMetricItem(dims).pluginSendCount.addAndGet(bodyList.size()); sendBatchSync(groupId, streamId, bodyList, retry + 1, dataTime, extraMap); } catch (Exception ignored) { // ignore it. @@ -319,12 +323,14 @@ public class SenderManager { } semaphore.release(bodyList.size()); Map<String, String> dims = new HashMap<>(); + dims.put(KEY_PLUGIN_ID, this.getClass().getSimpleName()); dims.put(KEY_INLONG_GROUP_ID, groupId); dims.put(KEY_INLONG_STREAM_ID, streamId); long totalSize = bodyList.stream().mapToLong(body -> body.length).sum(); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId, dataTime, bodyList.size(), totalSize); getMetricItem(dims).pluginSendSuccessCount.addAndGet(bodyList.size()); + getMetricItem(dims).pluginSendCount.addAndGet(bodyList.size()); if (sourcePath != null) { taskPositionManager.updateSinkPosition(jobId, sourcePath, bodyList.size()); } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java index 3fab3899c..38877951b 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java @@ -111,7 +111,6 @@ public class BinlogReader extends AbstractReader { @Override public Message read() { if (!binlogMessagesQueue.isEmpty()) { - readerMetric.pluginReadCount.incrementAndGet(); return getBinlogMessage(); } else { return null; @@ -170,9 +169,11 @@ public class BinlogReader extends AbstractReader { long dataSize = records.stream().mapToLong(r -> r.value().length()).sum(); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId, System.currentTimeMillis(), records.size(), dataSize); + readerMetric.pluginReadSuccessCount.addAndGet(records.size()); readerMetric.pluginReadCount.addAndGet(records.size()); } catch (Exception e) { readerMetric.pluginReadFailCount.addAndGet(records.size()); + readerMetric.pluginReadCount.addAndGet(records.size()); LOGGER.error("parse binlog message error", e); } }) diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java index 79e5479ee..52e72ed10 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java @@ -121,6 +121,7 @@ public class KafkaReader<K, V> extends AbstractReader { "partition:" + record.partition() + ", value:" + new String(recordValue) + ", offset:" + record.offset()); // control speed + readerMetric.pluginReadSuccessCount.incrementAndGet(); readerMetric.pluginReadCount.incrementAndGet(); // commit succeed,then record current offset snapshot = record.partition() + JOB_KAFKA_PARTITION_OFFSET_DELIMITER + record.offset(); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java index c7f0341f5..3abe037bb 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java @@ -136,7 +136,6 @@ public class MongoDBReader extends AbstractReader { @Override public Message read() { if (!bufferPool.isEmpty()) { - super.readerMetric.pluginReadCount.incrementAndGet(); return this.pollMessage(); } else { return null; @@ -367,12 +366,13 @@ public class MongoDBReader extends AbstractReader { long dataSize = records.stream().mapToLong(c -> c.value().length()).sum(); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, super.inlongGroupId, super.inlongStreamId, System.currentTimeMillis(), records.size(), dataSize); + readerMetric.pluginReadSuccessCount.addAndGet(records.size()); readerMetric.pluginReadCount.addAndGet(records.size()); } catch (InterruptedException e) { e.printStackTrace(); LOGGER.error("parse mongo message error", e); - readerMetric.pluginReadFailCount.addAndGet(records.size()); + readerMetric.pluginReadCount.addAndGet(records.size()); } } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java index dc7587bc7..23bfa1fbe 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java @@ -108,7 +108,6 @@ public class PostgreSQLReader extends AbstractReader { @Override public Message read() { if (!postgreSQLMessageQueue.isEmpty()) { - readerMetric.pluginReadCount.incrementAndGet(); return getPostgreSQLMessage(); } else { return null; @@ -165,9 +164,11 @@ public class PostgreSQLReader extends AbstractReader { long dataSize = records.stream().mapToLong(c -> c.value().length()).sum(); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId, System.currentTimeMillis(), records.size(), dataSize); + readerMetric.pluginReadSuccessCount.addAndGet(records.size()); readerMetric.pluginReadCount.addAndGet(records.size()); } catch (Exception e) { readerMetric.pluginReadFailCount.addAndGet(records.size()); + readerMetric.pluginReadCount.addAndGet(records.size()); LOGGER.error("parse binlog message error", e); } }) diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java index 9edb14ba3..c4daa8560 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java @@ -112,11 +112,13 @@ public class SQLServerReader extends AbstractReader { long dataSize = lineColumns.stream().mapToLong(column -> column.length()).sum(); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId, System.currentTimeMillis(), 1, dataSize); + readerMetric.pluginReadSuccessCount.incrementAndGet(); readerMetric.pluginReadCount.incrementAndGet(); return generateMessage(lineColumns); } catch (Exception ex) { LOGGER.error("error while reading data", ex); readerMetric.pluginReadFailCount.incrementAndGet(); + readerMetric.pluginReadCount.incrementAndGet(); throw new RuntimeException(ex); } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java index b1b0c2280..130326462 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java @@ -118,6 +118,7 @@ public class SqlReader extends AbstractReader { long dataSize = lineColumns.stream().mapToLong(column -> column.length()).sum(); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId, System.currentTimeMillis(), 1, dataSize); + readerMetric.pluginReadSuccessCount.incrementAndGet(); readerMetric.pluginReadCount.incrementAndGet(); return generateMessage(lineColumns); } else { @@ -126,6 +127,7 @@ public class SqlReader extends AbstractReader { } catch (Exception ex) { LOGGER.error("error while reading data", ex); readerMetric.pluginReadFailCount.incrementAndGet(); + readerMetric.pluginReadCount.incrementAndGet(); throw new RuntimeException(ex); } return null; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java index 83d6e2a97..33e8816cc 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java @@ -93,6 +93,7 @@ public class FileReaderOperator extends AbstractReader { if (validateMessage(message)) { AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId, System.currentTimeMillis(), 1, message.length()); + readerMetric.pluginReadSuccessCount.incrementAndGet(); readerMetric.pluginReadCount.incrementAndGet(); String proxyPartitionKey = jobConf.get(PROXY_SEND_PARTITION_KEY, DigestUtils.md5Hex(inlongGroupId)); Map<String, String> header = new HashMap<>(); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerReader.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerReader.java index 6b100fd84..7b07378b6 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerReader.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerReader.java @@ -93,6 +93,8 @@ public class TestSQLServerReader { private AtomicLong atomicLong; + private AtomicLong atomicCountLong; + private String sql; @Before @@ -107,6 +109,7 @@ public class TestSQLServerReader { final String groupId = "group01"; final String streamId = "stream01"; atomicLong = new AtomicLong(0L); + atomicCountLong = new AtomicLong(0L); sql = "select * from dbo.test01"; @@ -143,6 +146,7 @@ public class TestSQLServerReader { whenNew(AgentMetricItemSet.class).withArguments(anyString()).thenReturn(agentMetricItemSet); when(agentMetricItemSet.findMetricItem(any())).thenReturn(agentMetricItem); field(AgentMetricItem.class, "pluginReadCount").set(agentMetricItem, atomicLong); + field(AgentMetricItem.class, "pluginReadSuccessCount").set(agentMetricItem, atomicCountLong); //init method (reader = new SQLServerReader(sql)).init(jobProfile);