This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 13a34c8a87 [INLONG-9253][Agent] Fix bug: get byte postion of file by line count offset failed (#9254) 13a34c8a87 is described below commit 13a34c8a877da459431f65782c91133398459d37 Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Fri Nov 10 15:28:52 2023 +0800 [INLONG-9253][Agent] Fix bug: get byte postion of file by line count offset failed (#9254) --- .../inlong/agent/core/instance/MockInstance.java | 19 ++++++++++----- .../agent/core/instance/TestInstanceManager.java | 4 ++-- .../inlong/agent/plugin/sources/LogFileSource.java | 27 +++++++++++----------- .../agent/plugin/sources/TestLogFileSource.java | 2 +- 4 files changed, 30 insertions(+), 22 deletions(-) diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java index ada8cefcdd..5e9bbbab03 100644 --- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java +++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java @@ -20,13 +20,17 @@ package org.apache.inlong.agent.core.instance; import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.plugin.Instance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicLong; + public class MockInstance extends Instance { + private static final Logger LOGGER = LoggerFactory.getLogger(MockInstance.class); public static final int INIT_TIME = 100; - public static final int RUN_TIME = 101; - public static final int DESTROY_TIME = 102; private InstanceProfile profile; - private long index = INIT_TIME; + private AtomicLong index = new AtomicLong(INIT_TIME); public long initTime = 0; public long destroyTime = 0; public long runtime = 0; @@ -36,12 +40,14 @@ public class MockInstance extends Instance { public void init(Object instanceManager, InstanceProfile profile) { this.instanceManager = (InstanceManager) instanceManager; this.profile = profile; - initTime = index++; + LOGGER.info("init called " + index); + initTime = index.getAndAdd(1); } @Override public void destroy() { - destroyTime = index++; + LOGGER.info("destroy called " + index); + destroyTime = index.getAndAdd(1); } @Override @@ -66,7 +72,8 @@ public class MockInstance extends Instance { @Override public void run() { - runtime = index++; + LOGGER.info("run called " + index); + runtime = index.getAndAdd(1); } public void sendFinishAction() { diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java index cff9a7c243..aae1c15da9 100755 --- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java +++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java @@ -100,7 +100,7 @@ public class TestInstanceManager { manager.submitAction(action); await().atMost(1, TimeUnit.SECONDS).until(() -> manager.getInstanceProfile(instanceId) == null); Assert.assertTrue(String.valueOf(instance.initTime), instance.initTime == MockInstance.INIT_TIME); - Assert.assertTrue(String.valueOf(instance.runtime), instance.runtime == MockInstance.RUN_TIME); - Assert.assertTrue(String.valueOf(instance.destroyTime), instance.destroyTime == MockInstance.DESTROY_TIME); + Assert.assertTrue(instance.runtime > instance.initTime); + Assert.assertTrue(instance.destroyTime > instance.runtime); } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java index ab96b6a8e8..7b94e3f068 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java @@ -103,7 +103,7 @@ public class LogFileSource extends AbstractSource { new AgentThreadFactory("log-file-source")); private final Integer BATCH_READ_LINE_COUNT = 10000; private final Integer BATCH_READ_LINE_TOTAL_LEN = 1024 * 1024; - private final Integer PRINT_INTERVAL_MS = 1000; + private final Integer CORE_THREAD_PRINT_INTERVAL_MS = 1000; private final Integer CACHE_QUEUE_SIZE = 10 * BATCH_READ_LINE_COUNT; private final Integer SIZE_OF_BUFFER_TO_READ_FILE = 64 * 1024; private final Integer FINISH_READ_MAX_COUNT = 30; @@ -117,8 +117,8 @@ public class LogFileSource extends AbstractSource { private String fileName; private File file; private byte[] bufferToReadFile; - public long linePosition = 0; - public long bytePosition = 0; + public volatile long linePosition = 0; + public volatile long bytePosition = 0; private boolean needMetadata = false; public Map<String, String> metadata; private boolean isIncrement = false; @@ -128,7 +128,7 @@ public class LogFileSource extends AbstractSource { private volatile int readEndCount = 0; private volatile boolean fileExist = true; private String inodeInfo; - private long lastInodeUpdateTime = 0; + private volatile long lastInodeUpdateTime = 0; private volatile boolean running = false; public LogFileSource() { @@ -144,20 +144,20 @@ public class LogFileSource extends AbstractSource { instanceId = profile.getInstanceId(); fileName = profile.getInstanceId(); maxPackSize = profile.getInt(PROXY_PACKAGE_MAX_SIZE, DEFAULT_PROXY_PACKAGE_MAX_SIZE); + bufferToReadFile = new byte[SIZE_OF_BUFFER_TO_READ_FILE]; isIncrement = isIncrement(profile); file = new File(fileName); inodeInfo = profile.get(TaskConstants.INODE_INFO); lastInodeUpdateTime = AgentUtils.getCurrentTime(); linePosition = getInitLineOffset(isIncrement, taskId, instanceId, inodeInfo); bytePosition = getBytePositionByLine(linePosition); - bufferToReadFile = new byte[SIZE_OF_BUFFER_TO_READ_FILE]; queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE); try { registerMeta(profile); } catch (Exception ex) { LOGGER.error("init metadata error", ex); } - EXECUTOR_SERVICE.execute(run()); + EXECUTOR_SERVICE.execute(coreThread()); } catch (Exception ex) { stopRunning(); throw new FileException("error init stream for " + file.getPath(), ex); @@ -181,11 +181,12 @@ public class LogFileSource extends AbstractSource { if (offsetProfile != null && offsetProfile.getInodeInfo().compareTo(inodeInfo) == 0) { offset = offsetProfile.getOffset(); if (fileLineCount < offset) { - LOGGER.info("getInitLineOffset taskId {} file rotate, offset set to 0, file {}", taskId, + LOGGER.info("getInitLineOffset inode no change taskId {} file rotate, offset set to 0, file {}", taskId, fileName); offset = 0; } else { - LOGGER.info("getInitLineOffset taskId {} from db {}, file {}", taskId, offset, fileName); + LOGGER.info("getInitLineOffset inode no change taskId {} from db {}, file {}", taskId, offset, + fileName); } } else { if (isIncrement) { @@ -247,7 +248,7 @@ public class LogFileSource extends AbstractSource { } } } catch (Exception e) { - LOGGER.error("getBytePositionByLine error {}", e.getMessage()); + LOGGER.error("getBytePositionByLine error {}", e.getStackTrace()); } finally { if (input != null) { input.close(); @@ -397,7 +398,7 @@ public class LogFileSource extends AbstractSource { return false; } - public Runnable run() { + public Runnable coreThread() { return () -> { AgentThreadFactory.nameThread("log-file-source-" + taskId + "-" + file); running = true; @@ -441,10 +442,10 @@ public class LogFileSource extends AbstractSource { AgentUtils.silenceSleepInSeconds(1); } else { readEndCount = 0; - if (AgentUtils.getCurrentTime() - lastPrintTime > PRINT_INTERVAL_MS) { + if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_INTERVAL_MS) { lastPrintTime = AgentUtils.getCurrentTime(); - LOGGER.info("path is {}, linePosition {}, bytePosition is {}, reads lines size {}", - file.getName(), linePosition, bytePosition, lines.size()); + LOGGER.info("path is {}, linePosition {}, bytePosition is {} file len {}, reads lines size {}", + file.getName(), linePosition, bytePosition, file.length(), lines.size()); } } } diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java index 5617976077..eb9c8ccf89 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java @@ -71,7 +71,7 @@ public class TestLogFileSource { LogFileSource source = new LogFileSource(); Whitebox.setInternalState(source, "BATCH_READ_LINE_COUNT", 1); Whitebox.setInternalState(source, "BATCH_READ_LINE_TOTAL_LEN", 10); - Whitebox.setInternalState(source, "PRINT_INTERVAL_MS", 0); + Whitebox.setInternalState(source, "CORE_THREAD_PRINT_INTERVAL_MS", 0); Whitebox.setInternalState(source, "SIZE_OF_BUFFER_TO_READ_FILE", 2); Whitebox.setInternalState(source, "FINISH_READ_MAX_COUNT", 1); Whitebox.setInternalState(source, "READ_WAIT_TIMEOUT_MS", 10);