This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new cf663fcfec [INLONG-9338][Agent] Real time file collection uses the current time as the data time (#9339) cf663fcfec is described below commit cf663fcfece271616e9660f3fe2a215c78c6be0b Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Mon Nov 27 17:15:33 2023 +0800 [INLONG-9338][Agent] Real time file collection uses the current time as the data time (#9339) --- .../inlong/agent/conf/AbstractConfiguration.java | 2 ++ .../message/filecollect/ProxyMessageCache.java | 18 +++++++++++++++--- .../inlong/agent/core/task/file/TaskManager.java | 2 +- .../plugin/sinks/filecollect/SenderManager.java | 2 +- .../inlong/agent/plugin/sources/LogFileSource.java | 21 ++++++++++++++++++--- 5 files changed, 37 insertions(+), 8 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/AbstractConfiguration.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/AbstractConfiguration.java index b6a8b19d73..ec7fcf1151 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/AbstractConfiguration.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/AbstractConfiguration.java @@ -175,6 +175,7 @@ public abstract class AbstractConfiguration { public int getInt(String key) { JsonElement value = configStorage.get(key); if (value == null) { + LOGGER.error("null value for key " + key); throw new NullPointerException("null value for key " + key); } return value.getAsInt(); @@ -231,6 +232,7 @@ public abstract class AbstractConfiguration { public String get(String key) { JsonElement value = configStorage.get(key); if (value == null) { + LOGGER.error("null value for key " + key); throw new NullPointerException("null value for key " + key); } return value.getAsString(); diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java index 66b3fb3b43..3c7742365e 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java @@ -18,6 +18,7 @@ package org.apache.inlong.agent.message.filecollect; import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.constant.CycleUnitType; import org.apache.inlong.agent.utils.AgentUtils; import org.apache.inlong.agent.utils.DateTransUtils; import org.apache.inlong.common.msg.AttributeConstants; @@ -62,6 +63,7 @@ public class ProxyMessageCache { private final AtomicLong cacheSize = new AtomicLong(0); private long lastPrintTime = 0; private long dataTime; + private boolean isRealTime = false; /** * extra map used when sending to dataproxy */ @@ -77,8 +79,12 @@ public class ProxyMessageCache { this.cacheTimeout = instanceProfile.getInt(PROXY_PACKAGE_MAX_TIMEOUT_MS, DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS); messageQueueMap = new ConcurrentHashMap<>(); try { - dataTime = DateTransUtils.timeStrConvertToMillSec(instanceProfile.getSourceDataTime(), - instanceProfile.get(TASK_CYCLE_UNIT)); + String cycleUnit = instanceProfile.get(TASK_CYCLE_UNIT); + if (cycleUnit.compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) { + isRealTime = true; + cycleUnit = CycleUnitType.HOUR; + } + dataTime = DateTransUtils.timeStrConvertToMillSec(instanceProfile.getSourceDataTime(), cycleUnit); } catch (ParseException e) { LOGGER.info("trans dataTime error", e); } @@ -172,9 +178,15 @@ public class ProxyMessageCache { offsetList.add(message.getAckInfo()); } // make sure result is not empty. + long auditTime = 0; + if (isRealTime) { + auditTime = AgentUtils.getCurrentTime(); + } else { + auditTime = dataTime; + } if (!bodyList.isEmpty()) { SenderMessage senderMessage = new SenderMessage(taskId, instanceId, groupId, streamId, bodyList, - dataTime, extraMap, offsetList); + auditTime, extraMap, offsetList); return senderMessage; } return null; diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java index 29fb5633f6..f97a89f26c 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java @@ -466,7 +466,7 @@ public class TaskManager extends AbstractDaemon { task.getTaskId(), taskMap.size(), runningPool.getTaskCount(), runningPool.getActiveCount()); } catch (Throwable t) { - LOGGER.error("add task error {}", t.getMessage()); + LOGGER.error("add task error: ", t); } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java index 807c6fbcde..45fe9bc63e 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java @@ -345,7 +345,7 @@ public class SenderManager { message.getOffsetAckList().forEach(ack -> ack.setHasAck(true)); getMetricItem(groupId, streamId).pluginSendSuccessCount.addAndGet(msgCnt); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId, - profile.getSinkDataTime(), message.getMsgCnt(), message.getTotalSize()); + dataTime, message.getMsgCnt(), message.getTotalSize()); } else { LOGGER.warn("send groupId {}, streamId {}, taskId {}, instanceId {}, dataTime {} fail with times {}, " + "error {}", groupId, streamId, taskId, instanceId, dataTime, retry, result); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java index 481b7efcf9..4b61d32fc4 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java @@ -21,6 +21,7 @@ import org.apache.inlong.agent.common.AgentThreadFactory; import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.conf.OffsetProfile; import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.constant.CycleUnitType; import org.apache.inlong.agent.constant.DataCollectType; import org.apache.inlong.agent.constant.TaskConstants; import org.apache.inlong.agent.core.task.OffsetManager; @@ -138,6 +139,7 @@ public class LogFileSource extends AbstractSource { private long dataTime = 0; private volatile long emptyCount = 0; private ExtendedHandler extendedHandler; + private boolean isRealTime = false; public LogFileSource() { OffsetManager.init(); @@ -149,6 +151,11 @@ public class LogFileSource extends AbstractSource { LOGGER.info("LogFileSource init: {}", profile.toJsonStr()); this.profile = profile; super.init(profile); + String cycleUnit = profile.get(TASK_CYCLE_UNIT); + if (cycleUnit.compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) { + isRealTime = true; + cycleUnit = CycleUnitType.HOUR; + } taskId = profile.getTaskId(); instanceId = profile.getInstanceId(); fileName = profile.getInstanceId(); @@ -161,8 +168,7 @@ public class LogFileSource extends AbstractSource { linePosition = getInitLineOffset(isIncrement, taskId, instanceId, inodeInfo); bytePosition = getBytePositionByLine(linePosition); queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE); - dataTime = DateTransUtils.timeStrConvertToMillSec(profile.getSourceDataTime(), - profile.get(TASK_CYCLE_UNIT)); + dataTime = DateTransUtils.timeStrConvertToMillSec(profile.getSourceDataTime(), cycleUnit); if (DEFAULT_FILE_SOURCE_EXTEND_CLASS.compareTo(ExtendedHandler.class.getCanonicalName()) != 0) { Constructor<?> constructor = Class.forName( @@ -369,8 +375,14 @@ public class LogFileSource extends AbstractSource { if (extendedHandler != null) { extendedHandler.dealWithHeader(header, sourceData.getData().getBytes(StandardCharsets.UTF_8)); } + long auditTime = 0; + if (isRealTime) { + auditTime = AgentUtils.getCurrentTime(); + } else { + auditTime = profile.getSinkDataTime(); + } AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, header.get(PROXY_KEY_STREAM_ID), - profile.getSinkDataTime(), 1, msgWithMetaData.length()); + auditTime, 1, msgWithMetaData.length()); Message finalMsg = new DefaultMessage(msgWithMetaData.getBytes(StandardCharsets.UTF_8), header); // if the message size is greater than max pack size,should drop it. if (finalMsg.getBody().length > maxPackSize) { @@ -556,6 +568,9 @@ public class LogFileSource extends AbstractSource { @Override public boolean sourceFinish() { + if (isRealTime) { + return false; + } return emptyCount > EMPTY_CHECK_COUNT_AT_LEAST; }