This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 78c4a763f6 [INLONG-9284][Agent] Report audit by data time not real time (#9292)
78c4a763f6 is described below

commit 78c4a763f60c8a251b15317ad3569b633528b03c
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Wed Nov 15 14:17:08 2023 +0800

    [INLONG-9284][Agent] Report audit by data time not real time (#9292)
---
 .../message/filecollect/ProxyMessageCache.java     | 12 ++++-
 .../apache/inlong/agent/utils/DateTransUtils.java  |  1 -
 .../agent/core/instance/InstanceManager.java       |  7 ++-
 .../inlong/agent/core/task/file/TaskManager.java   |  3 +-
 .../plugin/sinks/filecollect/SenderManager.java    |  6 +--
 .../inlong/agent/plugin/sources/LogFileSource.java | 51 +++++++++++-----------
 .../agent/plugin/sources/TestLogFileSource.java    |  8 +---
 7 files changed, 46 insertions(+), 42 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
index 7ca74fb603..29ebcc75b0 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
@@ -21,11 +21,13 @@ import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.constant.TaskConstants;
 import org.apache.inlong.agent.message.ProxyMessage;
 import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.DateTransUtils;
 import org.apache.inlong.common.msg.AttributeConstants;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -40,6 +42,7 @@ import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PAC
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_TIMEOUT_MS;
+import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT;
 
 /**
  * Handle List of Proxy Message, which belong to the same stream id.
@@ -62,6 +65,7 @@ public class ProxyMessageCache {
     private final AtomicLong cacheSize = new AtomicLong(0);
     private Long packageIndex = 0L;
     private long lastPrintTime = 0;
+    private long dataTime;
     /**
      * extra map used when sending to dataproxy
      */
@@ -78,6 +82,12 @@ public class ProxyMessageCache {
         this.groupId = groupId;
         this.streamId = streamId;
         this.inodeInfo = instanceProfile.get(TaskConstants.INODE_INFO);
+        try {
+            dataTime = 
DateTransUtils.timeStrConvertTomillSec(instanceProfile.getDataTime(),
+                    instanceProfile.get(TASK_CYCLE_UNIT));
+        } catch (ParseException e) {
+            LOGGER.info("trans dataTime error", e);
+        }
         extraMap.put(AttributeConstants.MESSAGE_SYNC_SEND, "false");
         
extraMap.putAll(AgentUtils.parseAddAttrToMap(instanceProfile.getPredefineFields()));
     }
@@ -163,7 +173,7 @@ public class ProxyMessageCache {
         if (!bodyList.isEmpty()) {
             PackageAckInfo ackInfo = new PackageAckInfo(packageIndex, 
packageOffset, resultBatchSize, false);
             SenderMessage senderMessage = new SenderMessage(taskId, 
instanceId, groupId, streamId, bodyList,
-                    AgentUtils.getCurrentTime(), extraMap, ackInfo);
+                    dataTime, extraMap, ackInfo);
             packageIndex++;
             return senderMessage;
         }
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
index ced881c3f7..6a79222a2a 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
@@ -44,7 +44,6 @@ public class DateTransUtils {
     public static long timeStrConvertTomillSec(String time, String cycleUnit, 
TimeZone timeZone)
             throws ParseException {
         long retTime = 0;
-        // SimpleDateFormat df=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
         SimpleDateFormat df = null;
         if (cycleUnit.equals("Y") && time.length() == 4) {
             df = new SimpleDateFormat("yyyy");
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index c7835cd355..a80ce8b53b 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -160,16 +160,15 @@ public class InstanceManager extends AbstractDaemon {
 
     private void printInstanceDetail() {
         if (AgentUtils.getCurrentTime() - lastPrintTime > 
CORE_THREAD_PRINT_TIME) {
-            LOGGER.info("instanceManager coreThread running! taskId {} action 
count {}", taskId,
-                    actionQueue.size());
             List<InstanceProfile> instances = instanceDb.getInstances(taskId);
             InstancePrintStat stat = new InstancePrintStat();
             for (int i = 0; i < instances.size(); i++) {
                 InstanceProfile instance = instances.get(i);
                 stat.stat(instance.getState());
             }
-            LOGGER.info("instanceManager coreThread running! taskId {} memory 
total {} db total {} db detail {} ",
-                    taskId, instanceMap.size(), instances.size(), stat);
+            LOGGER.info(
+                    "instanceManager running! taskId {} mem {} db total {} {} 
action count {}",
+                    taskId, instanceMap.size(), instances.size(), stat, 
actionQueue.size());
             lastPrintTime = AgentUtils.getCurrentTime();
         }
     }
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
index 6724247c77..29fb5633f6 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
@@ -203,8 +203,7 @@ public class TaskManager extends AbstractDaemon {
                 TaskProfile task = tasksInDb.get(i);
                 stat.stat(task.getState());
             }
-            LOGGER.info("taskManager coreThread running! memory total {} db 
total {} db detail {} ", taskMap.size(),
-                    tasksInDb.size(), stat);
+            LOGGER.info("taskManager running! mem {} db total {} {} ", 
taskMap.size(), tasksInDb.size(), stat);
             lastPrintTime = AgentUtils.getCurrentTime();
         }
     }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index d13ef19963..bea53a1e9d 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -331,11 +331,11 @@ public class SenderManager {
     }
 
     private void asyncSendByMessageSender(SendMessageCallback cb,
-            List<byte[]> bodyList, String groupId, String streamId, long dt, 
String msgUUID,
+            List<byte[]> bodyList, String groupId, String streamId, long 
dataTime, String msgUUID,
             long timeout, TimeUnit timeUnit,
             Map<String, String> extraAttrMap, boolean isProxySend) throws 
ProxysdkException {
         sender.asyncSendMessage(cb, bodyList, groupId,
-                streamId, dt, msgUUID,
+                streamId, dataTime, msgUUID,
                 timeout, timeUnit, extraAttrMap, isProxySend);
     }
 
@@ -428,7 +428,7 @@ public class SenderManager {
                 message.getAckInfo().setHasAck(true);
                 getMetricItem(groupId, 
streamId).pluginSendSuccessCount.addAndGet(msgCnt);
                 AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, 
groupId, streamId,
-                        System.currentTimeMillis(), message.getMsgCnt(), 
message.getTotalSize());
+                        dataTime, message.getMsgCnt(), message.getTotalSize());
             } else {
                 LOGGER.warn("send groupId {}, streamId {}, taskId {}, 
instanceId {}, dataTime {} fail with times {}, "
                         + "error {}", groupId, streamId, taskId, instanceId, 
dataTime, retry, result);
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 089f0fee75..cbc1cff372 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -34,6 +34,7 @@ import 
org.apache.inlong.agent.plugin.sources.file.AbstractSource;
 import 
org.apache.inlong.agent.plugin.sources.reader.file.KubernetesMetadataProvider;
 import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
 import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.DateTransUtils;
 
 import com.google.gson.Gson;
 import lombok.AllArgsConstructor;
@@ -80,6 +81,7 @@ import static 
org.apache.inlong.agent.constant.MetadataConstants.METADATA_HOST_N
 import static 
org.apache.inlong.agent.constant.MetadataConstants.METADATA_SOURCE_IP;
 import static 
org.apache.inlong.agent.constant.TaskConstants.JOB_FILE_META_ENV_LIST;
 import static org.apache.inlong.agent.constant.TaskConstants.OFFSET;
+import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT;
 
 /**
  * Read text files
@@ -106,9 +108,9 @@ public class LogFileSource extends AbstractSource {
     private final Integer CORE_THREAD_PRINT_INTERVAL_MS = 1000;
     private final Integer CACHE_QUEUE_SIZE = 10 * BATCH_READ_LINE_COUNT;
     private final Integer SIZE_OF_BUFFER_TO_READ_FILE = 64 * 1024;
-    private final Integer FINISH_READ_MAX_COUNT = 30;
+    private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 30;
     private final Long INODE_UPDATE_INTERVAL_MS = 1000L;
-    private final Integer READ_WAIT_TIMEOUT_MS = 1000;
+    private final Integer READ_WAIT_TIMEOUT_MS = 10;
     private final SimpleDateFormat RECORD_TIME_FORMAT = new 
SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
     public InstanceProfile profile;
     private String taskId;
@@ -125,11 +127,12 @@ public class LogFileSource extends AbstractSource {
     private BlockingQueue<SourceData> queue;
     private final Gson GSON = new Gson();
     private volatile boolean runnable = true;
-    private volatile int readEndCount = 0;
     private volatile boolean fileExist = true;
     private String inodeInfo;
     private volatile long lastInodeUpdateTime = 0;
     private volatile boolean running = false;
+    private long dataTime = 0;
+    private volatile long emptyCount = 0;
 
     public LogFileSource() {
         OffsetManager.init();
@@ -153,6 +156,8 @@ public class LogFileSource extends AbstractSource {
             linePosition = getInitLineOffset(isIncrement, taskId, instanceId, 
inodeInfo);
             bytePosition = getBytePositionByLine(linePosition);
             queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
+            dataTime = 
DateTransUtils.timeStrConvertTomillSec(profile.getDataTime(),
+                    profile.get(TASK_CYCLE_UNIT));
             try {
                 registerMeta(profile);
             } catch (Exception ex) {
@@ -249,7 +254,7 @@ public class LogFileSource extends AbstractSource {
                 }
             }
         } catch (Exception e) {
-            LOGGER.error("getBytePositionByLine error {}", e.getStackTrace());
+            LOGGER.error("getBytePositionByLine error: ", e);
         } finally {
             if (input != null) {
                 input.close();
@@ -344,7 +349,7 @@ public class LogFileSource extends AbstractSource {
     private Message createMessage(SourceData sourceData) {
         String msgWithMetaData = fillMetaData(sourceData.data);
         AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, 
inlongStreamId,
-                System.currentTimeMillis(), 1, msgWithMetaData.length());
+                dataTime, 1, msgWithMetaData.length());
         String proxyPartitionKey = profile.get(PROXY_SEND_PARTITION_KEY, 
DigestUtils.md5Hex(inlongGroupId));
         Map<String, String> header = new HashMap<>();
         header.put(PROXY_KEY_DATA, proxyPartitionKey);
@@ -429,6 +434,17 @@ public class LogFileSource extends AbstractSource {
                 } catch (IOException e) {
                     LOGGER.error("readFromPos error {}", e.getMessage());
                 }
+                
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
BATCH_READ_LINE_TOTAL_LEN);
+                if (lines.isEmpty()) {
+                    if (queue.isEmpty()) {
+                        emptyCount++;
+                    } else {
+                        emptyCount = 0;
+                    }
+                    AgentUtils.silenceSleepInSeconds(1);
+                    continue;
+                }
+                emptyCount = 0;
                 for (int i = 0; i < lines.size(); i++) {
                     boolean suc4Queue = 
waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, lines.get(i).data.length());
                     if (!suc4Queue) {
@@ -436,17 +452,10 @@ public class LogFileSource extends AbstractSource {
                     }
                     putIntoQueue(lines.get(i));
                 }
-                
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
BATCH_READ_LINE_TOTAL_LEN);
-                if (lines.isEmpty()) {
-                    readEndCount++;
-                    AgentUtils.silenceSleepInSeconds(1);
-                } else {
-                    readEndCount = 0;
-                    if (AgentUtils.getCurrentTime() - lastPrintTime > 
CORE_THREAD_PRINT_INTERVAL_MS) {
-                        lastPrintTime = AgentUtils.getCurrentTime();
-                        LOGGER.info("path is {}, linePosition {}, bytePosition 
is {} file len {}, reads lines size {}",
-                                file.getName(), linePosition, bytePosition, 
file.length(), lines.size());
-                    }
+                if (AgentUtils.getCurrentTime() - lastPrintTime > 
CORE_THREAD_PRINT_INTERVAL_MS) {
+                    lastPrintTime = AgentUtils.getCurrentTime();
+                    LOGGER.info("path is {}, linePosition {}, bytePosition is 
{} file len {}, reads lines size {}",
+                            file.getName(), linePosition, bytePosition, 
file.length(), lines.size());
                 }
             }
             running = false;
@@ -530,15 +539,7 @@ public class LogFileSource extends AbstractSource {
 
     @Override
     public boolean sourceFinish() {
-        if (finishReadLog() && queue.isEmpty()) {
-            return true;
-        } else {
-            return false;
-        }
-    }
-
-    public boolean finishReadLog() {
-        return readEndCount > FINISH_READ_MAX_COUNT;
+        return emptyCount > EMPTY_CHECK_COUNT_AT_LEAST;
     }
 
     @Override
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index b2a3b605b7..67ce1dec76 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -73,7 +73,7 @@ public class TestLogFileSource {
             Whitebox.setInternalState(source, "BATCH_READ_LINE_TOTAL_LEN", 10);
             Whitebox.setInternalState(source, "CORE_THREAD_PRINT_INTERVAL_MS", 
0);
             Whitebox.setInternalState(source, "SIZE_OF_BUFFER_TO_READ_FILE", 
2);
-            Whitebox.setInternalState(source, "FINISH_READ_MAX_COUNT", 1);
+            Whitebox.setInternalState(source, "EMPTY_CHECK_COUNT_AT_LEAST", 3);
             Whitebox.setInternalState(source, "READ_WAIT_TIMEOUT_MS", 10);
             source.init(instanceProfile);
             return source;
@@ -101,10 +101,7 @@ public class TestLogFileSource {
             srcLen += check[i].getBytes(StandardCharsets.UTF_8).length;
         }
         LogFileSource source = getSource();
-        await().atMost(2, TimeUnit.SECONDS).until(() -> 
source.finishReadLog());
         int cnt = 0;
-        int leftBeforeRead = 
MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_QUEUE_PERMIT);
-        Assert.assertTrue(leftBeforeRead + srcLen == 
DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT);
         Message msg = source.read();
         int readLen = 0;
         while (msg != null) {
@@ -114,7 +111,7 @@ public class TestLogFileSource {
             msg = source.read();
             cnt++;
         }
-        await().atMost(2, TimeUnit.SECONDS).until(() -> source.sourceFinish());
+        await().atMost(6, TimeUnit.SECONDS).until(() -> source.sourceFinish());
         source.destroy();
         Assert.assertTrue(cnt == 3);
         Assert.assertTrue(srcLen == readLen);
@@ -124,7 +121,6 @@ public class TestLogFileSource {
 
     private void testCleanQueue() {
         LogFileSource source = getSource();
-        await().atMost(2, TimeUnit.SECONDS).until(() -> 
source.finishReadLog());
         for (int i = 0; i < 2; i++) {
             source.read();
         }

Reply via email to