This is an automated email from the ASF dual-hosted git repository.

wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ccbd97a6e [INLONG-11571][Agent] Add classes for actual collection of COS source (#11572)
7ccbd97a6e is described below

commit 7ccbd97a6e20312d9ff0debdde18dd482b065442
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Wed Dec 4 16:56:46 2024 +0800

    [INLONG-11571][Agent] Add classes for actual collection of COS source (#11572)
    
    * [INLONG-11571][Agent] Add classes for actual collection of COS source
    
    * [INLONG-11571][Agent] Delete invalid comments
    
    * Update inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/COSTask.java
    
    Co-authored-by: AloysZhang <lofterzh...@gmail.com>
    
    * [INLONG-11571][Agent] Modify code based on comments
    
    ---------
    
    Co-authored-by: AloysZhang <lofterzh...@gmail.com>
---
 inlong-agent/agent-plugins/pom.xml                 |   5 +
 .../agent/plugin/fetcher/ManagerFetcher.java       |   3 +-
 .../inlong/agent/plugin/instance/COSInstance.java  |  35 ++
 .../agent/plugin/instance/CommonInstance.java      |   2 +-
 .../inlong/agent/plugin/sinks/ProxySink.java       |  12 +-
 .../inlong/agent/plugin/sources/COSSource.java     | 356 +++++++++++++++++++++
 .../inlong/agent/plugin/sources/LogFileSource.java |   2 +-
 .../agent/plugin/sources/file/AbstractSource.java  |  14 +-
 .../inlong/agent/plugin/task/cos/COSTask.java      | 338 +++++++++++++++++++
 .../inlong/agent/plugin/task/cos/FileScanner.java  | 165 ++++++++++
 .../agent/plugin/utils/cos/COSConfigHandler.java   |  26 ++
 .../inlong/agent/plugin/utils/cos/COSUtils.java    |  33 ++
 .../plugin/utils/cos/DefaultCOSConfigHandler.java  |  28 ++
 licenses/inlong-agent/LICENSE                      |   1 +
 .../inlong-agent/licenses/LICENSE-cos-java-sdk.txt |  21 ++
 pom.xml                                            |   1 +
 16 files changed, 1025 insertions(+), 17 deletions(-)

diff --git a/inlong-agent/agent-plugins/pom.xml 
b/inlong-agent/agent-plugins/pom.xml
index 4c77f84dc3..863e285ab2 100644
--- a/inlong-agent/agent-plugins/pom.xml
+++ b/inlong-agent/agent-plugins/pom.xml
@@ -92,6 +92,11 @@
             <artifactId>kafka-clients</artifactId>
             <version>${kafka.clients.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.qcloud</groupId>
+            <artifactId>cos_api</artifactId>
+            <version>${cos.sdk.version}</version>
+        </dependency>
         <dependency>
             <groupId>io.debezium</groupId>
             <artifactId>debezium-connector-mysql</artifactId>
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
index 01d7128f8b..3ab63b0118 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
@@ -28,6 +28,7 @@ import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.HttpManager;
 import org.apache.inlong.agent.utils.ThreadUtils;
 import org.apache.inlong.common.enums.PullJobTypeEnum;
+import org.apache.inlong.common.enums.TaskTypeEnum;
 import org.apache.inlong.common.pojo.agent.AgentConfigInfo;
 import org.apache.inlong.common.pojo.agent.AgentConfigRequest;
 import org.apache.inlong.common.pojo.agent.AgentResponseCode;
@@ -237,7 +238,7 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
         dataConfig.setInlongGroupId("devcloud_group_id");
         dataConfig.setInlongStreamId("devcloud_stream_id");
         dataConfig.setDataReportType(0);
-        dataConfig.setTaskType(3);
+        dataConfig.setTaskType(TaskTypeEnum.FILE.getType());
         dataConfig.setTaskId(taskId);
         dataConfig.setState(state);
         dataConfig.setTimeZone("GMT+8:00");
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/COSInstance.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/COSInstance.java
new file mode 100644
index 0000000000..6f40f727dd
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/COSInstance.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.instance;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+
+import java.io.IOException;
+
+/**
+ * cos instance contains source and sink.
+ * main job is to read from source and write to sink
+ */
+public class COSInstance extends CommonInstance {
+
+    @Override
+    public void setInodeInfo(InstanceProfile profile) throws IOException {
+        profile.set(TaskConstants.INODE_INFO, "");
+    }
+}
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java
index 7267066aee..6cc97a159c 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java
@@ -122,7 +122,7 @@ public abstract class CommonInstance extends Instance {
 
     @Override
     public void run() {
-        Thread.currentThread().setName("file-instance-core-" + getTaskId() + 
"-" + getInstanceId());
+        Thread.currentThread().setName("instance-core-" + getTaskId() + "-" + 
getInstanceId());
         running = true;
         try {
             doRun();
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index 2f54ec59de..e00ad65cba 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -196,18 +196,18 @@ public class ProxySink extends AbstractSink {
         }
         Long start = AgentUtils.getCurrentTime();
         shutdown = true;
+        senderManager.Stop();
+        LOGGER.info("destroy proxySink, wait for sender close {} ms instance 
{}", AgentUtils.getCurrentTime() - start,
+                profile.getInstanceId());
+        start = AgentUtils.getCurrentTime();
         while (running || offsetRunning) {
             AgentUtils.silenceSleepInMs(LOOP_WAIT_TIME_MS);
         }
-        LOGGER.info("destroy proxySink wait run elapse {} ms instance {}", 
AgentUtils.getCurrentTime() - start,
-                profile.getInstanceId());
-        start = AgentUtils.getCurrentTime();
-        senderManager.Stop();
-        LOGGER.info("destroy proxySink wait sender elapse {} ms instance {}", 
AgentUtils.getCurrentTime() - start,
+        LOGGER.info("destroy proxySink, wait for run close {} ms instance {}", 
AgentUtils.getCurrentTime() - start,
                 profile.getInstanceId());
         start = AgentUtils.getCurrentTime();
         clearOffset();
-        LOGGER.info("destroy proxySink wait offset elapse {} ms instance {}", 
AgentUtils.getCurrentTime() - start,
+        LOGGER.info("destroy proxySink, wait for offset clear {} ms instance 
{}", AgentUtils.getCurrentTime() - start,
                 profile.getInstanceId());
         LOGGER.info("destroy sink {} end", sourceName);
     }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/COSSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/COSSource.java
new file mode 100755
index 0000000000..b6792ac82d
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/COSSource.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.OffsetProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.core.FileStaticManager;
+import org.apache.inlong.agent.core.FileStaticManager.FileStatic;
+import org.apache.inlong.agent.core.task.MemoryManager;
+import org.apache.inlong.agent.core.task.OffsetManager;
+import org.apache.inlong.agent.except.FileException;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
+import org.apache.inlong.agent.plugin.sources.extend.DefaultExtendedHandler;
+import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
+import org.apache.inlong.agent.plugin.utils.cos.COSUtils;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.ThreadUtils;
+
+import com.qcloud.cos.COSClient;
+import com.qcloud.cos.model.COSObject;
+import com.qcloud.cos.model.GetObjectRequest;
+import com.qcloud.cos.model.ObjectMetadata;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.inlong.agent.constant.TaskConstants.COS_CONTENT_STYLE;
+
+/**
+ * Read COS files
+ */
+public class COSSource extends AbstractSource {
+
+    public static final int LEN_OF_FILE_OFFSET_ARRAY = 2;
+    public static final String AGENT_GLOBAL_COS_SOURCE_PERMIT = 
"agent.global.cos.source.permit";
+    public static final int DEFAULT_AGENT_GLOBAL_COS_SOURCE_PERMIT = 128 * 
1000 * 1000;
+
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    protected class FileOffset {
+
+        private Long lineOffset;
+        private Long byteOffset;
+        private boolean hasByteOffset;
+    }
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(COSSource.class);
+    public static final String OFFSET_SEP = ":";
+    protected final Integer WAIT_TIMEOUT_MS = 10;
+    private final Integer SIZE_OF_BUFFER_TO_READ_FILE = 1024 * 1024;
+    private final Long META_UPDATE_INTERVAL_MS = 10000L;
+    private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss");
+
+    private String fileName;
+    private byte[] bufferToReadFile;
+    public volatile long linePosition = 0;
+    public volatile long bytePosition = 0;
+    private volatile boolean fileExist = true;
+    private volatile long lastInodeUpdateTime = 0;
+    private COSClient cosClient;
+    private String bucketName;
+    private String secretId;
+    private String secretKey;
+    private String strRegion;
+    private ObjectMetadata metadata;
+    protected BlockingQueue<SourceData> queue;
+    private static final ThreadPoolExecutor EXECUTOR_SERVICE = new 
ThreadPoolExecutor(
+            0, Integer.MAX_VALUE,
+            1L, TimeUnit.SECONDS,
+            new SynchronousQueue<>(),
+            new AgentThreadFactory("cos-source-pool"));
+    private volatile boolean running = false;
+
+    public COSSource() {
+    }
+
+    @Override
+    protected void initExtendClass() {
+        extendClass = DefaultExtendedHandler.class.getCanonicalName();
+    }
+
+    @Override
+    protected void initSource(InstanceProfile profile) {
+        try {
+            String offset = "";
+            if (offsetProfile != null) {
+                offset = offsetProfile.toJsonStr();
+            }
+            LOGGER.info("LogFileSource init: {} offset: {}", 
profile.toJsonStr(), offset);
+            AgentConfiguration conf = AgentConfiguration.getAgentConf();
+            int permit = conf.getInt(AGENT_GLOBAL_COS_SOURCE_PERMIT, 
DEFAULT_AGENT_GLOBAL_COS_SOURCE_PERMIT);
+            
MemoryManager.getInstance().addSemaphore(AGENT_GLOBAL_COS_SOURCE_PERMIT, 
permit);
+            fileName = profile.getInstanceId();
+            bucketName = profile.get(TaskConstants.COS_TASK_BUCKET_NAME);
+            secretId = profile.get(TaskConstants.COS_TASK_SECRET_ID);
+            secretKey = profile.get(TaskConstants.COS_TASK_SECRET_KEY);
+            strRegion = profile.get(TaskConstants.COS_TASK_REGION);
+            cosClient = COSUtils.createCli(secretId, secretKey, strRegion);
+            metadata = cosClient.getObjectMetadata(bucketName, fileName);
+            queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
+            bufferToReadFile = new byte[SIZE_OF_BUFFER_TO_READ_FILE];
+            lastInodeUpdateTime = AgentUtils.getCurrentTime();
+            initOffset(taskId);
+            EXECUTOR_SERVICE.execute(run());
+        } catch (Exception ex) {
+            stopRunning();
+            throw new FileException("error init stream for " + fileName, ex);
+        }
+    }
+
+    @Override
+    protected boolean doPrepareToRead() {
+        if (AgentUtils.getCurrentTime() - lastInodeUpdateTime > 
META_UPDATE_INTERVAL_MS) {
+            metadata = cosClient.getObjectMetadata(bucketName, fileName);
+            lastInodeUpdateTime = AgentUtils.getCurrentTime();
+        }
+        if (metadata.getContentLength() < bytePosition) {
+            fileExist = false;
+            LOGGER.info("file rotate, instance will restart and offset will be 
clean, file {}",
+                    fileName);
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    protected List<SourceData> readFromSource() {
+        if (queue.isEmpty()) {
+            return null;
+        }
+        int count = 0;
+        int len = 0;
+        List<SourceData> lines = new ArrayList<>();
+        while (!queue.isEmpty() && count < BATCH_READ_LINE_COUNT && len < 
BATCH_READ_LINE_TOTAL_LEN) {
+            if (len + queue.peek().getData().length > 
BATCH_READ_LINE_TOTAL_LEN) {
+                break;
+            }
+            len += queue.peek().getData().length;
+            count++;
+            lines.add(queue.poll());
+        }
+        MemoryManager.getInstance().release(AGENT_GLOBAL_COS_SOURCE_PERMIT, 
len);
+        return lines;
+    }
+
+    @Override
+    protected void printCurrentState() {
+        LOGGER.info("path is {}, linePosition {}, bytePosition is {} file len 
{}", fileName, linePosition,
+                bytePosition, metadata.getContentLength());
+    }
+
+    @Override
+    protected String getThreadName() {
+        return "cos-file-source-" + taskId + "-" + fileName;
+    }
+
+    private void initOffset(String taskId) {
+        long lineOffset;
+        long byteOffset;
+        if (offsetProfile != null) {
+            FileOffset fileOffset = parseFIleOffset(offsetProfile.getOffset());
+            lineOffset = fileOffset.lineOffset;
+            byteOffset = fileOffset.byteOffset;
+            LOGGER.info("initOffset inode no change taskId {} restore 
lineOffset {} byteOffset {}, file {}", taskId,
+                    lineOffset, byteOffset, fileName);
+        } else {
+            lineOffset = 0;
+            byteOffset = 0;
+            LOGGER.info("initOffset taskId {} for new all read lineOffset {} 
byteOffset {} file {}", taskId,
+                    lineOffset, byteOffset, fileName);
+        }
+        linePosition = lineOffset;
+        bytePosition = byteOffset;
+    }
+
+    private Runnable run() {
+        return () -> {
+            AgentThreadFactory.nameThread(getThreadName());
+            running = true;
+            try {
+                doRun();
+            } catch (Throwable e) {
+                LOGGER.error("do run error maybe file deleted: ", e);
+                ThreadUtils.threadThrowableHandler(Thread.currentThread(), e);
+            }
+            running = false;
+        };
+    }
+
+    /**
+     * Read new lines.
+     *
+     * @return The new position after the lines have been read
+     * @throws IOException if an I/O error occurs.
+     */
+    protected void doRun() throws IOException {
+        GetObjectRequest getObjectRequest = new GetObjectRequest(bucketName, 
fileName);
+        getObjectRequest.setRange(bytePosition, metadata.getContentLength());
+        COSObject cosObject = cosClient.getObject(getObjectRequest);
+        InputStream inputStream = cosObject.getObjectContent();
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        int num;
+        boolean overLen = false;
+        while ((num = inputStream.read(bufferToReadFile)) != -1) {
+            LOGGER.debug("read size {}", num);
+            for (int i = 0; i < num; i++) {
+                byte ch = bufferToReadFile[i];
+                bytePosition++;
+                switch (ch) {
+                    case '\n':
+                        linePosition++;
+                        boolean suc = false;
+                        while (isRunnable() && !suc) {
+                            SourceData sourceData = new 
SourceData(baos.toByteArray(),
+                                    getOffsetString(linePosition, 
bytePosition));
+                            boolean suc4Queue = 
waitForPermit(AGENT_GLOBAL_COS_SOURCE_PERMIT,
+                                    sourceData.getData().length);
+                            if (!suc4Queue) {
+                                break;
+                            }
+                            suc = queue.offer(sourceData);
+                            if (!suc) {
+                                MemoryManager.getInstance()
+                                        
.release(AGENT_GLOBAL_COS_SOURCE_PERMIT, sourceData.getData().length);
+                                AgentUtils.silenceSleepInMs(WAIT_TIMEOUT_MS);
+                            }
+                        }
+                        if (overLen) {
+                            LOGGER.warn("readLines over len finally string len 
{}",
+                                    new String(baos.toByteArray()).length());
+                            long auditTime = 0;
+                            auditTime = profile.getSinkDataTime();
+                            
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_FAILED, inlongGroupId, 
inlongStreamId,
+                                    auditTime, 1, maxPackSize, auditVersion);
+                            
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_FAILED_REAL_TIME, inlongGroupId,
+                                    inlongStreamId, 
AgentUtils.getCurrentTime(), 1, maxPackSize, auditVersion);
+                        }
+                        baos.reset();
+                        overLen = false;
+                        break;
+                    case '\r':
+                        break;
+                    default:
+                        if (baos.size() < maxPackSize) {
+                            baos.write(ch);
+                        } else {
+                            overLen = true;
+                        }
+                }
+            }
+        }
+        baos.close();
+        inputStream.close();
+        cosObject.close();
+    }
+
+    private String getOffsetString(Long lineOffset, Long byteOffset) {
+        return lineOffset + OFFSET_SEP + byteOffset;
+    }
+
+    private FileOffset parseFIleOffset(String offset) {
+        String[] offsetArray = offset.split(OFFSET_SEP);
+        if (offsetArray.length == LEN_OF_FILE_OFFSET_ARRAY) {
+            return new FileOffset(Long.parseLong(offsetArray[0]), 
Long.parseLong(offsetArray[1]), true);
+        } else {
+            return new FileOffset(Long.parseLong(offsetArray[0]), null, false);
+        }
+    }
+
+    @Override
+    protected boolean isRunnable() {
+        return runnable && fileExist;
+    }
+
+    @Override
+    public boolean sourceExist() {
+        return fileExist;
+    }
+
+    @Override
+    protected void releaseSource() {
+        while (running) {
+            AgentUtils.silenceSleepInMs(1);
+        }
+        if (cosClient != null) {
+            FileStatic data = new FileStatic();
+            data.setTaskId(taskId);
+            data.setRetry(String.valueOf(profile.isRetry()));
+            data.setContentType(profile.get(COS_CONTENT_STYLE));
+            data.setGroupId(profile.getInlongGroupId());
+            data.setStreamId(profile.getInlongStreamId());
+            data.setDataTime(format.format(profile.getSinkDataTime()));
+            data.setFileName(profile.getInstanceId());
+            data.setFileLen(String.valueOf(metadata.getContentLength()));
+            data.setReadBytes(String.valueOf(bytePosition));
+            data.setReadLines(String.valueOf(linePosition));
+            OffsetProfile offsetProfile = 
OffsetManager.getInstance().getOffset(taskId, instanceId);
+            if (offsetProfile != null) {
+                data.setSendLines(offsetProfile.getOffset());
+            }
+            FileStaticManager.putStaticMsg(data);
+            cosClient.shutdown();
+        }
+        while (!queue.isEmpty()) {
+            
MemoryManager.getInstance().release(AGENT_GLOBAL_COS_SOURCE_PERMIT, 
queue.poll().getData().length);
+        }
+    }
+
+    private boolean waitForPermit(String permitName, int permitLen) {
+        boolean suc = false;
+        while (!suc) {
+            suc = MemoryManager.getInstance().tryAcquire(permitName, 
permitLen);
+            if (!suc) {
+                MemoryManager.getInstance().printDetail(permitName, 
"cos_source");
+                if (!isRunnable()) {
+                    return false;
+                }
+                AgentUtils.silenceSleepInMs(WAIT_TIMEOUT_MS);
+            }
+        }
+        return true;
+    }
+}
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 300c16168f..ecaa53196b 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -71,7 +71,7 @@ public class LogFileSource extends AbstractSource {
     public static final String OFFSET_SEP = ":";
     private final Integer SIZE_OF_BUFFER_TO_READ_FILE = 64 * 1024;
     private final Long INODE_UPDATE_INTERVAL_MS = 1000L;
-    private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss"); // 设置格式
+    private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss");
 
     private String fileName;
     private File file;
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
index 3ce1378fa7..72d14327a2 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
@@ -165,7 +165,7 @@ public abstract class AbstractSource implements Source {
                 break;
             }
             List<SourceData> lines = readFromSource();
-            if (lines != null && lines.isEmpty()) {
+            if (lines == null || lines.isEmpty()) {
                 if (queue.isEmpty()) {
                     emptyCount++;
                 } else {
@@ -176,14 +176,12 @@ public abstract class AbstractSource implements Source {
                 continue;
             }
             emptyCount = 0;
-            if (lines != null) {
-                for (int i = 0; i < lines.size(); i++) {
-                    boolean suc4Queue = 
waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, lines.get(i).getData().length);
-                    if (!suc4Queue) {
-                        break;
-                    }
-                    putIntoQueue(lines.get(i));
+            for (int i = 0; i < lines.size(); i++) {
+                boolean suc4Queue = 
waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, lines.get(i).getData().length);
+                if (!suc4Queue) {
+                    break;
                 }
+                putIntoQueue(lines.get(i));
             }
             
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
BATCH_READ_LINE_TOTAL_LEN);
             if (AgentUtils.getCurrentTime() - lastPrintTime > 
CORE_THREAD_PRINT_INTERVAL_MS) {
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/COSTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/COSTask.java
new file mode 100644
index 0000000000..eceb38e8be
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/COSTask.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task.cos;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.core.task.TaskAction;
+import org.apache.inlong.agent.plugin.task.AbstractTask;
+import org.apache.inlong.agent.plugin.task.cos.FileScanner.BasicFileInfo;
+import org.apache.inlong.agent.plugin.utils.cos.COSUtils;
+import org.apache.inlong.agent.plugin.utils.regex.NewDateUtils;
+import org.apache.inlong.agent.plugin.utils.regex.Scanner;
+import org.apache.inlong.agent.state.State;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.DateTransUtils;
+
+import com.qcloud.cos.COSClient;
+import com.qcloud.cos.model.ObjectMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Scans a COS bucket for objects whose keys match the configured pattern and creates one
+ * {@code COSInstance} profile per newly found object.
+ *
+ * <p>Normal tasks rescan the bucket at most once per {@link #SCAN_INTERVAL} milliseconds. Retry tasks scan the
+ * configured [time-from, time-to] range once and submit a FINISH action to the task manager once all instances
+ * are finished.
+ */
+public class COSTask extends AbstractTask {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(COSTask.class);
+    public static final String DEFAULT_COS_INSTANCE = "org.apache.inlong.agent.plugin.instance.COSInstance";
+    /** Capacity of instanceQueue; also the max number of profiles returned by one getNewInstanceList() call. */
+    private static final int INSTANCE_QUEUE_CAPACITY = 10;
+    /** Profiles not yet moved to instanceQueue, keyed by data time and then by object key. */
+    private final Map<String/* dataTime */, Map<String/* fileName */, InstanceProfile>> eventMap =
+            new ConcurrentHashMap<>();
+    /** Normal tasks drop events whose data time is not within 2 days before/after the current time. */
+    public static final long DAY_TIMEOUT_INTERVAL = 2L * 24 * 3600 * 1000;
+    private boolean retry;
+    private long startTime;
+    private long endTime;
+    private String originPattern;
+    private long lastScanTime = 0;
+    /** Minimum interval in milliseconds between two bucket scans of a normal task. */
+    public static final long SCAN_INTERVAL = 1 * 60 * 1000;
+    private volatile boolean runAtLeastOneTime = false;
+    private BlockingQueue<InstanceProfile> instanceQueue;
+    private COSClient cosClient;
+    private String bucketName;
+    private String secretId;
+    private String secretKey;
+    private String strRegion;
+    private String timeOffset = "";
+
+    @Override
+    protected int getInstanceLimit() {
+        return taskProfile.getInt(TaskConstants.COS_MAX_NUM);
+    }
+
+    /** Reads the COS settings from the task profile and creates the COS client. */
+    @Override
+    protected void initTask() {
+        timeOffset = taskProfile.get(TaskConstants.TASK_COS_TIME_OFFSET, "");
+        instanceQueue = new LinkedBlockingQueue<>(INSTANCE_QUEUE_CAPACITY);
+        retry = taskProfile.getBoolean(TaskConstants.COS_TASK_RETRY, false);
+        originPattern = taskProfile.get(TaskConstants.COS_TASK_PATTERN);
+        bucketName = taskProfile.get(TaskConstants.COS_TASK_BUCKET_NAME);
+        secretId = taskProfile.get(TaskConstants.COS_TASK_SECRET_ID);
+        secretKey = taskProfile.get(TaskConstants.COS_TASK_SECRET_KEY);
+        strRegion = taskProfile.get(TaskConstants.COS_TASK_REGION);
+        cosClient = COSUtils.createCli(secretId, secretKey, strRegion);
+        if (retry) {
+            initRetryTask(taskProfile);
+        }
+    }
+
+    /**
+     * Parses the retry range (COS_TASK_TIME_FROM / COS_TASK_TIME_TO) into startTime and endTime.
+     *
+     * @return false, after logging, if either bound cannot be parsed
+     */
+    private boolean initRetryTask(TaskProfile profile) {
+        String dataTimeFrom = profile.get(TaskConstants.COS_TASK_TIME_FROM, "");
+        String dataTimeTo = profile.get(TaskConstants.COS_TASK_TIME_TO, "");
+        try {
+            startTime = DateTransUtils.timeStrConvertToMillSec(dataTimeFrom, profile.getCycleUnit());
+            endTime = DateTransUtils.timeStrConvertToMillSec(dataTimeTo, profile.getCycleUnit());
+        } catch (ParseException e) {
+            LOGGER.error("retry task time error start {} end {}", dataTimeFrom, dataTimeTo, e);
+            return false;
+        }
+        return true;
+    }
+
+    /** Runs one scan/dispatch round and returns at most INSTANCE_QUEUE_CAPACITY queued profiles. */
+    @Override
+    protected List<InstanceProfile> getNewInstanceList() {
+        if (retry) {
+            runForRetry();
+        } else {
+            runForNormal();
+        }
+        List<InstanceProfile> list = new ArrayList<>();
+        instanceQueue.drainTo(list, INSTANCE_QUEUE_CAPACITY);
+        return list;
+    }
+
+    /**
+     * Checks that the profile carries every key this task needs. For retry profiles this also parses
+     * the retry range, which sets startTime/endTime as a side effect.
+     */
+    @Override
+    public boolean isProfileValid(TaskProfile profile) {
+        if (!profile.allRequiredKeyExist()) {
+            LOGGER.error("task profile needs all required key");
+            return false;
+        }
+        if (!profile.hasKey(TaskConstants.COS_TASK_CYCLE_UNIT)) {
+            LOGGER.error("task profile needs cos cycle unit");
+            return false;
+        }
+        if (!profile.hasKey(TaskConstants.TASK_CYCLE_UNIT)) {
+            LOGGER.error("task profile needs cycle unit");
+            return false;
+        }
+        if (!profile.get(TaskConstants.TASK_CYCLE_UNIT).equals(profile.get(TaskConstants.COS_TASK_CYCLE_UNIT))) {
+            LOGGER.error("task profile cycle unit must be consistent");
+            return false;
+        }
+        if (!profile.hasKey(TaskConstants.TASK_TIME_ZONE)) {
+            LOGGER.error("task profile needs time zone");
+            return false;
+        }
+        boolean ret = profile.hasKey(TaskConstants.COS_TASK_PATTERN)
+                && profile.hasKey(TaskConstants.COS_MAX_NUM);
+        if (!ret) {
+            LOGGER.error("task profile needs file keys");
+            return false;
+        }
+        if (!profile.hasKey(TaskConstants.TASK_COS_TIME_OFFSET)) {
+            LOGGER.error("task profile needs time offset");
+            return false;
+        }
+        if (profile.getBoolean(TaskConstants.COS_TASK_RETRY, false)) {
+            if (!initRetryTask(profile)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    protected void releaseTask() {
+        // guard against a release before the client was created
+        if (cosClient != null) {
+            cosClient.shutdown();
+        }
+    }
+
+    /** Retry mode: scan the range once, dispatch events, and report FINISH once all instances are done. */
+    private void runForRetry() {
+        if (!runAtLeastOneTime) {
+            scanExistingFile();
+            runAtLeastOneTime = true;
+        }
+        dealWithEventMap();
+        if (allInstanceFinished()) {
+            LOGGER.info("retry task finished, send action to task manager, taskId {}", getTaskId());
+            TaskAction action = new TaskAction(org.apache.inlong.agent.core.task.ActionType.FINISH, taskProfile);
+            taskManager.submitAction(action);
+            doChangeState(State.SUCCEEDED);
+        }
+    }
+
+    /** Normal mode: rescan the bucket at most once per SCAN_INTERVAL, then dispatch due events. */
+    private void runForNormal() {
+        if (AgentUtils.getCurrentTime() - lastScanTime > SCAN_INTERVAL) {
+            scanExistingFile();
+            lastScanTime = AgentUtils.getCurrentTime();
+        }
+        dealWithEventMap();
+    }
+
+    /** Lists matching objects for the task's time range and records the new ones in the event map. */
+    private void scanExistingFile() {
+        List<BasicFileInfo> fileInfos = FileScanner.scanTaskBetweenTimes(cosClient, bucketName, originPattern,
+                taskProfile.getCycleUnit(), timeOffset, startTime, endTime, retry);
+        LOGGER.info("taskId {} scan {} get file count {}", getTaskId(), originPattern, fileInfos.size());
+        for (BasicFileInfo fileInfo : fileInfos) {
+            addToEventMap(fileInfo.fileName, fileInfo.dataTime);
+            if (retry) {
+                instanceCount++;
+            }
+        }
+    }
+
+    private boolean isInEventMap(String fileName, String dataTime) {
+        Map<String, InstanceProfile> fileToProfile = eventMap.get(dataTime);
+        if (fileToProfile == null) {
+            return false;
+        }
+        return fileToProfile.get(fileName) != null;
+    }
+
+    private void dealWithEventMap() {
+        removeTimeoutEvent(eventMap, retry);
+        dealWithEventMapWithCycle();
+    }
+
+    /**
+     * Moves due events into instanceQueue: first the current data time (normal tasks only), then the
+     * remaining data times of the scan window, then any other data time still present in the event map.
+     * Stops as soon as one data time cannot be fully dispatched.
+     */
+    private void dealWithEventMapWithCycle() {
+        List<String> dataTimeList = Scanner.getDataTimeList(startTime, endTime, taskProfile.getCycleUnit(),
+                timeOffset, retry);
+        if (dataTimeList.isEmpty()) {
+            LOGGER.error("get dataTimeList return empty list");
+            return;
+        }
+        Set<String> dealtDataTime = new HashSet<>();
+        // normal task first handle current data time
+        if (!retry) {
+            String current = dataTimeList.remove(dataTimeList.size() - 1);
+            dealtDataTime.add(current);
+            if (!dealEventMapByDataTime(current, true)) {
+                return;
+            }
+        }
+        // plain loops (not forEach with a lambda) so that "return" really stops the dispatch
+        for (String dataTime : dataTimeList) {
+            dealtDataTime.add(dataTime);
+            if (!dealEventMapByDataTime(dataTime, false)) {
+                return;
+            }
+        }
+        for (String dataTime : eventMap.keySet()) {
+            if (!dealtDataTime.contains(dataTime) && !dealEventMapByDataTime(dataTime, false)) {
+                return;
+            }
+        }
+    }
+
+    /**
+     * Moves the events of one data time into instanceQueue in instance-id order, once that data time is due.
+     *
+     * @param isCurrentDataTime when false, dispatch also stops once isFull() reports true
+     * @return false if dispatch had to stop early (queue full or isFull()), true otherwise
+     */
+    private boolean dealEventMapByDataTime(String dataTime, boolean isCurrentDataTime) {
+        Map<String, InstanceProfile> sameDataTimeEvents = eventMap.get(dataTime);
+        if (sameDataTimeEvents == null || sameDataTimeEvents.isEmpty()) {
+            return true;
+        }
+        if (shouldStartNow(dataTime)) {
+            Set<InstanceProfile> sortedEvents = new TreeSet<>(Comparator.comparing(InstanceProfile::getInstanceId));
+            sortedEvents.addAll(sameDataTimeEvents.values());
+            for (InstanceProfile sortEvent : sortedEvents) {
+                String fileName = sortEvent.getInstanceId();
+                InstanceProfile profile = sameDataTimeEvents.get(fileName);
+                if (!isCurrentDataTime && isFull()) {
+                    return false;
+                }
+                if (!instanceQueue.offer(profile)) {
+                    return false;
+                }
+                sameDataTimeEvents.remove(fileName);
+            }
+        }
+        return true;
+    }
+
+    /*
+     * Calculate whether the event needs to be processed at the current time based on its data time, business cycle,
+     * and offset
+     */
+    private boolean shouldStartNow(String dataTime) {
+        String shouldStartTime =
+                NewDateUtils.getShouldStartTime(dataTime, taskProfile.getCycleUnit(), timeOffset);
+        String currentTime = getCurrentTime();
+        return currentTime.compareTo(shouldStartTime) >= 0;
+    }
+
+    /** Drops (normal tasks only) every data time that NewDateUtils.isValidCreationTime rejects. */
+    private void removeTimeoutEvent(Map<String, Map<String, InstanceProfile>> eventMap, boolean isRetry) {
+        if (isRetry) {
+            return;
+        }
+        eventMap.keySet().removeIf(dataTime -> {
+            /* If the data time of the event is within 2 days before (after) the current time, it is valid */
+            if (NewDateUtils.isValidCreationTime(dataTime, DAY_TIMEOUT_INTERVAL)) {
+                return false;
+            }
+            LOGGER.warn("remove too old event from event map. dataTime {}", dataTime);
+            return true;
+        });
+    }
+
+    /** Current time formatted with NewDateUtils.DEFAULT_FORMAT in NewDateUtils.DEFAULT_TIME_ZONE. */
+    private String getCurrentTime() {
+        SimpleDateFormat dateFormat = new SimpleDateFormat(NewDateUtils.DEFAULT_FORMAT);
+        TimeZone timeZone = TimeZone.getTimeZone(NewDateUtils.DEFAULT_TIME_ZONE);
+        dateFormat.setTimeZone(timeZone);
+        return dateFormat.format(new Date(System.currentTimeMillis()));
+    }
+
+    /**
+     * Records an instance profile for the object unless it is already pending or shouldAddAgain()
+     * rejects it for its last-modified time.
+     */
+    private void addToEventMap(String fileName, String dataTime) {
+        if (isInEventMap(fileName, dataTime)) {
+            LOGGER.info("add to evenMap isInEventMap returns true skip taskId {} dataTime {} fileName {}",
+                    taskProfile.getTaskId(), dataTime, fileName);
+            return;
+        }
+        ObjectMetadata meta = cosClient.getObjectMetadata(bucketName, fileName);
+        Long fileUpdateTime = meta.getLastModified().getTime();
+        if (!shouldAddAgain(fileName, fileUpdateTime)) {
+            LOGGER.info("add to evenMap shouldAddAgain returns false skip taskId {} dataTime {} fileName {}",
+                    taskProfile.getTaskId(), dataTime, fileName);
+            return;
+        }
+        Map<String, InstanceProfile> sameDataTimeEvents = eventMap.computeIfAbsent(dataTime,
+                mapKey -> new ConcurrentHashMap<>());
+        boolean containsInMemory = sameDataTimeEvents.containsKey(fileName);
+        if (containsInMemory) {
+            LOGGER.error("should not happen! may be {} has been deleted and add again", fileName);
+            return;
+        }
+        String cycleUnit = taskProfile.getCycleUnit();
+        InstanceProfile instanceProfile = taskProfile.createInstanceProfile(DEFAULT_COS_INSTANCE,
+                fileName, cycleUnit, dataTime, fileUpdateTime);
+        sameDataTimeEvents.put(fileName, instanceProfile);
+        LOGGER.info("add to eventMap taskId {} dataTime {} fileName {}", taskProfile.getTaskId(), dataTime, fileName);
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/FileScanner.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/FileScanner.java
new file mode 100644
index 0000000000..4eac0eeff2
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/FileScanner.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task.cos;
+
+import org.apache.inlong.agent.plugin.utils.regex.PatternUtil;
+import org.apache.inlong.agent.plugin.utils.regex.Scanner;
+import org.apache.inlong.agent.plugin.utils.regex.Scanner.FinalPatternInfo;
+import org.apache.inlong.agent.utils.DateTransUtils;
+
+import com.qcloud.cos.COSClient;
+import com.qcloud.cos.exception.CosClientException;
+import com.qcloud.cos.exception.CosServiceException;
+import com.qcloud.cos.model.COSObjectSummary;
+import com.qcloud.cos.model.ListObjectsRequest;
+import com.qcloud.cos.model.ObjectListing;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/*
+ * Lists the objects of a COS bucket whose keys match a file pattern resolved per data time. COSTask uses it for
+ * both the periodic scan of a normal task and the one-off range scan of a retry task.
+ */
+public class FileScanner {
+
+    public static final int DEFAULT_KEY_COUNT = 100;
+    public static final String DEFAULT_DELIMITER = "/";
+    public static final char PATH_SEP = '/';
+    /** Flags applied to every pattern compiled by this scanner. */
+    private static final int PATTERN_FLAGS = Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE;
+
+    /** An object key that matched the pattern, together with its formatted data time. */
+    public static class BasicFileInfo {
+
+        public String fileName;
+        public String dataTime;
+
+        public BasicFileInfo(String fileName, String dataTime) {
+            this.fileName = fileName;
+            this.dataTime = dataTime;
+        }
+    }
+
+    private static final Logger logger = LoggerFactory.getLogger(FileScanner.class);
+
+    /**
+     * Resolves {@code originPattern} into one concrete pattern per data time via Scanner.getFinalPatternInfos and
+     * collects the matching objects of every resulting cycle.
+     *
+     * @return matched objects of all cycles, in cycle order; never null
+     */
+    public static List<BasicFileInfo> scanTaskBetweenTimes(COSClient cosClient, String bucketName, String originPattern,
+            String cycleUnit, String timeOffset, long startTime, long endTime, boolean isRetry) {
+        List<FinalPatternInfo> finalPatternInfos = Scanner.getFinalPatternInfos(originPattern, cycleUnit, timeOffset,
+                startTime, endTime, isRetry);
+        List<BasicFileInfo> infos = new ArrayList<>();
+        for (FinalPatternInfo finalPatternInfo : finalPatternInfos) {
+            // list from the literal part of the pattern, then filter keys with the full regex
+            String prefix = PatternUtil.getBeforeFirstWildcard(finalPatternInfo.finalPattern);
+            Pattern pattern = Pattern.compile(finalPatternInfo.finalPattern, PATTERN_FLAGS);
+            infos.addAll(scanTaskInOneCycle(cosClient, bucketName, pattern, prefix,
+                    finalPatternInfo.dataTime, cycleUnit));
+        }
+        return infos;
+    }
+
+    /**
+     * Lists {@code prefix} page by page and returns every object key for which {@code pattern} matches at the start
+     * of the key. Sub-directories (common prefixes) are scanned recursively when their path matches the
+     * leading part of {@code pattern}. COS errors are logged and the objects found so far are returned.
+     */
+    public static List<BasicFileInfo> scanTaskInOneCycle(COSClient cosClient, String bucketName, Pattern pattern,
+            String prefix, Long dataTime, String cycleUnit) {
+        List<BasicFileInfo> infos = new ArrayList<>();
+        // request parameters are the same for every page; only the marker advances
+        ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
+        listObjectsRequest.setBucketName(bucketName);
+        listObjectsRequest.setPrefix(prefix);
+        listObjectsRequest.setDelimiter(DEFAULT_DELIMITER);
+        listObjectsRequest.setMaxKeys(DEFAULT_KEY_COUNT);
+        ObjectListing objectListing;
+        do {
+            try {
+                objectListing = cosClient.listObjects(listObjectsRequest);
+            } catch (CosServiceException e) {
+                logger.error("scanTaskInOneCycle finalPattern {} CosServiceException", pattern.pattern(), e);
+                return infos;
+            } catch (CosClientException e) {
+                logger.error("scanTaskInOneCycle finalPattern {} CosClientException", pattern.pattern(), e);
+                return infos;
+            }
+            infos.addAll(scanSubDirectories(cosClient, bucketName, pattern, objectListing.getCommonPrefixes(),
+                    dataTime, cycleUnit));
+            for (COSObjectSummary cosObjectSummary : objectListing.getObjectSummaries()) {
+                String key = cosObjectSummary.getKey();
+                Matcher matcher = pattern.matcher(key);
+                if (!matcher.lookingAt()) {
+                    continue;
+                }
+                String strDataTime = DateTransUtils.millSecConvertToTimeStr(dataTime, cycleUnit);
+                infos.add(new BasicFileInfo(key, strDataTime));
+                logger.info("list key {} dataTime {} size {}, storageClasses {}", key, strDataTime,
+                        cosObjectSummary.getSize(), cosObjectSummary.getStorageClass());
+            }
+            listObjectsRequest.setMarker(objectListing.getNextMarker());
+        } while (objectListing.isTruncated());
+        return infos;
+    }
+
+    /**
+     * Recurses into the common prefixes whose path fully matches {@code pattern} cut at the same directory depth.
+     * The depth is taken from the first common prefix; this assumes all common prefixes of one listing have the
+     * same depth (listed prefix plus one path segment).
+     */
+    private static List<BasicFileInfo> scanSubDirectories(COSClient cosClient, String bucketName, Pattern pattern,
+            List<String> commonPrefixes, Long dataTime, String cycleUnit) {
+        List<BasicFileInfo> infos = new ArrayList<>();
+        if (commonPrefixes.isEmpty()) {
+            return infos;
+        }
+        int depth = countCharacterOccurrences(commonPrefixes.get(0), PATH_SEP);
+        String dirPattern = findNthOccurrenceSubstring(pattern.pattern(), PATH_SEP, depth);
+        if (dirPattern == null) {
+            // the pattern has fewer path separators than these prefixes, so it cannot be cut at this depth;
+            // skip them instead of failing the whole scan on Pattern.compile(null)
+            logger.warn("scanTaskInOneCycle finalPattern {} has fewer than {} path separators, skip sub directories",
+                    pattern.pattern(), depth);
+            return infos;
+        }
+        Pattern patternByDepth = Pattern.compile(dirPattern, PATTERN_FLAGS);
+        for (String commonPrefix : commonPrefixes) {
+            Matcher matcher = patternByDepth.matcher(commonPrefix);
+            if (matcher.matches()) {
+                infos.addAll(scanTaskInOneCycle(cosClient, bucketName, pattern, commonPrefix, dataTime, cycleUnit));
+            }
+        }
+        return infos;
+    }
+
+    /**
+     * Counts how many times {@code targetChar} occurs in {@code input}.
+     *
+     * @throws IllegalArgumentException if {@code input} is null
+     */
+    public static int countCharacterOccurrences(String input, char targetChar) {
+        if (input == null) {
+            throw new IllegalArgumentException("Input string cannot be null");
+        }
+        int count = 0;
+        for (int i = 0; i < input.length(); i++) {
+            if (input.charAt(i) == targetChar) {
+                count++;
+            }
+        }
+        return count;
+    }
+
+    /**
+     * Returns {@code input} up to and including the n-th occurrence of {@code targetChar}, or null when
+     * {@code input} has fewer than {@code n} occurrences (or {@code n <= 0}).
+     */
+    public static String findNthOccurrenceSubstring(String input, char targetChar, int n) {
+        int endIndex = findNthOccurrence(input, targetChar, n);
+        if (endIndex != -1) {
+            return input.substring(0, endIndex + 1);
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Returns the index of the n-th occurrence of {@code targetChar} in {@code input}, or -1 when there are fewer
+     * than {@code n} occurrences (or {@code n <= 0}).
+     */
+    public static int findNthOccurrence(String input, char targetChar, int n) {
+        int currentIndex = -1;
+        for (int i = 0; i < n; i++) {
+            currentIndex = input.indexOf(targetChar, currentIndex + 1);
+            if (currentIndex == -1) {
+                return -1;
+            }
+        }
+        return currentIndex;
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/cos/COSConfigHandler.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/cos/COSConfigHandler.java
new file mode 100644
index 0000000000..259d330776
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/cos/COSConfigHandler.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.utils.cos;
+
+import com.qcloud.cos.ClientConfig;
+
+/**
+ * Extension point that supplies the COS {@link ClientConfig}, so private or customized deployments can plug in
+ * their own client settings.
+ */
+public abstract class COSConfigHandler {
+
+    /**
+     * Returns the client configuration to use for {@code region}.
+     *
+     * @param region COS region name
+     * @return the client configuration
+     */
+    public abstract ClientConfig getClientConfig(String region);
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/cos/COSUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/cos/COSUtils.java
new file mode 100644
index 0000000000..111e94212f
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/cos/COSUtils.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.utils.cos;
+
+import com.qcloud.cos.COSClient;
+import com.qcloud.cos.ClientConfig;
+import com.qcloud.cos.auth.BasicCOSCredentials;
+import com.qcloud.cos.auth.COSCredentials;
+
+/** Factory helpers for COS clients. */
+public class COSUtils {
+
+    /**
+     * Creates a COS client that authenticates with the given secret pair and uses the client configuration
+     * produced by {@link DefaultCOSConfigHandler} for {@code region}.
+     *
+     * @param secretId COS secret id
+     * @param secretKey COS secret key
+     * @param region COS region name
+     * @return a new client
+     */
+    public static COSClient createCli(String secretId, String secretKey, String region) {
+        COSCredentials credentials = new BasicCOSCredentials(secretId, secretKey);
+        return new COSClient(credentials, new DefaultCOSConfigHandler().getClientConfig(region));
+    }
+}
\ No newline at end of file
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/cos/DefaultCOSConfigHandler.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/cos/DefaultCOSConfigHandler.java
new file mode 100644
index 0000000000..a9dda478a2
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/cos/DefaultCOSConfigHandler.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.utils.cos;
+
+import com.qcloud.cos.ClientConfig;
+import com.qcloud.cos.region.Region;
+
+/** Default {@link COSConfigHandler}: a plain client configuration for the requested region. */
+public class DefaultCOSConfigHandler extends COSConfigHandler {
+
+    /**
+     * Builds a client configuration bound to {@code region}.
+     *
+     * @param region COS region name, wrapped in a {@link Region}
+     * @return a new {@link ClientConfig} for that region
+     */
+    @Override
+    public ClientConfig getClientConfig(String region) {
+        return new ClientConfig(new Region(region));
+    }
+}
diff --git a/licenses/inlong-agent/LICENSE b/licenses/inlong-agent/LICENSE
index 33a51c916a..cd2f529072 100644
--- a/licenses/inlong-agent/LICENSE
+++ b/licenses/inlong-agent/LICENSE
@@ -541,6 +541,7 @@ MIT licenses
 The following components are provided under MIT license. See project link for details.
 The text of each license is also included at licenses/LICENSE-[project].txt.
 
+  com.qcloud:cos_api:jar:5.6.54 - cos-java-sdk (https://github.com/tencentyun/cos-java-sdk-v5), (MIT License)
   net.sourceforge.argparse4j:argparse4j:0.7.0 - argparse4j (https://github.com/argparse4j/argparse4j/tree/argparse4j-0.7.0), (MIT)
   org.bouncycastle:bcpkix-jdk15on:1.69 - Bouncy Castle PKIX, CMS, EAC, TSP, PKCS, OCSP, CMP, and CRMF APIs (https://www.bouncycastle.org/java.html), (MIT License)
   org.bouncycastle:bcprov-ext-jdk15on:1.69 - Bouncy Castle Provider (https://www.bouncycastle.org/java.html), (MIT License)
diff --git a/licenses/inlong-agent/licenses/LICENSE-cos-java-sdk.txt b/licenses/inlong-agent/licenses/LICENSE-cos-java-sdk.txt
new file mode 100644
index 0000000000..4b32321f98
--- /dev/null
+++ b/licenses/inlong-agent/licenses/LICENSE-cos-java-sdk.txt
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2017 腾讯云
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 4330360033..0562af4dd7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -190,6 +190,7 @@
         <HikariCP.version>4.0.3</HikariCP.version>
         <caffeine.version>2.9.3</caffeine.version>
         <kafka.clients.version>3.0.0</kafka.clients.version>
+        <cos.sdk.version>5.6.54</cos.sdk.version>
         <paho.client.version>1.2.5</paho.client.version>
 
         <kubernetes.client.version>6.0.0</kubernetes.client.version>

Reply via email to