This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 13a34c8a87 [INLONG-9253][Agent] Fix bug: get byte position of file by 
line count offset failed (#9254)
13a34c8a87 is described below

commit 13a34c8a877da459431f65782c91133398459d37
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Fri Nov 10 15:28:52 2023 +0800

    [INLONG-9253][Agent] Fix bug: get byte position of file by line count offset 
failed (#9254)
---
 .../inlong/agent/core/instance/MockInstance.java   | 19 ++++++++++-----
 .../agent/core/instance/TestInstanceManager.java   |  4 ++--
 .../inlong/agent/plugin/sources/LogFileSource.java | 27 +++++++++++-----------
 .../agent/plugin/sources/TestLogFileSource.java    |  2 +-
 4 files changed, 30 insertions(+), 22 deletions(-)

diff --git 
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java
 
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java
index ada8cefcdd..5e9bbbab03 100644
--- 
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java
+++ 
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java
@@ -20,13 +20,17 @@ package org.apache.inlong.agent.core.instance;
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.plugin.Instance;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicLong;
+
 public class MockInstance extends Instance {
 
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(MockInstance.class);
     public static final int INIT_TIME = 100;
-    public static final int RUN_TIME = 101;
-    public static final int DESTROY_TIME = 102;
     private InstanceProfile profile;
-    private long index = INIT_TIME;
+    private AtomicLong index = new AtomicLong(INIT_TIME);
     public long initTime = 0;
     public long destroyTime = 0;
     public long runtime = 0;
@@ -36,12 +40,14 @@ public class MockInstance extends Instance {
     public void init(Object instanceManager, InstanceProfile profile) {
         this.instanceManager = (InstanceManager) instanceManager;
         this.profile = profile;
-        initTime = index++;
+        LOGGER.info("init called " + index);
+        initTime = index.getAndAdd(1);
     }
 
     @Override
     public void destroy() {
-        destroyTime = index++;
+        LOGGER.info("destroy called " + index);
+        destroyTime = index.getAndAdd(1);
     }
 
     @Override
@@ -66,7 +72,8 @@ public class MockInstance extends Instance {
 
     @Override
     public void run() {
-        runtime = index++;
+        LOGGER.info("run called " + index);
+        runtime = index.getAndAdd(1);
     }
 
     public void sendFinishAction() {
diff --git 
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
 
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
index cff9a7c243..aae1c15da9 100755
--- 
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
+++ 
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
@@ -100,7 +100,7 @@ public class TestInstanceManager {
         manager.submitAction(action);
         await().atMost(1, TimeUnit.SECONDS).until(() -> 
manager.getInstanceProfile(instanceId) == null);
         Assert.assertTrue(String.valueOf(instance.initTime), instance.initTime 
== MockInstance.INIT_TIME);
-        Assert.assertTrue(String.valueOf(instance.runtime), instance.runtime 
== MockInstance.RUN_TIME);
-        Assert.assertTrue(String.valueOf(instance.destroyTime), 
instance.destroyTime == MockInstance.DESTROY_TIME);
+        Assert.assertTrue(instance.runtime > instance.initTime);
+        Assert.assertTrue(instance.destroyTime > instance.runtime);
     }
 }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index ab96b6a8e8..7b94e3f068 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -103,7 +103,7 @@ public class LogFileSource extends AbstractSource {
             new AgentThreadFactory("log-file-source"));
     private final Integer BATCH_READ_LINE_COUNT = 10000;
     private final Integer BATCH_READ_LINE_TOTAL_LEN = 1024 * 1024;
-    private final Integer PRINT_INTERVAL_MS = 1000;
+    private final Integer CORE_THREAD_PRINT_INTERVAL_MS = 1000;
     private final Integer CACHE_QUEUE_SIZE = 10 * BATCH_READ_LINE_COUNT;
     private final Integer SIZE_OF_BUFFER_TO_READ_FILE = 64 * 1024;
     private final Integer FINISH_READ_MAX_COUNT = 30;
@@ -117,8 +117,8 @@ public class LogFileSource extends AbstractSource {
     private String fileName;
     private File file;
     private byte[] bufferToReadFile;
-    public long linePosition = 0;
-    public long bytePosition = 0;
+    public volatile long linePosition = 0;
+    public volatile long bytePosition = 0;
     private boolean needMetadata = false;
     public Map<String, String> metadata;
     private boolean isIncrement = false;
@@ -128,7 +128,7 @@ public class LogFileSource extends AbstractSource {
     private volatile int readEndCount = 0;
     private volatile boolean fileExist = true;
     private String inodeInfo;
-    private long lastInodeUpdateTime = 0;
+    private volatile long lastInodeUpdateTime = 0;
     private volatile boolean running = false;
 
     public LogFileSource() {
@@ -144,20 +144,20 @@ public class LogFileSource extends AbstractSource {
             instanceId = profile.getInstanceId();
             fileName = profile.getInstanceId();
             maxPackSize = profile.getInt(PROXY_PACKAGE_MAX_SIZE, 
DEFAULT_PROXY_PACKAGE_MAX_SIZE);
+            bufferToReadFile = new byte[SIZE_OF_BUFFER_TO_READ_FILE];
             isIncrement = isIncrement(profile);
             file = new File(fileName);
             inodeInfo = profile.get(TaskConstants.INODE_INFO);
             lastInodeUpdateTime = AgentUtils.getCurrentTime();
             linePosition = getInitLineOffset(isIncrement, taskId, instanceId, 
inodeInfo);
             bytePosition = getBytePositionByLine(linePosition);
-            bufferToReadFile = new byte[SIZE_OF_BUFFER_TO_READ_FILE];
             queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
             try {
                 registerMeta(profile);
             } catch (Exception ex) {
                 LOGGER.error("init metadata error", ex);
             }
-            EXECUTOR_SERVICE.execute(run());
+            EXECUTOR_SERVICE.execute(coreThread());
         } catch (Exception ex) {
             stopRunning();
             throw new FileException("error init stream for " + file.getPath(), 
ex);
@@ -181,11 +181,12 @@ public class LogFileSource extends AbstractSource {
         if (offsetProfile != null && 
offsetProfile.getInodeInfo().compareTo(inodeInfo) == 0) {
             offset = offsetProfile.getOffset();
             if (fileLineCount < offset) {
-                LOGGER.info("getInitLineOffset taskId {} file rotate, offset 
set to 0, file {}", taskId,
+                LOGGER.info("getInitLineOffset inode no change taskId {} file 
rotate, offset set to 0, file {}", taskId,
                         fileName);
                 offset = 0;
             } else {
-                LOGGER.info("getInitLineOffset taskId {} from db {}, file {}", 
taskId, offset, fileName);
+                LOGGER.info("getInitLineOffset inode no change taskId {} from 
db {}, file {}", taskId, offset,
+                        fileName);
             }
         } else {
             if (isIncrement) {
@@ -247,7 +248,7 @@ public class LogFileSource extends AbstractSource {
                 }
             }
         } catch (Exception e) {
-            LOGGER.error("getBytePositionByLine error {}", e.getMessage());
+            LOGGER.error("getBytePositionByLine error {}", e.getStackTrace());
         } finally {
             if (input != null) {
                 input.close();
@@ -397,7 +398,7 @@ public class LogFileSource extends AbstractSource {
         return false;
     }
 
-    public Runnable run() {
+    public Runnable coreThread() {
         return () -> {
             AgentThreadFactory.nameThread("log-file-source-" + taskId + "-" + 
file);
             running = true;
@@ -441,10 +442,10 @@ public class LogFileSource extends AbstractSource {
                     AgentUtils.silenceSleepInSeconds(1);
                 } else {
                     readEndCount = 0;
-                    if (AgentUtils.getCurrentTime() - lastPrintTime > 
PRINT_INTERVAL_MS) {
+                    if (AgentUtils.getCurrentTime() - lastPrintTime > 
CORE_THREAD_PRINT_INTERVAL_MS) {
                         lastPrintTime = AgentUtils.getCurrentTime();
-                        LOGGER.info("path is {}, linePosition {}, bytePosition 
is {}, reads lines size {}",
-                                file.getName(), linePosition, bytePosition, 
lines.size());
+                        LOGGER.info("path is {}, linePosition {}, bytePosition 
is {} file len {}, reads lines size {}",
+                                file.getName(), linePosition, bytePosition, 
file.length(), lines.size());
                     }
                 }
             }
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index 5617976077..eb9c8ccf89 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -71,7 +71,7 @@ public class TestLogFileSource {
             LogFileSource source = new LogFileSource();
             Whitebox.setInternalState(source, "BATCH_READ_LINE_COUNT", 1);
             Whitebox.setInternalState(source, "BATCH_READ_LINE_TOTAL_LEN", 10);
-            Whitebox.setInternalState(source, "PRINT_INTERVAL_MS", 0);
+            Whitebox.setInternalState(source, "CORE_THREAD_PRINT_INTERVAL_MS", 
0);
             Whitebox.setInternalState(source, "SIZE_OF_BUFFER_TO_READ_FILE", 
2);
             Whitebox.setInternalState(source, "FINISH_READ_MAX_COUNT", 1);
             Whitebox.setInternalState(source, "READ_WAIT_TIMEOUT_MS", 10);

Reply via email to