This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new cf663fcfec [INLONG-9338][Agent] Real time file collection uses the 
current time as the data time (#9339)
cf663fcfec is described below

commit cf663fcfece271616e9660f3fe2a215c78c6be0b
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Mon Nov 27 17:15:33 2023 +0800

    [INLONG-9338][Agent] Real time file collection uses the current time as the 
data time (#9339)
---
 .../inlong/agent/conf/AbstractConfiguration.java    |  2 ++
 .../message/filecollect/ProxyMessageCache.java      | 18 +++++++++++++++---
 .../inlong/agent/core/task/file/TaskManager.java    |  2 +-
 .../plugin/sinks/filecollect/SenderManager.java     |  2 +-
 .../inlong/agent/plugin/sources/LogFileSource.java  | 21 ++++++++++++++++++---
 5 files changed, 37 insertions(+), 8 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/AbstractConfiguration.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/AbstractConfiguration.java
index b6a8b19d73..ec7fcf1151 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/AbstractConfiguration.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/AbstractConfiguration.java
@@ -175,6 +175,7 @@ public abstract class AbstractConfiguration {
     public int getInt(String key) {
         JsonElement value = configStorage.get(key);
         if (value == null) {
+            LOGGER.error("null value for key " + key);
             throw new NullPointerException("null value for key " + key);
         }
         return value.getAsInt();
@@ -231,6 +232,7 @@ public abstract class AbstractConfiguration {
     public String get(String key) {
         JsonElement value = configStorage.get(key);
         if (value == null) {
+            LOGGER.error("null value for key " + key);
             throw new NullPointerException("null value for key " + key);
         }
         return value.getAsString();
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
index 66b3fb3b43..3c7742365e 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.agent.message.filecollect;
 
 import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.DateTransUtils;
 import org.apache.inlong.common.msg.AttributeConstants;
@@ -62,6 +63,7 @@ public class ProxyMessageCache {
     private final AtomicLong cacheSize = new AtomicLong(0);
     private long lastPrintTime = 0;
     private long dataTime;
+    private boolean isRealTime = false;
     /**
      * extra map used when sending to dataproxy
      */
@@ -77,8 +79,12 @@ public class ProxyMessageCache {
         this.cacheTimeout = 
instanceProfile.getInt(PROXY_PACKAGE_MAX_TIMEOUT_MS, 
DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS);
         messageQueueMap = new ConcurrentHashMap<>();
         try {
-            dataTime = 
DateTransUtils.timeStrConvertToMillSec(instanceProfile.getSourceDataTime(),
-                    instanceProfile.get(TASK_CYCLE_UNIT));
+            String cycleUnit = instanceProfile.get(TASK_CYCLE_UNIT);
+            if (cycleUnit.compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
+                isRealTime = true;
+                cycleUnit = CycleUnitType.HOUR;
+            }
+            dataTime = 
DateTransUtils.timeStrConvertToMillSec(instanceProfile.getSourceDataTime(), 
cycleUnit);
         } catch (ParseException e) {
             LOGGER.info("trans dataTime error", e);
         }
@@ -172,9 +178,15 @@ public class ProxyMessageCache {
             offsetList.add(message.getAckInfo());
         }
         // make sure result is not empty.
+        long auditTime = 0;
+        if (isRealTime) {
+            auditTime = AgentUtils.getCurrentTime();
+        } else {
+            auditTime = dataTime;
+        }
         if (!bodyList.isEmpty()) {
             SenderMessage senderMessage = new SenderMessage(taskId, 
instanceId, groupId, streamId, bodyList,
-                    dataTime, extraMap, offsetList);
+                    auditTime, extraMap, offsetList);
             return senderMessage;
         }
         return null;
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
index 29fb5633f6..f97a89f26c 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
@@ -466,7 +466,7 @@ public class TaskManager extends AbstractDaemon {
                     task.getTaskId(), taskMap.size(), 
runningPool.getTaskCount(),
                     runningPool.getActiveCount());
         } catch (Throwable t) {
-            LOGGER.error("add task error {}", t.getMessage());
+            LOGGER.error("add task error: ", t);
         }
     }
 
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index 807c6fbcde..45fe9bc63e 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -345,7 +345,7 @@ public class SenderManager {
                 message.getOffsetAckList().forEach(ack -> ack.setHasAck(true));
                 getMetricItem(groupId, 
streamId).pluginSendSuccessCount.addAndGet(msgCnt);
                 AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, 
groupId, streamId,
-                        profile.getSinkDataTime(), message.getMsgCnt(), 
message.getTotalSize());
+                        dataTime, message.getMsgCnt(), message.getTotalSize());
             } else {
                 LOGGER.warn("send groupId {}, streamId {}, taskId {}, 
instanceId {}, dataTime {} fail with times {}, "
                         + "error {}", groupId, streamId, taskId, instanceId, 
dataTime, retry, result);
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 481b7efcf9..4b61d32fc4 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -21,6 +21,7 @@ import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.OffsetProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
 import org.apache.inlong.agent.constant.DataCollectType;
 import org.apache.inlong.agent.constant.TaskConstants;
 import org.apache.inlong.agent.core.task.OffsetManager;
@@ -138,6 +139,7 @@ public class LogFileSource extends AbstractSource {
     private long dataTime = 0;
     private volatile long emptyCount = 0;
     private ExtendedHandler extendedHandler;
+    private boolean isRealTime = false;
 
     public LogFileSource() {
         OffsetManager.init();
@@ -149,6 +151,11 @@ public class LogFileSource extends AbstractSource {
             LOGGER.info("LogFileSource init: {}", profile.toJsonStr());
             this.profile = profile;
             super.init(profile);
+            String cycleUnit = profile.get(TASK_CYCLE_UNIT);
+            if (cycleUnit.compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
+                isRealTime = true;
+                cycleUnit = CycleUnitType.HOUR;
+            }
             taskId = profile.getTaskId();
             instanceId = profile.getInstanceId();
             fileName = profile.getInstanceId();
@@ -161,8 +168,7 @@ public class LogFileSource extends AbstractSource {
             linePosition = getInitLineOffset(isIncrement, taskId, instanceId, 
inodeInfo);
             bytePosition = getBytePositionByLine(linePosition);
             queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
-            dataTime = 
DateTransUtils.timeStrConvertToMillSec(profile.getSourceDataTime(),
-                    profile.get(TASK_CYCLE_UNIT));
+            dataTime = 
DateTransUtils.timeStrConvertToMillSec(profile.getSourceDataTime(), cycleUnit);
             if 
(DEFAULT_FILE_SOURCE_EXTEND_CLASS.compareTo(ExtendedHandler.class.getCanonicalName())
 != 0) {
                 Constructor<?> constructor =
                         Class.forName(
@@ -369,8 +375,14 @@ public class LogFileSource extends AbstractSource {
         if (extendedHandler != null) {
             extendedHandler.dealWithHeader(header, 
sourceData.getData().getBytes(StandardCharsets.UTF_8));
         }
+        long auditTime = 0;
+        if (isRealTime) {
+            auditTime = AgentUtils.getCurrentTime();
+        } else {
+            auditTime = profile.getSinkDataTime();
+        }
         AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, 
header.get(PROXY_KEY_STREAM_ID),
-                profile.getSinkDataTime(), 1, msgWithMetaData.length());
+                auditTime, 1, msgWithMetaData.length());
         Message finalMsg = new 
DefaultMessage(msgWithMetaData.getBytes(StandardCharsets.UTF_8), header);
         // if the message size is greater than max pack size,should drop it.
         if (finalMsg.getBody().length > maxPackSize) {
@@ -556,6 +568,9 @@ public class LogFileSource extends AbstractSource {
 
     @Override
     public boolean sourceFinish() {
+        if (isRealTime) {
+            return false;
+        }
         return emptyCount > EMPTY_CHECK_COUNT_AT_LEAST;
     }
 

Reply via email to