This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c3bdf56a92 [INLONG-9308][Agent] The sink end of the file instance supports sending data with different streamIds (#9309)
c3bdf56a92 is described below

commit c3bdf56a92d57a992e4639dbeb1f19e32c2d14d4
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Mon Nov 20 15:50:05 2023 +0800

    [INLONG-9308][Agent] The sink end of the file instance supports sending data with different streamIds (#9309)
---
 .../org/apache/inlong/agent/conf/TaskProfile.java  |  10 +-
 .../{PackageAckInfo.java => OffsetAckInfo.java}    |   7 +-
 .../agent/message/filecollect/ProxyMessage.java    | 100 +++++++++++++
 .../message/filecollect/ProxyMessageCache.java     |  72 +++++-----
 .../agent/message/filecollect/SenderMessage.java   |   2 +-
 .../apache/inlong/agent/utils/DateTransUtils.java  |  10 +-
 .../agent/plugin/sinks/filecollect/ProxySink.java  | 155 ++++++++++++++-------
 .../plugin/sinks/filecollect/SenderManager.java    |  95 +------------
 .../inlong/agent/plugin/sources/LogFileSource.java |   8 +-
 .../agent/plugin/task/filecollect/WatchEntity.java |   2 +-
 .../agent/plugin/utils/file/NewDateUtils.java      |   6 +-
 .../sinks/filecollect/TestSenderManager.java       |  42 +++---
 12 files changed, 295 insertions(+), 214 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
index be9b8cd1f3..cbad21e499 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
@@ -110,7 +110,8 @@ public class TaskProfile extends AbstractConfiguration {
         return hasKey(TaskConstants.TASK_ID) && 
hasKey(TaskConstants.TASK_SOURCE)
                 && hasKey(TaskConstants.TASK_SINK) && 
hasKey(TaskConstants.TASK_CHANNEL)
                 && hasKey(TaskConstants.TASK_GROUP_ID) && 
hasKey(TaskConstants.TASK_STREAM_ID)
-                && hasKey(TaskConstants.TASK_CYCLE_UNIT);
+                && hasKey(TaskConstants.TASK_CYCLE_UNIT)
+                && hasKey(TaskConstants.TASK_FILE_TIME_ZONE);
     }
 
     public String toJsonStr() {
@@ -125,10 +126,13 @@ public class TaskProfile extends AbstractConfiguration {
         instanceProfile.setSourceDataTime(dataTime);
         Long sinkDataTime = 0L;
         try {
-            sinkDataTime = DateTransUtils.timeStrConvertTomillSec(dataTime, 
getCycleUnit(),
+            sinkDataTime = DateTransUtils.timeStrConvertToMillSec(dataTime, 
getCycleUnit(),
                     TimeZone.getTimeZone(getTimeZone()));
         } catch (ParseException e) {
-            logger.error("createInstanceProfile error: ", e);
+            logger.error("createInstanceProfile ParseException error: ", e);
+            return null;
+        } catch (Exception e) {
+            logger.error("createInstanceProfile Exception error: ", e);
             return null;
         }
         instanceProfile.setSinkDataTime(sinkDataTime);
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/PackageAckInfo.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/OffsetAckInfo.java
similarity index 88%
rename from inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/PackageAckInfo.java
rename to inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/OffsetAckInfo.java
index 6efdbbdc1e..f6637955bc 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/PackageAckInfo.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/OffsetAckInfo.java
@@ -19,15 +19,12 @@ package org.apache.inlong.agent.message.filecollect;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
-import lombok.NoArgsConstructor;
 
 @Data
 @AllArgsConstructor
-@NoArgsConstructor
-public class PackageAckInfo {
+public class OffsetAckInfo {
 
-    private Long index;
     private Long offset;
-    private Integer len;
+    private int len;
     private Boolean hasAck;
 }
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessage.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessage.java
new file mode 100644
index 0000000000..7d9f4930ac
--- /dev/null
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessage.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.message.filecollect;
+
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.plugin.Message;
+
+import java.util.Map;
+
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
+import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_GROUP_ID;
+import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID;
+
+/**
+ * Bus message with body, header, inlongGroupId and inlongStreamId.
+ */
+public class ProxyMessage implements Message {
+
+    private static final String DEFAULT_INLONG_STREAM_ID = "__";
+
+    private final byte[] body;
+    private final Map<String, String> header;
+    private final String inlongGroupId;
+    private final String inlongStreamId;
+    // determine the group key when making batch
+    private final String batchKey;
+    private final String dataKey;
+    OffsetAckInfo ackInfo;
+
+    public ProxyMessage(byte[] body, Map<String, String> header) {
+        this.body = body;
+        this.header = header;
+        this.inlongGroupId = header.get(PROXY_KEY_GROUP_ID);
+        this.inlongStreamId = header.getOrDefault(PROXY_KEY_STREAM_ID, 
DEFAULT_INLONG_STREAM_ID);
+        this.dataKey = header.getOrDefault(PROXY_KEY_DATA, "");
+        // use the batch key of user and inlongStreamId to determine one batch
+        this.batchKey = dataKey + inlongStreamId;
+        Long offset = Long.parseLong(header.get(TaskConstants.OFFSET));
+        ackInfo = new OffsetAckInfo(offset, body.length, false);
+    }
+
+    public ProxyMessage(Message message) {
+        this(message.getBody(), message.getHeader());
+    }
+
+    public String getDataKey() {
+        return dataKey;
+    }
+
+    /**
+     * Get first line of body list
+     *
+     * @return first line of body list
+     */
+    @Override
+    public byte[] getBody() {
+        return body;
+    }
+
+    public OffsetAckInfo getAckInfo() {
+        return ackInfo;
+    }
+
+    /**
+     * Get header of message
+     *
+     * @return header
+     */
+    @Override
+    public Map<String, String> getHeader() {
+        return header;
+    }
+
+    public String getInlongGroupId() {
+        return inlongGroupId;
+    }
+
+    public String getInlongStreamId() {
+        return inlongStreamId;
+    }
+
+    public String getBatchKey() {
+        return batchKey;
+    }
+}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
index 7e2d3c8b8a..9c0c84e0ef 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
@@ -18,8 +18,6 @@
 package org.apache.inlong.agent.message.filecollect;
 
 import org.apache.inlong.agent.conf.InstanceProfile;
-import org.apache.inlong.agent.constant.TaskConstants;
-import org.apache.inlong.agent.message.ProxyMessage;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.DateTransUtils;
 import org.apache.inlong.common.msg.AttributeConstants;
@@ -32,6 +30,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -51,19 +50,17 @@ public class ProxyMessageCache {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ProxyMessageCache.class);
 
-    private final String groupId;
-    private final String streamId;
     private final String taskId;
     private final String instanceId;
     private final int maxPackSize;
     private final int maxQueueNumber;
-    private final String inodeInfo;
+    private final String groupId;
     // ms
     private final int cacheTimeout;
     // streamId -> list of proxyMessage
-    private final LinkedBlockingQueue<ProxyMessage> messageQueue;
+    private final ConcurrentHashMap<String, LinkedBlockingQueue<ProxyMessage>> 
messageQueueMap;
+    // private final LinkedBlockingQueue<ProxyMessage> messageQueue;
     private final AtomicLong cacheSize = new AtomicLong(0);
-    private Long packageIndex = 0L;
     private long lastPrintTime = 0;
     private long dataTime;
     /**
@@ -74,16 +71,15 @@ public class ProxyMessageCache {
     public ProxyMessageCache(InstanceProfile instanceProfile, String groupId, 
String streamId) {
         this.taskId = instanceProfile.getTaskId();
         this.instanceId = instanceProfile.getInstanceId();
+        this.groupId = groupId;
         this.maxPackSize = instanceProfile.getInt(PROXY_PACKAGE_MAX_SIZE, 
DEFAULT_PROXY_PACKAGE_MAX_SIZE);
         this.maxQueueNumber = 
instanceProfile.getInt(PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER,
                 DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER);
         this.cacheTimeout = 
instanceProfile.getInt(PROXY_PACKAGE_MAX_TIMEOUT_MS, 
DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS);
-        this.messageQueue = new LinkedBlockingQueue<>(maxQueueNumber);
-        this.groupId = groupId;
-        this.streamId = streamId;
-        this.inodeInfo = instanceProfile.get(TaskConstants.INODE_INFO);
+        messageQueueMap = new ConcurrentHashMap<>();
+        // this.messageQueue = new LinkedBlockingQueue<>(maxQueueNumber);
         try {
-            dataTime = 
DateTransUtils.timeStrConvertTomillSec(instanceProfile.getSourceDataTime(),
+            dataTime = 
DateTransUtils.timeStrConvertToMillSec(instanceProfile.getSourceDataTime(),
                     instanceProfile.get(TASK_CYCLE_UNIT));
         } catch (ParseException e) {
             LOGGER.info("trans dataTime error", e);
@@ -101,22 +97,19 @@ public class ProxyMessageCache {
      *
      * @return true if is nearly full else false.
      */
-    private boolean queueIsFull() {
+    private boolean queueIsFull(LinkedBlockingQueue<ProxyMessage> 
messageQueue) {
         return messageQueue.size() >= maxQueueNumber - 1;
     }
 
     /**
      * Add proxy message to cache, proxy message should belong to the same 
stream id.
      */
-    public boolean addProxyMessage(ProxyMessage message) {
-        assert streamId.equals(message.getInlongStreamId());
+    public boolean add(ProxyMessage message) {
+        String streamId = message.getInlongStreamId();
+        LinkedBlockingQueue<ProxyMessage> messageQueue = 
makeSureQueueExist(streamId);
         try {
-            if (queueIsFull()) {
-                if (AgentUtils.getCurrentTime() - lastPrintTime > 
TimeUnit.SECONDS.toMillis(1)) {
-                    lastPrintTime = AgentUtils.getCurrentTime();
-                    LOGGER.warn("message queue is greater than {}, stop adding 
message, "
-                            + "maybe proxy get stuck", maxQueueNumber);
-                }
+            if (queueIsFull(messageQueue)) {
+                printQueueFull();
                 return false;
             }
             messageQueue.put(message);
@@ -128,11 +121,25 @@ public class ProxyMessageCache {
         return false;
     }
 
-    /**
-     * check message queue is empty or not
-     */
-    public boolean isEmpty() {
-        return messageQueue.isEmpty();
+    private void printQueueFull() {
+        if (AgentUtils.getCurrentTime() - lastPrintTime > 
TimeUnit.SECONDS.toMillis(1)) {
+            lastPrintTime = AgentUtils.getCurrentTime();
+            LOGGER.warn("message queue is greater than {}, stop adding 
message, "
+                    + "maybe proxy get stuck", maxQueueNumber);
+        }
+    }
+
+    public ConcurrentHashMap<String, LinkedBlockingQueue<ProxyMessage>> 
getMessageQueueMap() {
+        return messageQueueMap;
+    }
+
+    private LinkedBlockingQueue<ProxyMessage> makeSureQueueExist(String 
streamId) {
+        LinkedBlockingQueue<ProxyMessage> messageQueue = 
messageQueueMap.get(streamId);
+        if (messageQueue == null) {
+            messageQueue = new LinkedBlockingQueue<>();
+            messageQueueMap.put(streamId, messageQueue);
+        }
+        return messageQueue;
     }
 
     /**
@@ -140,10 +147,10 @@ public class ProxyMessageCache {
      *
      * @return map of message list, key is stream id for the batch; return 
null if there are no valid messages.
      */
-    public SenderMessage fetchSenderMessage() {
+    public SenderMessage fetchSenderMessage(String streamId, 
LinkedBlockingQueue<ProxyMessage> messageQueue) {
         int resultBatchSize = 0;
         List<byte[]> bodyList = new ArrayList<>();
-        Long packageOffset = TaskConstants.DEFAULT_OFFSET;
+        List<OffsetAckInfo> offsetList = new ArrayList<>();
         while (!messageQueue.isEmpty()) {
             // pre check message size
             ProxyMessage peekMessage = messageQueue.peek();
@@ -164,17 +171,12 @@ public class ProxyMessageCache {
             // decrease queue size.
             cacheSize.addAndGet(-bodySize);
             bodyList.add(message.getBody());
-            Long newOffset = 
Long.parseLong(message.getHeader().get(TaskConstants.OFFSET));
-            if (packageOffset < newOffset) {
-                packageOffset = newOffset;
-            }
+            offsetList.add(message.getAckInfo());
         }
         // make sure result is not empty.
         if (!bodyList.isEmpty()) {
-            PackageAckInfo ackInfo = new PackageAckInfo(packageIndex, 
packageOffset, resultBatchSize, false);
             SenderMessage senderMessage = new SenderMessage(taskId, 
instanceId, groupId, streamId, bodyList,
-                    dataTime, extraMap, ackInfo);
-            packageIndex++;
+                    dataTime, extraMap, offsetList);
             return senderMessage;
         }
         return null;
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/SenderMessage.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/SenderMessage.java
index a49005e8b3..da3579ca82 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/SenderMessage.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/SenderMessage.java
@@ -43,7 +43,7 @@ public class SenderMessage {
     private List<byte[]> dataList;
     private long dataTime;
     private Map<String, String> extraMap;
-    private PackageAckInfo ackInfo;
+    private List<OffsetAckInfo> offsetAckList;
 
     public InLongMsg getInLongMsg() {
         InLongMsg message = InLongMsg.newInLongMsg(true);
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
index 6a79222a2a..2aa08742e8 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
@@ -36,12 +36,12 @@ public class DateTransUtils {
     }
 
     // convert YYYMMDD to millSec by cycleUnit
-    public static long timeStrConvertTomillSec(String time, String cycleUnit)
+    public static long timeStrConvertToMillSec(String time, String cycleUnit)
             throws ParseException {
-        return timeStrConvertTomillSec(time, cycleUnit, TimeZone.getDefault());
+        return timeStrConvertToMillSec(time, cycleUnit, TimeZone.getDefault());
     }
 
-    public static long timeStrConvertTomillSec(String time, String cycleUnit, 
TimeZone timeZone)
+    public static long timeStrConvertToMillSec(String time, String cycleUnit, 
TimeZone timeZone)
             throws ParseException {
         long retTime = 0;
         SimpleDateFormat df = null;
@@ -62,9 +62,6 @@ public class DateTransUtils {
         try {
             df.setTimeZone(timeZone);
             retTime = df.parse(time).getTime();
-            if (cycleUnit.equals("10m")) {
-
-            }
         } catch (ParseException e) {
             logger.error("convert time string error. ", e);
         }
@@ -98,7 +95,6 @@ public class DateTransUtils {
         retTime = df.format(dateTime);
 
         if (cycleUnit.contains("m")) {
-
             int cycleNum = Integer.parseInt(cycleUnit.substring(0,
                     cycleUnit.length() - 1));
             int mmTime = Integer.parseInt(retTime.substring(
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
index 922400025e..ad4f07258a 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
@@ -19,10 +19,13 @@ package org.apache.inlong.agent.plugin.sinks.filecollect;
 
 import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.OffsetProfile;
 import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.core.task.OffsetManager;
 import org.apache.inlong.agent.core.task.file.MemoryManager;
 import org.apache.inlong.agent.message.EndMessage;
-import org.apache.inlong.agent.message.ProxyMessage;
+import org.apache.inlong.agent.message.filecollect.OffsetAckInfo;
+import org.apache.inlong.agent.message.filecollect.ProxyMessage;
 import org.apache.inlong.agent.message.filecollect.SenderMessage;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.MessageFilter;
@@ -33,12 +36,19 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FIELD_SPLITTER;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT;
+import static org.apache.inlong.agent.constant.TaskConstants.INODE_INFO;
 
 /**
  * sink message data to inlong-dataproxy
@@ -48,8 +58,7 @@ public class ProxySink extends AbstractSink {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ProxySink.class);
     private final int WRITE_FAILED_WAIT_TIME_MS = 10;
     private final int DESTROY_LOOP_WAIT_TIME_MS = 10;
-    private final Integer NO_WRITE_WAIT_AT_LEAST_MS = 5 * 1000;
-    private final Integer SINK_FINISH_AT_LEAST_COUNT = 5;
+    public final int SAVE_OFFSET_INTERVAL_MS = 1000;
     private static final ThreadPoolExecutor EXECUTOR_SERVICE = new 
ThreadPoolExecutor(
             0, Integer.MAX_VALUE,
             1L, TimeUnit.SECONDS,
@@ -61,8 +70,11 @@ public class ProxySink extends AbstractSink {
     private volatile boolean shutdown = false;
     private volatile boolean running = false;
     private volatile boolean inited = false;
-    private volatile long lastWriteTime = 0;
-    private volatile long checkSinkFinishCount = 0;
+    private long lastPrintTime = 0;
+    private List<OffsetAckInfo> ackInfoList = new ArrayList<>();
+    private final ReentrantReadWriteLock packageAckInfoLock = new 
ReentrantReadWriteLock(true);
+    private volatile boolean offsetRunning = false;
+    private OffsetManager offsetManager;
 
     public ProxySink() {
     }
@@ -71,7 +83,6 @@ public class ProxySink extends AbstractSink {
     public void write(Message message) {
         boolean suc = false;
         while (!shutdown && !suc) {
-            lastWriteTime = AgentUtils.getCurrentTime();
             suc = putInCache(message);
             if (!suc) {
                 AgentUtils.silenceSleepInMs(WRITE_FAILED_WAIT_TIME_MS);
@@ -84,8 +95,6 @@ public class ProxySink extends AbstractSink {
             if (message == null) {
                 return true;
             }
-            message.getHeader().put(CommonConstants.PROXY_KEY_GROUP_ID, 
inlongGroupId);
-            message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID, 
inlongStreamId);
             extractStreamFromMessage(message, fieldSplitter);
             if (message instanceof EndMessage) {
                 // increment the count of failed sinks
@@ -101,8 +110,10 @@ public class ProxySink extends AbstractSink {
             }
             cache.generateExtraMap(proxyMessage.getDataKey());
             // add message to package proxy
-            boolean suc = cache.addProxyMessage(proxyMessage);
-            if (!suc) {
+            boolean suc = cache.add(proxyMessage);
+            if (suc) {
+                addAckInfo(proxyMessage.getAckInfo());
+            } else {
                 
MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, 
message.getBody().length);
                 // increment the count of failed sinks
                 sinkMetric.sinkFailCount.incrementAndGet();
@@ -123,8 +134,6 @@ public class ProxySink extends AbstractSink {
         if (messageFilter != null) {
             message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID,
                     messageFilter.filterStreamId(message, fieldSplitter));
-        } else {
-            message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID, 
inlongStreamId);
         }
     }
 
@@ -139,50 +148,46 @@ public class ProxySink extends AbstractSink {
                     "flushCache-" + profile.getTaskId() + "-" + 
profile.getInstanceId());
             LOGGER.info("start flush cache {}:{}", inlongGroupId, sourceName);
             running = true;
-            long lastPrintTime = AgentUtils.getCurrentTime();
             while (!shutdown) {
-                try {
-                    SenderMessage senderMessage = cache.fetchSenderMessage();
-                    if (senderMessage != null) {
-                        checkSinkFinishCount = 0;
-                        senderManager.sendBatch(senderMessage);
-                        if (AgentUtils.getCurrentTime() - lastPrintTime > 
TimeUnit.SECONDS.toMillis(1)) {
-                            lastPrintTime = AgentUtils.getCurrentTime();
-                            LOGGER.info("send groupId {}, streamId {}, message 
size {}, taskId {}, "
-                                    + "instanceId {} sendTime is {}", 
inlongGroupId, inlongStreamId,
-                                    senderMessage.getDataList().size(), 
profile.getTaskId(),
-                                    profile.getInstanceId(),
-                                    senderMessage.getDataTime());
-                        }
-                    }
-                    if (noWriteLongEnough() && senderManager.sendFinished()) {
-                        checkSinkFinishCount++;
-                    } else {
-                        checkSinkFinishCount = 0;
-                    }
-                } catch (Exception ex) {
-                    LOGGER.error("error caught", ex);
-                } catch (Throwable t) {
-                    ThreadUtils.threadThrowableHandler(Thread.currentThread(), 
t);
-                } finally {
-                    AgentUtils.silenceSleepInMs(batchFlushInterval);
-                }
+                sendMessageFromCache();
+                AgentUtils.silenceSleepInMs(batchFlushInterval);
             }
             LOGGER.info("stop flush cache {}:{}", inlongGroupId, sourceName);
             running = false;
         };
     }
 
+    public void sendMessageFromCache() {
+        ConcurrentHashMap<String, LinkedBlockingQueue<ProxyMessage>> 
messageQueueMap = cache.getMessageQueueMap();
+        for (Map.Entry<String, LinkedBlockingQueue<ProxyMessage>> entry : 
messageQueueMap.entrySet()) {
+            SenderMessage senderMessage = 
cache.fetchSenderMessage(entry.getKey(), entry.getValue());
+            if (senderMessage == null) {
+                continue;
+            }
+            senderManager.sendBatch(senderMessage);
+            if (AgentUtils.getCurrentTime() - lastPrintTime > 
TimeUnit.SECONDS.toMillis(1)) {
+                lastPrintTime = AgentUtils.getCurrentTime();
+                LOGGER.info("send groupId {}, streamId {}, message size {}, 
taskId {}, "
+                        + "instanceId {} sendTime is {}", inlongGroupId, 
inlongStreamId,
+                        senderMessage.getDataList().size(), 
profile.getTaskId(),
+                        profile.getInstanceId(),
+                        senderMessage.getDataTime());
+            }
+        }
+    }
+
     @Override
     public void init(InstanceProfile profile) {
         super.init(profile);
         fieldSplitter = profile.get(CommonConstants.FIELD_SPLITTER, 
DEFAULT_FIELD_SPLITTER).getBytes(
                 StandardCharsets.UTF_8);
         sourceName = profile.getInstanceId();
+        offsetManager = OffsetManager.init();
         senderManager = new SenderManager(profile, inlongGroupId, sourceName);
         try {
             senderManager.Start();
             EXECUTOR_SERVICE.execute(coreThread());
+            EXECUTOR_SERVICE.execute(flushOffset());
             inited = true;
         } catch (Throwable ex) {
             shutdown = true;
@@ -199,11 +204,11 @@ public class ProxySink extends AbstractSink {
             return;
         }
         shutdown = true;
-        while (running) {
+        while (running || offsetRunning) {
             AgentUtils.silenceSleepInMs(DESTROY_LOOP_WAIT_TIME_MS);
         }
-        MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, (int) 
cache.getCacheSize());
         senderManager.Stop();
+        clearOffset();
         LOGGER.info("destroy sink {} end", sourceName);
     }
 
@@ -212,18 +217,70 @@ public class ProxySink extends AbstractSink {
      */
     @Override
     public boolean sinkFinish() {
-        if (noWriteLongEnough() && sinkFinishLongEnough()) {
-            return true;
-        } else {
-            return false;
+        boolean finished = false;
+        packageAckInfoLock.writeLock().lock();
+        if (ackInfoList.isEmpty()) {
+            finished = true;
         }
+        packageAckInfoLock.writeLock().unlock();
+        return finished;
     }
 
-    public boolean noWriteLongEnough() {
-        return AgentUtils.getCurrentTime() - lastWriteTime > 
NO_WRITE_WAIT_AT_LEAST_MS;
+    private void addAckInfo(OffsetAckInfo info) {
+        packageAckInfoLock.writeLock().lock();
+        ackInfoList.add(info);
+        packageAckInfoLock.writeLock().unlock();
     }
 
-    public boolean sinkFinishLongEnough() {
-        return checkSinkFinishCount > SINK_FINISH_AT_LEAST_COUNT;
+    /**
+     * flushOffset
+     *
+     * @return thread runner
+     */
+    private Runnable flushOffset() {
+        return () -> {
+            AgentThreadFactory.nameThread(
+                    "flushOffset-" + profile.getTaskId() + "-" + 
profile.getInstanceId());
+            LOGGER.info("start flush offset {}:{}", inlongGroupId, sourceName);
+            offsetRunning = true;
+            while (!shutdown) {
+                doFlushOffset();
+                AgentUtils.silenceSleepInMs(SAVE_OFFSET_INTERVAL_MS);
+            }
+            LOGGER.info("stop flush offset {}:{}", inlongGroupId, sourceName);
+            offsetRunning = false;
+        };
+    }
+
+    /**
+     * flushOffset
+     */
+    private void doFlushOffset() {
+        packageAckInfoLock.writeLock().lock();
+        OffsetAckInfo info = null;
+        for (int i = 0; i < ackInfoList.size();) {
+            if (ackInfoList.get(i).getHasAck()) {
+                info = ackInfoList.remove(i);
+                
MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, info.getLen());
+            } else {
+                break;
+            }
+        }
+        if (info != null) {
+            LOGGER.info("save offset {} taskId {} instanceId {}", 
info.getOffset(), profile.getTaskId(),
+                    profile.getInstanceId());
+            OffsetProfile offsetProfile = new 
OffsetProfile(profile.getTaskId(), profile.getInstanceId(),
+                    info.getOffset(), profile.get(INODE_INFO));
+            offsetManager.setOffset(offsetProfile);
+        }
+        packageAckInfoLock.writeLock().unlock();
+    }
+
+    private void clearOffset() {
+        packageAckInfoLock.writeLock().lock();
+        for (int i = 0; i < ackInfoList.size();) {
+            MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, 
ackInfoList.remove(i).getLen());
+        }
+        packageAckInfoLock.writeLock().unlock();
     }
 }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index bea53a1e9d..45fe9bc63e 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -20,11 +20,7 @@ package org.apache.inlong.agent.plugin.sinks.filecollect;
 import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.InstanceProfile;
-import org.apache.inlong.agent.conf.OffsetProfile;
 import org.apache.inlong.agent.constant.CommonConstants;
-import org.apache.inlong.agent.core.task.OffsetManager;
-import org.apache.inlong.agent.core.task.file.MemoryManager;
-import org.apache.inlong.agent.message.filecollect.PackageAckInfo;
 import org.apache.inlong.agent.message.filecollect.SenderMessage;
 import org.apache.inlong.agent.metrics.AgentMetricItem;
 import org.apache.inlong.agent.metrics.AgentMetricItemSet;
@@ -44,7 +40,6 @@ import io.netty.util.concurrent.DefaultThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -53,19 +48,15 @@ import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL;
 import static org.apache.inlong.agent.constant.CommonConstants.PROXY_BATCH_FLUSH_INTERVAL;
-import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT;
 import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID;
 import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY;
 import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
 import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
 import static org.apache.inlong.agent.constant.TaskConstants.DEFAULT_JOB_PROXY_SEND;
-import static org.apache.inlong.agent.constant.TaskConstants.INODE_INFO;
 import static org.apache.inlong.agent.constant.TaskConstants.JOB_PROXY_SEND;
 import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
 import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
@@ -78,8 +69,6 @@ public class SenderManager {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(SenderManager.class);
     private static final SequentialID SEQUENTIAL_ID = SequentialID.getInstance();
-    public final int SAVE_OFFSET_INTERVAL_MS = 1000;
-    private final AtomicInteger SENDER_INDEX = new AtomicInteger(0);
     // cache for group and sender list, share the map cross agent lifecycle.
     private DefaultMessageSender sender;
     private LinkedBlockingQueue<AgentSenderCallback> resendQueue;
@@ -113,16 +102,12 @@ public class SenderManager {
     // metric
     private AgentMetricItemSet metricItemSet;
     private Map<String, String> dimensions;
-    private OffsetManager offsetManager;
     private int ioThreadNum;
     private boolean enableBusyWait;
     private String authSecretId;
     private String authSecretKey;
     protected int batchFlushInterval;
-    private List<PackageAckInfo> packageAckInfoList = new ArrayList<>();
-    private final ReentrantReadWriteLock packageAckInfoLock = new ReentrantReadWriteLock(true);
     protected InstanceProfile profile;
-    private volatile boolean offsetRunning = false;
     private volatile boolean resendRunning = false;
     private volatile boolean started = false;
 
@@ -155,7 +140,6 @@ public class SenderManager {
         retrySleepTime = profile.getLong(
                 CommonConstants.PROXY_RETRY_SLEEP, CommonConstants.DEFAULT_PROXY_RETRY_SLEEP);
         isFile = profile.getBoolean(CommonConstants.PROXY_IS_FILE, CommonConstants.DEFAULT_IS_FILE);
-        offsetManager = OffsetManager.init();
         ioThreadNum = profile.getInt(CommonConstants.PROXY_CLIENT_IO_THREAD_NUM,
                 CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
         enableBusyWait = profile.getBoolean(CommonConstants.PROXY_CLIENT_ENABLE_BUSY_WAIT,
@@ -178,7 +162,6 @@ public class SenderManager {
 
     public void Start() throws Exception {
         createMessageSender(inlongGroupId);
-        EXECUTOR_SERVICE.execute(flushOffset());
         EXECUTOR_SERVICE.execute(flushResendQueue());
         started = true;
     }
@@ -189,11 +172,10 @@ public class SenderManager {
         if (!started) {
             return;
         }
-        while (offsetRunning || resendRunning) {
+        while (resendRunning) {
             AgentUtils.silenceSleepInMs(1);
         }
         closeMessageSender();
-        clearOffset();
         LOGGER.info("stop send manager end");
     }
 
@@ -247,60 +229,11 @@ public class SenderManager {
         while (!shutdown && !resendQueue.isEmpty()) {
             AgentUtils.silenceSleepInMs(retrySleepTime);
         }
-        addAckInfo(message.getAckInfo());
         if (!shutdown) {
             sendBatchWithRetryCount(message, 0);
         }
     }
 
-    private void addAckInfo(PackageAckInfo info) {
-        packageAckInfoLock.writeLock().lock();
-        packageAckInfoList.add(info);
-        packageAckInfoLock.writeLock().unlock();
-    }
-
-    public boolean sendFinished() {
-        boolean finished = false;
-        packageAckInfoLock.writeLock().lock();
-        if (packageAckInfoList.isEmpty()) {
-            finished = true;
-        }
-        packageAckInfoLock.writeLock().unlock();
-        return finished;
-    }
-
-    /**
-     * flushOffset
-     */
-    private void doFlushOffset() {
-        packageAckInfoLock.writeLock().lock();
-        PackageAckInfo info = null;
-        for (int i = 0; i < packageAckInfoList.size();) {
-            if (packageAckInfoList.get(i).getHasAck()) {
-                info = packageAckInfoList.remove(i);
-                MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, info.getLen());
-            } else {
-                break;
-            }
-        }
-        if (info != null) {
-            LOGGER.info("save offset {} taskId {} instanceId {}", info.getOffset(), profile.getTaskId(),
-                    profile.getInstanceId());
-            OffsetProfile offsetProfile = new OffsetProfile(profile.getTaskId(), profile.getInstanceId(),
-                    info.getOffset(), profile.get(INODE_INFO));
-            offsetManager.setOffset(offsetProfile);
-        }
-        packageAckInfoLock.writeLock().unlock();
-    }
-
-    private void clearOffset() {
-        packageAckInfoLock.writeLock().lock();
-        for (int i = 0; i < packageAckInfoList.size();) {
-            MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, packageAckInfoList.remove(i).getLen());
-        }
-        packageAckInfoLock.writeLock().unlock();
-    }
-
     /**
      * Send message to proxy by batch, use message cache.
      */
@@ -369,26 +302,6 @@ public class SenderManager {
         };
     }
 
-    /**
-     * flushOffset
-     *
-     * @return thread runner
-     */
-    private Runnable flushOffset() {
-        return () -> {
-            AgentThreadFactory.nameThread(
-                    "flushOffset-" + profile.getTaskId() + "-" + profile.getInstanceId());
-            LOGGER.info("start flush offset {}:{}", inlongGroupId, sourcePath);
-            offsetRunning = true;
-            while (!shutdown) {
-                doFlushOffset();
-                AgentUtils.silenceSleepInMs(SAVE_OFFSET_INTERVAL_MS);
-            }
-            LOGGER.info("stop flush offset {}:{}", inlongGroupId, sourcePath);
-            offsetRunning = false;
-        };
-    }
-
     /**
      * put the data into resend queue and will be resent later.
      *
@@ -402,6 +315,10 @@ public class SenderManager {
         }
     }
 
+    public boolean sendFinished() {
+        return true;
+    }
+
     /**
      * sender callback
      */
@@ -425,7 +342,7 @@ public class SenderManager {
             String instanceId = message.getInstanceId();
             long dataTime = message.getDataTime();
             if (result != null && result.equals(SendResult.OK)) {
-                message.getAckInfo().setHasAck(true);
+                message.getOffsetAckList().forEach(ack -> ack.setHasAck(true));
                 getMetricItem(groupId, streamId).pluginSendSuccessCount.addAndGet(msgCnt);
                 AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId,
                         dataTime, message.getMsgCnt(), message.getTotalSize());
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 1d19ffc7de..e2b0b06517 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -68,6 +68,7 @@ import java.util.concurrent.TimeUnit;
 import static org.apache.inlong.agent.constant.CommonConstants.COMMA;
 import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
 import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID;
 import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
 import static org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_PARTITION_KEY;
 import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
@@ -156,7 +157,7 @@ public class LogFileSource extends AbstractSource {
             linePosition = getInitLineOffset(isIncrement, taskId, instanceId, inodeInfo);
             bytePosition = getBytePositionByLine(linePosition);
             queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
-            dataTime = DateTransUtils.timeStrConvertTomillSec(profile.getSourceDataTime(),
+            dataTime = DateTransUtils.timeStrConvertToMillSec(profile.getSourceDataTime(),
                     profile.get(TASK_CYCLE_UNIT));
             try {
                 registerMeta(profile);
@@ -348,12 +349,13 @@ public class LogFileSource extends AbstractSource {
 
     private Message createMessage(SourceData sourceData) {
         String msgWithMetaData = fillMetaData(sourceData.data);
-        AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId,
-                dataTime, 1, msgWithMetaData.length());
         String proxyPartitionKey = profile.get(PROXY_SEND_PARTITION_KEY, DigestUtils.md5Hex(inlongGroupId));
         Map<String, String> header = new HashMap<>();
         header.put(PROXY_KEY_DATA, proxyPartitionKey);
         header.put(OFFSET, sourceData.offset.toString());
+        header.put(PROXY_KEY_STREAM_ID, inlongStreamId);
+        AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, header.get(PROXY_KEY_STREAM_ID),
+                dataTime, 1, msgWithMetaData.length());
         Message finalMsg = new DefaultMessage(msgWithMetaData.getBytes(StandardCharsets.UTF_8), header);
         // if the message size is greater than max pack size,should drop it.
         if (finalMsg.getBody().length > maxPackSize) {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java
index 8fb9755716..a846f699c7 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java
@@ -274,7 +274,7 @@ public class WatchEntity {
         logger.info("removeUselessWatchDirectories {}", curDataTime);
 
         /* Calculate the data time which is 3 cycle units earlier than current task data time. */
-        long curDataTimeMillis = DateTransUtils.timeStrConvertTomillSec(curDataTime, cycleUnit);
+        long curDataTimeMillis = DateTransUtils.timeStrConvertToMillSec(curDataTime, cycleUnit);
         Calendar calendar = Calendar.getInstance();
         calendar.setTimeInMillis(curDataTimeMillis);
         if ("D".equalsIgnoreCase(cycleUnit)) {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
index b06478558d..62207acc81 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
@@ -124,7 +124,7 @@ public class NewDateUtils {
         String retTime = DateTransUtils.millSecConvertToTimeStr(
                 System.currentTimeMillis(), cycleUnit);
         try {
-            long time = DateTransUtils.timeStrConvertTomillSec(dataTime, cycleUnit);
+            long time = DateTransUtils.timeStrConvertToMillSec(dataTime, cycleUnit);
 
             Calendar calendar = Calendar.getInstance();
             calendar.setTimeInMillis(time);
@@ -592,8 +592,8 @@ public class NewDateUtils {
         long startTime;
         long endTime;
         try {
-            startTime = DateTransUtils.timeStrConvertTomillSec(start, cycleUnit);
-            endTime = DateTransUtils.timeStrConvertTomillSec(end, cycleUnit);
+            startTime = DateTransUtils.timeStrConvertToMillSec(start, cycleUnit);
+            endTime = DateTransUtils.timeStrConvertToMillSec(end, cycleUnit);
         } catch (ParseException e) {
             logger.error("date format is error: ", e);
             return ret;
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
index d7c03bf099..0fae339400 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
@@ -21,7 +21,7 @@ import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.constant.TaskConstants;
-import org.apache.inlong.agent.message.filecollect.PackageAckInfo;
+import org.apache.inlong.agent.message.filecollect.OffsetAckInfo;
 import org.apache.inlong.agent.message.filecollect.SenderMessage;
 import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
 import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
@@ -50,8 +50,6 @@ import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import static org.awaitility.Awaitility.await;
-
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(SenderManager.class)
 @PowerMockIgnore({"javax.management.*"})
@@ -98,37 +96,45 @@ public class TestSenderManager {
                     Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyLong(), Mockito.any(),
                     Mockito.anyLong(), Mockito.any(),
                     Mockito.any(), Mockito.anyBoolean());
-
             senderManager.Start();
-            Long packageIndex = 0L;
-            Long packageOffset = 100L;
-            List<byte[]> bodyList = new ArrayList<>();
-            bodyList.add("123456789".getBytes(StandardCharsets.UTF_8));
-            Integer resultBatchSize = 0;
-            for (int i = 0; i < bodyList.size(); i++) {
-                resultBatchSize += bodyList.get(i).length;
-            }
+            Long offset = 0L;
+            List<OffsetAckInfo> ackInfoListTotal = new ArrayList<>();
             for (int i = 0; i < 10; i++) {
-                PackageAckInfo ackInfo = new PackageAckInfo(packageIndex++, packageOffset, resultBatchSize, false);
+                List<byte[]> bodyList = new ArrayList<>();
+                List<OffsetAckInfo> ackInfoList = new ArrayList<>();
+                bodyList.add("123456789".getBytes(StandardCharsets.UTF_8));
+                for (int j = 0; j < bodyList.size(); j++) {
+                    OffsetAckInfo ackInfo = new OffsetAckInfo(offset++, bodyList.get(j).length, false);
+                    ackInfoList.add(ackInfo);
+                    ackInfoListTotal.add(ackInfo);
+                }
                 SenderMessage senderMessage = new SenderMessage("taskId", "instanceId", "groupId", "streamId", bodyList,
-                        AgentUtils.getCurrentTime(), null, ackInfo);
+                        AgentUtils.getCurrentTime(), null, ackInfoList);
                 senderManager.sendBatch(senderMessage);
-                packageOffset += 100;
             }
             Assert.assertTrue(cbList.size() == 10);
             for (int i = 0; i < 5; i++) {
                 cbList.get(4 - i).onMessageAck(SendResult.OK);
             }
-
-            await().atMost(2, TimeUnit.SECONDS).until(() -> !senderManager.sendFinished());
+            Assert.assertTrue(calHasAckCount(ackInfoListTotal) == 5);
             for (int i = 5; i < 10; i++) {
                 cbList.get(i).onMessageAck(SendResult.OK);
                 AgentUtils.silenceSleepInMs(10);
             }
-            await().atMost(2, TimeUnit.SECONDS).until(() -> senderManager.sendFinished());
+            Assert.assertTrue(String.valueOf(calHasAckCount(ackInfoListTotal)), calHasAckCount(ackInfoListTotal) == 10);
         } catch (Exception e) {
             e.printStackTrace();
             Assert.assertTrue("testNormalAck failed", false);
         }
     }
+
+    private int calHasAckCount(List<OffsetAckInfo> ackInfoListTotal) {
+        int count = 0;
+        for (int i = 0; i < ackInfoListTotal.size(); i++) {
+            if (ackInfoListTotal.get(i).getHasAck()) {
+                count++;
+            }
+        }
+        return count;
+    }
 }
\ No newline at end of file

Reply via email to