healchow commented on code in PR #8182:
URL: https://github.com/apache/inlong/pull/8182#discussion_r1222440024


##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java:
##########
@@ -425,67 +404,75 @@ private List<String> readFromPos(int pos) throws 
IOException {
         return lines;
     }
 
-    private int getStartBytePosition(int lineNum) throws IOException {
-        int startBytePosition = 0;
-        BufferedReader reader = null;
-        try {
-            LOGGER.info("get start line {}", lineNum);
-            String line = null;
-            List<String> lines = new ArrayList<>();
-            reader = new BufferedReader(new InputStreamReader(new 
FileInputStream(file),
-                    StandardCharsets.UTF_8));
-            int count = 0;
-            while ((line = reader.readLine()) != null) {
-                if (++count > lineNum) {
-                    LOGGER.info("get startBytePosition end at line {}", count);
+    /**
+     * Read new lines.
+     *
+     * @param reader The file to read
+     * @return The new position after the lines have been read
+     * @throws java.io.IOException if an I/O error occurs.
+     */
+    private static long readLines(RandomAccessFile reader, long pos, 
List<String> lines, int maxLineCount)
+            throws IOException {
+        if (maxLineCount == 0) {
+            return pos;
+        }
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        reader.seek(pos);
+        long rePos = pos; // position to re-read
+        int num;
+        LOGGER.debug("readLines from {}", pos);
+        boolean overLen = false;
+        while ((num = reader.read(inBuf)) != -1) {
+            int i = 0;
+            for (; i < num; i++) {
+                byte ch = inBuf[i];
+                switch (ch) {
+                    case '\n':
+                        lines.add(new String(baos.toByteArray()));
+                        rePos = pos + i + 1;
+                        if (overLen) {
+                            LOGGER.warn("readLines over len finally string len 
{}",
+                                    new String(baos.toByteArray()).length());
+                        }
+                        baos.reset();
+                        overLen = false;
+                        break;
+                    case '\r':
+                        break;
+                    default:
+                        if (baos.size() < maxPackSize) {
+                            baos.write(ch);
+                        } else {
+                            overLen = true;
+                        }
+                }
+                if (lines.size() >= maxLineCount) {
                     break;
                 }
-                startBytePosition = startBytePosition
-                        + line.getBytes(StandardCharsets.UTF_8).length + 
LINE_SEPARATOR_SIZE;
             }
-        } catch (Exception e) {
-            LOGGER.error("getStartPositon err {}", e);
-        } finally {
-            if (reader != null) {
-                reader.close();
+            if (lines.size() >= maxLineCount) {
+                break;
+            }
+            if (i == num) {
+                pos = reader.getFilePointer();
             }
         }
-        LOGGER.info("getStartPositon bytePosition {}", startBytePosition);
-        return startBytePosition;
-    }
-
-    private void isFirstStore() {
-        if (!jobConf.hasKey(JobConstants.JOB_STORE_TIME)) {
-            LOGGER.info("isFirstStore {},{}", file.getAbsolutePath(), 
this.firstStored);
-            this.firstStored = true;
-            return;
-        }
-        long jobStoreTime = 
Long.parseLong(jobConf.get(JobConstants.JOB_STORE_TIME));
-        LOGGER.info("jobStoreTime {},{}", file.getAbsolutePath(), 
jobStoreTime);
-        long storeTime = AgentConfiguration.getAgentConf().getLong(
-                AgentConstants.AGENT_JOB_STORE_TIME, 
AgentConstants.DEFAULT_JOB_STORE_TIME);
-
-        if (System.currentTimeMillis() - jobStoreTime > storeTime) {
-            this.firstStored = false;
-        } else {
-            this.firstStored = true;
-        }
-        LOGGER.info("isFirstStore {},{}", file.getAbsolutePath(), 
this.firstStored);
+        baos.close();
+        reader.seek(rePos); // Ensure we can re-read if necessary
+        return rePos;
     }
 
-    private void storeRocksDB() {
-        try (LineNumberReader lineNumberReader = new LineNumberReader(new 
FileReader(file.getPath()))) {
-            lineNumberReader.skip(Long.MAX_VALUE);
-            int seekPosition = lineNumberReader.getLineNumber();
-
-            String jobInstanceId = getJobInstanceId();
-            if (jobInstanceId != null) {
-                TaskPositionManager.getInstance().updateSinkPosition(
-                        jobInstanceId, getReadSource(), seekPosition, true);
-                LOGGER.info("storeRocksDB {},{}", file.getAbsolutePath(), 
seekPosition);
+    private boolean isFirstStore(JobProfile jobConf) {
+        boolean isFirst = true;
+        if (jobConf.hasKey(JobConstants.JOB_STORE_TIME)) {
+            long jobStoreTime = 
Long.parseLong(jobConf.get(JobConstants.JOB_STORE_TIME));
+            long storeTime = AgentConfiguration.getAgentConf().getLong(
+                    AgentConstants.AGENT_JOB_STORE_TIME, 
AgentConstants.DEFAULT_JOB_STORE_TIME);
+            if (System.currentTimeMillis() - jobStoreTime > storeTime) {
+                isFirst = false;
             }
-        } catch (IOException ex) {
-            LOGGER.error("get position error, file absolute path: {}", 
file.getAbsolutePath());
         }
+        LOGGER.info("isFirst {}, {}", file.getAbsolutePath(), isFirst);

Review Comment:
   Please add more details to the log info, the current way of writing is too 
little effective information.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to