This is an automated email from the ASF dual-hosted git repository.

wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f533e0b3d [INLONG-11527][Agent] Save both row and byte position information when saving offset (#11528)
4f533e0b3d is described below

commit 4f533e0b3d819651bd1733ea7572afb793bf0f78
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Thu Nov 21 21:04:18 2024 +0800

    [INLONG-11527][Agent] Save both row and byte position information when saving offset (#11528)
---
 .../inlong/agent/plugin/sources/LogFileSource.java | 101 +++++++++++++--------
 .../agent/plugin/sources/TestLogFileSource.java    |  36 +++++---
 2 files changed, 88 insertions(+), 49 deletions(-)

diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 5aebbc7d86..72b0cd704a 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -31,6 +31,9 @@ import org.apache.inlong.agent.plugin.sources.file.extend.DefaultExtendedHandler
 import org.apache.inlong.agent.plugin.task.file.FileDataUtils;
 import org.apache.inlong.agent.utils.AgentUtils;
 
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,7 +55,20 @@ import static org.apache.inlong.agent.constant.TaskConstants.SOURCE_DATA_CONTENT
  */
 public class LogFileSource extends AbstractSource {
 
+    public static final int LEN_OF_FILE_OFFSET_ARRAY = 2;
+
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    protected class FileOffset {
+
+        private Long lineOffset;
+        private Long byteOffset;
+        private boolean hasByteOffset;
+    }
+
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LogFileSource.class);
+    public static final String OFFSET_SEP = ":";
     private final Integer SIZE_OF_BUFFER_TO_READ_FILE = 64 * 1024;
     private final Long INODE_UPDATE_INTERVAL_MS = 1000L;
     private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss"); // 设置格式
@@ -86,8 +102,7 @@ public class LogFileSource extends AbstractSource {
             file = new File(fileName);
             inodeInfo = profile.get(TaskConstants.INODE_INFO);
             lastInodeUpdateTime = AgentUtils.getCurrentTime();
-            linePosition = getInitLineOffset(isIncrement, taskId, instanceId, 
inodeInfo);
-            bytePosition = getBytePositionByLine(linePosition);
+            initOffset(isIncrement, taskId, instanceId, inodeInfo);
             randomAccessFile = new RandomAccessFile(file, "r");
         } catch (Exception ex) {
             stopRunning();
@@ -137,14 +152,9 @@ public class LogFileSource extends AbstractSource {
     }
 
     private List<SourceData> readFromPos(long pos) throws IOException {
-        List<byte[]> lines = new ArrayList<>();
-        List<SourceData> dataList = new ArrayList<>();
-        bytePosition = readLines(randomAccessFile, pos, lines, 
BATCH_READ_LINE_COUNT, BATCH_READ_LINE_TOTAL_LEN, false);
-        for (int i = 0; i < lines.size(); i++) {
-            linePosition++;
-            dataList.add(new SourceData(lines.get(i), 
Long.toString(linePosition)));
-        }
-        return dataList;
+        List<SourceData> lines = new ArrayList<>();
+        bytePosition = readLines(randomAccessFile, pos, lines, 
BATCH_READ_LINE_COUNT, BATCH_READ_LINE_TOTAL_LEN);
+        return lines;
     }
 
     private int getRealLineCount(String fileName) {
@@ -157,30 +167,39 @@ public class LogFileSource extends AbstractSource {
         }
     }
 
-    private long getInitLineOffset(boolean isIncrement, String taskId, String 
instanceId, String inodeInfo) {
-        long offset = 0;
+    private void initOffset(boolean isIncrement, String taskId, String 
instanceId, String inodeInfo)
+            throws IOException {
+        long lineOffset;
+        long byteOffset;
         if (offsetProfile != null && 
offsetProfile.getInodeInfo().compareTo(inodeInfo) == 0) {
-            offset = Long.parseLong(offsetProfile.getOffset());
-            int fileLineCount = getRealLineCount(instanceId);
-            if (fileLineCount < offset) {
-                LOGGER.info("getInitLineOffset inode no change taskId {} file 
rotate, offset set to 0, file {}", taskId,
-                        fileName);
-                offset = 0;
+            FileOffset fileOffset = parseFIleOffset(offsetProfile.getOffset());
+            if (fileOffset.hasByteOffset) {
+                lineOffset = fileOffset.lineOffset;
+                byteOffset = fileOffset.byteOffset;
+                LOGGER.info("initOffset inode no change taskId {} restore 
lineOffset {} byteOffset {}, file {}", taskId,
+                        lineOffset, byteOffset, fileName);
             } else {
-                LOGGER.info("getInitLineOffset inode no change taskId {} from 
offset store {}, file {}", taskId, offset,
-                        fileName);
+                lineOffset = fileOffset.lineOffset;
+                byteOffset = getBytePositionByLine(lineOffset);
+                LOGGER.info("initOffset inode no change taskId {} restore 
lineOffset {} count byteOffset {}, file {}",
+                        taskId,
+                        lineOffset, byteOffset, fileName);
             }
         } else {
             if (isIncrement) {
-                offset = getRealLineCount(instanceId);
-                LOGGER.info("getInitLineOffset taskId {} for new increment 
read from {} file {}", taskId,
-                        offset, fileName);
+                lineOffset = getRealLineCount(instanceId);
+                byteOffset = getBytePositionByLine(lineOffset);
+                LOGGER.info("initOffset taskId {} for new increment lineOffset 
{} byteOffset {}, file {}", taskId,
+                        lineOffset, byteOffset, fileName);
             } else {
-                offset = 0;
-                LOGGER.info("getInitLineOffset taskId {} for new all read from 
0 file {}", taskId, fileName);
+                lineOffset = 0;
+                byteOffset = 0;
+                LOGGER.info("initOffset taskId {} for new all read lineOffset 
{} byteOffset {} file {}", taskId,
+                        lineOffset, byteOffset, fileName);
             }
         }
-        return offset;
+        linePosition = lineOffset;
+        bytePosition = byteOffset;
     }
 
     public File getFile() {
@@ -202,9 +221,9 @@ public class LogFileSource extends AbstractSource {
         try {
             input = new RandomAccessFile(file, "r");
             while (readCount < linePosition) {
-                List<byte[]> lines = new ArrayList<>();
+                List<SourceData> lines = new ArrayList<>();
                 pos = readLines(input, pos, lines, Math.min((int) 
(linePosition - readCount), BATCH_READ_LINE_COUNT),
-                        BATCH_READ_LINE_TOTAL_LEN, true);
+                        BATCH_READ_LINE_TOTAL_LEN);
                 readCount += lines.size();
                 if (lines.size() == 0) {
                     LOGGER.error("getBytePositionByLine LineNum {} larger than 
the real file");
@@ -229,8 +248,8 @@ public class LogFileSource extends AbstractSource {
      * @return The new position after the lines have been read
      * @throws IOException if an I/O error occurs.
      */
-    private long readLines(RandomAccessFile reader, long pos, List<byte[]> 
lines, int maxLineCount, int maxLineTotalLen,
-            boolean isCounting)
+    private long readLines(RandomAccessFile reader, long pos, List<SourceData> 
lines, int maxLineCount,
+            int maxLineTotalLen)
             throws IOException {
         if (maxLineCount == 0) {
             return pos;
@@ -248,13 +267,10 @@ public class LogFileSource extends AbstractSource {
                 byte ch = bufferToReadFile[i];
                 switch (ch) {
                     case '\n':
-                        if (isCounting) {
-                            lines.add(null);
-                        } else {
-                            lines.add(baos.toByteArray());
-                            lineTotalLen += baos.size();
-                        }
+                        linePosition++;
                         rePos = pos + i + 1;
+                        lines.add(new SourceData(baos.toByteArray(), 
getOffsetString(linePosition, rePos)));
+                        lineTotalLen += baos.size();
                         if (overLen) {
                             LOGGER.warn("readLines over len finally string len 
{}",
                                     new String(baos.toByteArray()).length());
@@ -297,6 +313,19 @@ public class LogFileSource extends AbstractSource {
         return rePos;
     }
 
+    private String getOffsetString(Long lineOffset, Long byteOffset) {
+        return lineOffset + OFFSET_SEP + byteOffset;
+    }
+
+    private FileOffset parseFIleOffset(String offset) {
+        String[] offsetArray = offset.split(OFFSET_SEP);
+        if (offsetArray.length == LEN_OF_FILE_OFFSET_ARRAY) {
+            return new FileOffset(Long.parseLong(offsetArray[0]), 
Long.parseLong(offsetArray[1]), true);
+        } else {
+            return new FileOffset(Long.parseLong(offsetArray[0]), null, false);
+        }
+    }
+
     private boolean isInodeChanged() {
         if (AgentUtils.getCurrentTime() - lastInodeUpdateTime > 
INODE_UPDATE_INTERVAL_MS) {
             try {
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index 5d6871fecb..408b9f1b70 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -42,6 +42,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
@@ -74,13 +75,18 @@ public class TestLogFileSource {
         OffsetManager.init(taskBasicStore, instanceBasicStore, 
offsetBasicStore);
     }
 
-    private LogFileSource getSource(int taskId, long offset) {
+    private LogFileSource getSource(int taskId, long lineOffset, long 
byteOffset, String dataContentStyle,
+            boolean isOffSetNew) {
         try {
-            String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
-            TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, 
"csv", false, "", "",
+            String pattern;
+            String fileName;
+            boolean retry;
+            fileName = LOADER.getResource("test/20230928_1.txt").getPath();
+            pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
+            retry = false;
+            TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, 
dataContentStyle, retry, "", "",
                     TaskStateEnum.RUNNING, "D",
-                    "GMT+8:00", null);
-            String fileName = 
LOADER.getResource("test/20230928_1.txt").getPath();
+                    "GMT+8:00", Arrays.asList("ok"));
             InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile("",
                     fileName, taskProfile.getCycleUnit(), "20230928", 
AgentUtils.getCurrentTime());
             instanceProfile.set(TaskConstants.INODE_INFO, 
FileDataUtils.getInodeInfo(instanceProfile.getInstanceId()));
@@ -91,17 +97,21 @@ public class TestLogFileSource {
             Whitebox.setInternalState(source, "SIZE_OF_BUFFER_TO_READ_FILE", 
2);
             Whitebox.setInternalState(source, "EMPTY_CHECK_COUNT_AT_LEAST", 3);
             Whitebox.setInternalState(source, "WAIT_TIMEOUT_MS", 10);
-            if (offset > 0) {
+            if (lineOffset > 0) {
+                String finalOffset = Long.toString(lineOffset);
+                if (isOffSetNew) {
+                    finalOffset += LogFileSource.OFFSET_SEP + byteOffset;
+                }
                 OffsetProfile offsetProfile = new 
OffsetProfile(instanceProfile.getTaskId(),
                         instanceProfile.getInstanceId(),
-                        Long.toString(offset), 
instanceProfile.get(INODE_INFO));
+                        finalOffset, instanceProfile.get(INODE_INFO));
                 OffsetManager.getInstance().setOffset(offsetProfile);
             }
             source.init(instanceProfile);
             source.start();
             return source;
         } catch (Exception e) {
-            LOGGER.error("source init error {}", e);
+            LOGGER.error("source init error", e);
             Assert.assertTrue("source init error", false);
         }
         return null;
@@ -124,7 +134,7 @@ public class TestLogFileSource {
         for (int i = 0; i < check.length; i++) {
             srcLen += check[i].getBytes(StandardCharsets.UTF_8).length;
         }
-        LogFileSource source = getSource(1, 0);
+        LogFileSource source = getSource(1, 0, 0, "csv", false);
         Message msg = source.read();
         int readLen = 0;
         int cnt = 0;
@@ -149,7 +159,7 @@ public class TestLogFileSource {
     }
 
     private void testCleanQueue() {
-        LogFileSource source = getSource(2, 0);
+        LogFileSource source = getSource(2, 0, 0, "csv", false);
         for (int i = 0; i < 2; i++) {
             source.read();
         }
@@ -160,16 +170,16 @@ public class TestLogFileSource {
     }
 
     private void testReadWithOffset() {
-        LogFileSource source = getSource(3, 1);
+        LogFileSource source = getSource(3, 1, 25, "csv", false);
         for (int i = 0; i < 2; i++) {
             Message msg = source.read();
-            Assert.assertTrue(msg != null);
+            Assert.assertEquals(new String(msg.getBody()), check[i + 1]);
         }
         Message msg = source.read();
         Assert.assertTrue(msg == null);
         source.destroy();
 
-        source = getSource(4, 3);
+        source = getSource(4, 3, 69, "csv", false);
         msg = source.read();
         Assert.assertTrue(msg == null);
         source.destroy();

Reply via email to