This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new 4d7838415b [INLONG-11574][Agent] Add COS source unit test (#11575)
4d7838415b is described below

commit 4d7838415b981d37e0ef1c1bceac8fdd4d9b8245
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Fri Dec 6 09:48:06 2024 +0800

    [INLONG-11574][Agent] Add COS source unit test (#11575)
---
 .../inlong/agent/core/AgentBaseTestsHelper.java    |  33 ---
 .../inlong/agent/plugin/task/cos/COSTask.java      |   2 -
 .../inlong/agent/plugin/task/cos/FileScanner.java  |  20 +-
 .../inlong/agent/plugin/AgentBaseTestsHelper.java  |  53 ++++-
 .../agent/plugin/instance/TestInstanceManager.java |   5 +-
 .../inlong/agent/plugin/sinks/KafkaSinkTest.java   |   5 +-
 .../inlong/agent/plugin/sinks/PulsarSinkTest.java  |   5 +-
 .../sinks/filecollect/TestSenderManager.java       |   5 +-
 .../agent/plugin/sources/TestLogFileSource.java    |   2 +-
 .../agent/plugin/sources/TestRedisSource.java      |   2 +-
 .../agent/plugin/sources/TestSQLServerSource.java  |   2 +-
 .../inlong/agent/plugin/task/TestCOSTask.java      | 233 +++++++++++++++++++++
 .../inlong/agent/plugin/task/TestLogFileTask.java  |   8 +-
 .../inlong/agent/plugin/task/TestTaskManager.java  |   9 +-
 14 files changed, 317 insertions(+), 67 deletions(-)

diff --git 
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
 
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
index a8dfbdf91a..af0a54f129 100755
--- 
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
+++ 
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
@@ -18,12 +18,8 @@
 package org.apache.inlong.agent.core;
 
 import org.apache.inlong.agent.conf.AgentConfiguration;
-import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.constant.AgentConstants;
 import org.apache.inlong.agent.constant.FetcherConstants;
-import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig;
-import org.apache.inlong.common.enums.TaskStateEnum;
-import org.apache.inlong.common.pojo.agent.DataConfig;
 
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
@@ -74,33 +70,4 @@ public class AgentBaseTestsHelper {
             }
         }
     }
-
-    public TaskProfile getTaskProfile(int taskId, String pattern, boolean 
retry, String startTime, String endTime,
-            TaskStateEnum state, String timeZone) {
-        DataConfig dataConfig = getDataConfig(taskId, pattern, retry, 
startTime, endTime, state, timeZone);
-        TaskProfile profile = TaskProfile.convertToTaskProfile(dataConfig);
-        return profile;
-    }
-
-    private DataConfig getDataConfig(int taskId, String pattern, boolean 
retry, String startTime, String endTime,
-            TaskStateEnum state, String timeZone) {
-        DataConfig dataConfig = new DataConfig();
-        dataConfig.setInlongGroupId("testGroupId");
-        dataConfig.setInlongStreamId("testStreamId");
-        dataConfig.setDataReportType(1);
-        dataConfig.setTaskType(3);
-        dataConfig.setTaskId(taskId);
-        dataConfig.setTimeZone(timeZone);
-        dataConfig.setState(state.ordinal());
-        FileTaskConfig fileTaskConfig = new FileTaskConfig();
-        fileTaskConfig.setPattern(pattern);
-        fileTaskConfig.setTimeOffset("0h");
-        fileTaskConfig.setMaxFileCount(100);
-        fileTaskConfig.setCycleUnit("h");
-        fileTaskConfig.setRetry(retry);
-        fileTaskConfig.setDataTimeFrom(startTime);
-        fileTaskConfig.setDataTimeTo(endTime);
-        dataConfig.setExtParams(GSON.toJson(fileTaskConfig));
-        return dataConfig;
-    }
 }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/COSTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/COSTask.java
index eceb38e8be..de2beb7cc0 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/COSTask.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/COSTask.java
@@ -196,7 +196,6 @@ public class COSTask extends AbstractTask {
     }
 
     private void scanExistingFile() {
-        LOGGER.info("test123 qqqq");
         List<BasicFileInfo> fileInfos = 
FileScanner.scanTaskBetweenTimes(cosClient, bucketName, originPattern,
                 taskProfile.getCycleUnit(), timeOffset, startTime, endTime, 
retry);
         LOGGER.info("taskId {} scan {} get file count {}", getTaskId(), 
originPattern, fileInfos.size());
@@ -314,7 +313,6 @@ public class COSTask extends AbstractTask {
                     taskProfile.getTaskId(), dataTime, fileName);
             return;
         }
-        LOGGER.info("test123 {}", cosClient);
         ObjectMetadata meta = cosClient.getObjectMetadata(bucketName, 
fileName);
         Long fileUpdateTime = meta.getLastModified().getTime();
         if (!shouldAddAgain(fileName, fileUpdateTime)) {
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/FileScanner.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/FileScanner.java
index 4eac0eeff2..1019d34135 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/FileScanner.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/FileScanner.java
@@ -97,16 +97,20 @@ public class FileScanner {
             }
             List<String> commonPrefixes = objectListing.getCommonPrefixes();
             int depth;
-            Pattern patternByDepth = null;
+            Pattern patternByDepth;
             if (!commonPrefixes.isEmpty()) {
                 depth = countCharacterOccurrences(commonPrefixes.get(0), 
PATH_SEP);
-                String temp = findNthOccurrenceSubstring(pattern.pattern(), 
PATH_SEP, depth);
-                patternByDepth = Pattern.compile(temp, 
Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE);
-            }
-            for (String commonPrefix : commonPrefixes) {
-                Matcher matcher = patternByDepth.matcher(commonPrefix);
-                if (matcher.matches()) {
-                    infos.addAll(scanTaskInOneCycle(cosClient, bucketName, 
pattern, commonPrefix, dataTime, cycleUnit));
+                String nthOccurrenceSubstring = 
findNthOccurrenceSubstring(pattern.pattern(), PATH_SEP, depth);
+                if (nthOccurrenceSubstring != null) {
+                    patternByDepth = Pattern.compile(nthOccurrenceSubstring,
+                            Pattern.CASE_INSENSITIVE | Pattern.DOTALL | 
Pattern.MULTILINE);
+                    for (String commonPrefix : commonPrefixes) {
+                        Matcher matcher = patternByDepth.matcher(commonPrefix);
+                        if (matcher.matches()) {
+                            infos.addAll(scanTaskInOneCycle(cosClient, 
bucketName, pattern, commonPrefix, dataTime,
+                                    cycleUnit));
+                        }
+                    }
                 }
             }
             List<COSObjectSummary> cosObjectSummaries = 
objectListing.getObjectSummaries();
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
index 3dc4f8ab15..214a29b24e 100755
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
@@ -21,8 +21,10 @@ import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.constant.AgentConstants;
 import org.apache.inlong.agent.constant.FetcherConstants;
+import org.apache.inlong.agent.pojo.COSTask.COSTaskConfig;
 import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig;
 import org.apache.inlong.common.enums.TaskStateEnum;
+import org.apache.inlong.common.enums.TaskTypeEnum;
 import org.apache.inlong.common.pojo.agent.DataConfig;
 
 import com.google.gson.Gson;
@@ -82,24 +84,24 @@ public class AgentBaseTestsHelper {
         }
     }
 
-    public TaskProfile getTaskProfile(int taskId, String pattern, String 
dataContentStyle, boolean retry,
+    public TaskProfile getFileTaskProfile(int taskId, String pattern, String 
dataContentStyle, boolean retry,
             String startTime, String endTime,
             TaskStateEnum state, String cycleUnit, String timeZone, 
List<String> filterStreams) {
-        DataConfig dataConfig = getDataConfig(taskId, pattern, 
dataContentStyle, retry, startTime, endTime,
+        DataConfig dataConfig = getFileDataConfig(taskId, pattern, 
dataContentStyle, retry, startTime, endTime,
                 state, cycleUnit, timeZone,
                 filterStreams);
         TaskProfile profile = TaskProfile.convertToTaskProfile(dataConfig);
         return profile;
     }
 
-    private DataConfig getDataConfig(int taskId, String pattern, String 
dataContentStyle, boolean retry,
+    private DataConfig getFileDataConfig(int taskId, String pattern, String 
dataContentStyle, boolean retry,
             String startTime, String endTime, TaskStateEnum state, String 
cycleUnit, String timeZone,
             List<String> filterStreams) {
         DataConfig dataConfig = new DataConfig();
         dataConfig.setInlongGroupId("testGroupId");
         dataConfig.setInlongStreamId("testStreamId");
         dataConfig.setDataReportType(1);
-        dataConfig.setTaskType(3);
+        dataConfig.setTaskType(TaskTypeEnum.FILE.getType());
         dataConfig.setTaskId(taskId);
         dataConfig.setTimeZone(timeZone);
         dataConfig.setState(state.ordinal());
@@ -119,4 +121,47 @@ public class AgentBaseTestsHelper {
         dataConfig.setExtParams(GSON.toJson(fileTaskConfig));
         return dataConfig;
     }
+
+    public TaskProfile getCOSTaskProfile(int taskId, String pattern, String 
contentStyle, boolean retry,
+            String startTime, String endTime,
+            TaskStateEnum state, String cycleUnit, String timeZone, 
List<String> filterStreams) {
+        DataConfig dataConfig = getCOSDataConfig(taskId, pattern, 
contentStyle, retry, startTime, endTime,
+                state, cycleUnit, timeZone,
+                filterStreams);
+        TaskProfile profile = TaskProfile.convertToTaskProfile(dataConfig);
+        return profile;
+    }
+
+    private DataConfig getCOSDataConfig(int taskId, String pattern, String 
contentStyle, boolean retry,
+            String startTime, String endTime, TaskStateEnum state, String 
cycleUnit, String timeZone,
+            List<String> filterStreams) {
+        DataConfig dataConfig = new DataConfig();
+        dataConfig.setInlongGroupId("testGroupId");
+        dataConfig.setInlongStreamId("testStreamId");
+        dataConfig.setDataReportType(1);
+        dataConfig.setTaskType(TaskTypeEnum.COS.getType());
+        dataConfig.setTaskId(taskId);
+        dataConfig.setTimeZone(timeZone);
+        dataConfig.setState(state.ordinal());
+        COSTaskConfig cosTaskConfig = new COSTaskConfig();
+        cosTaskConfig.setBucketName("testBucket");
+        cosTaskConfig.setCredentialsId("testSecretId");
+        cosTaskConfig.setCredentialsKey("testSecretKey");
+        cosTaskConfig.setRegion("testRegion");
+        cosTaskConfig.setPattern(pattern);
+        cosTaskConfig.setTimeOffset("0d");
+        // GMT-8:00 same with Asia/Shanghai
+        cosTaskConfig.setMaxFileCount(100);
+        cosTaskConfig.setCycleUnit(cycleUnit);
+        cosTaskConfig.setRetry(retry);
+        cosTaskConfig.setDataTimeFrom(startTime);
+        cosTaskConfig.setDataTimeTo(endTime);
+        // mix: login|87601|968|67826|23579 or login|a=b&c=d&x=y&asdf
+        cosTaskConfig.setContentStyle(contentStyle);
+        cosTaskConfig.setDataSeparator("|");
+        cosTaskConfig.setFilterStreams(filterStreams);
+        dataConfig.setExtParams(GSON.toJson(cosTaskConfig));
+        return dataConfig;
+    }
+
 }
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
index 61c94c4dd1..9901f29023 100755
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
@@ -59,8 +59,9 @@ public class TestInstanceManager {
         helper = new 
AgentBaseTestsHelper(TestInstanceManager.class.getName()).setupAgentHome();
         String pattern = helper.getTestRootDir() + "/YYYYMMDDhh_[0-9]+.txt";
         Store basicInstanceStore = 
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE);
-        taskProfile = helper.getTaskProfile(1, pattern, "csv", false, "", "", 
TaskStateEnum.RUNNING, CycleUnitType.HOUR,
-                "GMT+6:00", null);
+        taskProfile =
+                helper.getFileTaskProfile(1, pattern, "csv", false, "", "", 
TaskStateEnum.RUNNING, CycleUnitType.HOUR,
+                        "GMT+6:00", null);
         Store taskBasicStore = 
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK);
         TaskStore taskStore = new TaskStore(taskBasicStore);
         taskStore.storeTask(taskProfile);
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
index 066776d32f..b8703c7056 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
@@ -47,8 +47,9 @@ public class KafkaSinkTest {
         String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
         helper = new 
AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
         String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
-        TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv", 
false, "", "", TaskStateEnum.RUNNING, "D",
-                "GMT+8:00", null);
+        TaskProfile taskProfile =
+                helper.getFileTaskProfile(1, pattern, "csv", false, "", "", 
TaskStateEnum.RUNNING, "D",
+                        "GMT+8:00", null);
         profile = taskProfile.createInstanceProfile("", fileName,
                 taskProfile.getCycleUnit(), "20230927", 
AgentUtils.getCurrentTime());
         kafkaSink = new MockSink();
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
index d7733259ab..43e3115dcb 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
@@ -47,8 +47,9 @@ public class PulsarSinkTest {
         String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
         helper = new 
AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
         String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
-        TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv", 
false, "", "", TaskStateEnum.RUNNING, "D",
-                "GMT+8:00", null);
+        TaskProfile taskProfile =
+                helper.getFileTaskProfile(1, pattern, "csv", false, "", "", 
TaskStateEnum.RUNNING, "D",
+                        "GMT+8:00", null);
         profile = taskProfile.createInstanceProfile("", fileName,
                 taskProfile.getCycleUnit(), "20230927", 
AgentUtils.getCurrentTime());
         pulsarSink = new MockSink();
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
index 5a1168edef..1c9e623b9b 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
@@ -70,8 +70,9 @@ public class TestSenderManager {
         String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
         helper = new 
AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
         String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
-        TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv", 
false, "", "", TaskStateEnum.RUNNING, "D",
-                "GMT+8:00", null);
+        TaskProfile taskProfile =
+                helper.getFileTaskProfile(1, pattern, "csv", false, "", "", 
TaskStateEnum.RUNNING, "D",
+                        "GMT+8:00", null);
         profile = taskProfile.createInstanceProfile("", fileName,
                 taskProfile.getCycleUnit(), "20230927", 
AgentUtils.getCurrentTime());
     }
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index 408b9f1b70..d7a93a0df8 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -84,7 +84,7 @@ public class TestLogFileSource {
             fileName = LOADER.getResource("test/20230928_1.txt").getPath();
             pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
             retry = false;
-            TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, 
dataContentStyle, retry, "", "",
+            TaskProfile taskProfile = helper.getFileTaskProfile(taskId, 
pattern, dataContentStyle, retry, "", "",
                     TaskStateEnum.RUNNING, "D",
                     "GMT+8:00", Arrays.asList("ok"));
             InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile("",
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java
index 4f2e90870b..14518078f2 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java
@@ -121,7 +121,7 @@ public class TestRedisSource {
         final String command = "zscore";
         final String subOperation = "set,del";
 
-        TaskProfile taskProfile = helper.getTaskProfile(1, "", "csv", false, 
"", "", TaskStateEnum.RUNNING, "D",
+        TaskProfile taskProfile = helper.getFileTaskProfile(1, "", "csv", 
false, "", "", TaskStateEnum.RUNNING, "D",
                 "GMT+8:00", null);
         profile = taskProfile.createInstanceProfile("",
                 "", taskProfile.getCycleUnit(), "20240725", 
AgentUtils.getCurrentTime());
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
index 377e5ae913..410acd8e77 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
@@ -136,7 +136,7 @@ public class TestSQLServerSource {
         final String tableName = "test_source";
         final String serverName = "server-01";
 
-        TaskProfile taskProfile = helper.getTaskProfile(1, "", "csv", false, 
"", "", TaskStateEnum.RUNNING, "D",
+        TaskProfile taskProfile = helper.getFileTaskProfile(1, "", "csv", 
false, "", "", TaskStateEnum.RUNNING, "D",
                 "GMT+8:00", null);
         instanceProfile = taskProfile.createInstanceProfile("",
                 "", taskProfile.getCycleUnit(), "20240725", 
AgentUtils.getCurrentTime());
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestCOSTask.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestCOSTask.java
new file mode 100644
index 0000000000..f66e4845b1
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestCOSTask.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task;
+
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
+import org.apache.inlong.agent.core.task.TaskManager;
+import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
+import org.apache.inlong.agent.plugin.task.cos.COSTask;
+import org.apache.inlong.agent.plugin.utils.cos.COSUtils;
+import org.apache.inlong.common.enums.TaskStateEnum;
+
+import com.qcloud.cos.COSClient;
+import com.qcloud.cos.model.COSObjectSummary;
+import com.qcloud.cos.model.ListObjectsRequest;
+import com.qcloud.cos.model.ObjectListing;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({COSUtils.class, COSTask.class, COSClient.class, 
ObjectListing.class})
+@PowerMockIgnore({"javax.management.*"})
+public class TestCOSTask {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TestCOSTask.class);
+    private static final ClassLoader LOADER = 
TestCOSTask.class.getClassLoader();
+    private static AgentBaseTestsHelper helper;
+    private static TaskManager manager;
+    private static final ThreadPoolExecutor EXECUTOR_SERVICE = new 
ThreadPoolExecutor(
+            0, Integer.MAX_VALUE,
+            1L, TimeUnit.SECONDS,
+            new SynchronousQueue<>(),
+            new AgentThreadFactory("TestCOSTask"));
+    private static COSClient cosClient;
+
+    @BeforeClass
+    public static void setup() throws Exception {
+        helper = new 
AgentBaseTestsHelper(TestCOSTask.class.getName()).setupAgentHome();
+        manager = new TaskManager();
+        cosClient = Mockito.mock(COSClient.class);
+        PowerMockito.mockStatic(COSUtils.class);
+        Mockito.when(COSUtils.createCli(Mockito.anyString(), 
Mockito.anyString(), Mockito.anyString()))
+                .thenReturn(cosClient);
+    }
+
+    @AfterClass
+    public static void teardown() throws Exception {
+        helper.teardownAgentHome();
+    }
+
+    private void mockDay(COSClient cosClient) {
+        ObjectListing objectListing1_1 = Mockito.mock(ObjectListing.class);
+        when(objectListing1_1.getCommonPrefixes()).thenReturn(
+                Arrays.asList("some/20230928_0/", "some/20230928_1/", 
"some/20230928_aaa/"));
+        
when(objectListing1_1.getObjectSummaries()).thenReturn(getSummaries(Arrays.asList("some/20230928_test_0.txt")));
+
+        ObjectListing objectListing1_2 = Mockito.mock(ObjectListing.class);
+        when(objectListing1_2.getCommonPrefixes()).thenReturn(
+                Arrays.asList("some/20230929_aaa/", "some/20230929_1/", 
"some/20230929_2/"));
+        when(objectListing1_2.getObjectSummaries()).thenReturn(
+                getSummaries(Arrays.asList("some/20230929_0_test_0.txt")));
+
+        ObjectListing objectListing2_1 = Mockito.mock(ObjectListing.class);
+        when(objectListing2_1.getCommonPrefixes()).thenReturn(
+                Arrays.asList("some/20230928_0/where/", 
"some/20230928_0/test_1/"));
+        when(objectListing2_1.getObjectSummaries()).thenReturn(getSummaries(
+                Arrays.asList("some/20230928_0/test_0.txt", 
"some/20230928_0/test_1.txt",
+                        "some/20230928_0/test_o.txt")));
+
+        ObjectListing objectListing2_2 = Mockito.mock(ObjectListing.class);
+        when(objectListing2_2.getCommonPrefixes()).thenReturn(
+                Arrays.asList("some/20230929_1/where/", 
"some/20230929_1/test_1/"));
+        when(objectListing2_2.getObjectSummaries()).thenReturn(getSummaries(
+                Arrays.asList("some/20230929_1/test_0.txt", 
"some/20230929_1/test_1.txt",
+                        "some/20230929_1/test_o.txt")));
+
+        
when(cosClient.listObjects(Mockito.any(ListObjectsRequest.class))).thenAnswer(mock
 -> {
+            ListObjectsRequest req = mock.getArgument(0);
+            if (req.getPrefix().equals("some/20230928_")) {
+                return objectListing1_1;
+            } else if (req.getPrefix().equals("some/20230929_")) {
+                return objectListing1_2;
+            } else if (req.getPrefix().equals("some/20230928_0/")) {
+                return objectListing2_1;
+            } else if (req.getPrefix().equals("some/20230929_1/")) {
+                return objectListing2_2;
+            } else {
+                return new ObjectListing();
+            }
+        });
+    }
+
+    private void mockHour(COSClient cosClient) {
+        ObjectListing objectListing1_1 = Mockito.mock(ObjectListing.class);
+        when(objectListing1_1.getCommonPrefixes()).thenReturn(
+                Arrays.asList("some/2023092800_0/", "some/2023092800_1/", 
"some/2023092800_aaa/"));
+        when(objectListing1_1.getObjectSummaries()).thenReturn(
+                getSummaries(Arrays.asList("some/2023092800_test_0.txt")));
+
+        ObjectListing objectListing1_2 = Mockito.mock(ObjectListing.class);
+        when(objectListing1_2.getCommonPrefixes()).thenReturn(
+                Arrays.asList("some/2023092901_aaa/", "some/2023092901_1/", 
"some/2023092901_2/"));
+        when(objectListing1_2.getObjectSummaries()).thenReturn(
+                getSummaries(Arrays.asList("some/2023092901_0_test_0.txt")));
+
+        ObjectListing objectListing2_1 = Mockito.mock(ObjectListing.class);
+        when(objectListing2_1.getCommonPrefixes()).thenReturn(
+                Arrays.asList("some/2023092800_0/where/", 
"some/2023092800_0/test_1/"));
+        when(objectListing2_1.getObjectSummaries()).thenReturn(getSummaries(
+                Arrays.asList("some/2023092800_0/test_0.txt", 
"some/2023092800_0/test_1.txt",
+                        "some/2023092800_0/test_o.txt")));
+
+        ObjectListing objectListing2_2 = Mockito.mock(ObjectListing.class);
+        when(objectListing2_2.getCommonPrefixes()).thenReturn(
+                Arrays.asList("some/2023092901_1/where/", 
"some/2023092901_1/test_1/"));
+        when(objectListing2_2.getObjectSummaries()).thenReturn(getSummaries(
+                Arrays.asList("some/2023092901_1/test_0.txt", 
"some/2023092901_1/test_1.txt",
+                        "some/2023092901_1/test_o.txt")));
+
+        
when(cosClient.listObjects(Mockito.any(ListObjectsRequest.class))).thenAnswer(mock
 -> {
+            ListObjectsRequest req = mock.getArgument(0);
+            if (req.getPrefix().equals("some/2023092800_")) {
+                return objectListing1_1;
+            } else if (req.getPrefix().equals("some/2023092901_")) {
+                return objectListing1_2;
+            } else if (req.getPrefix().equals("some/2023092800_0/")) {
+                return objectListing2_1;
+            } else if (req.getPrefix().equals("some/2023092901_1/")) {
+                return objectListing2_2;
+            } else {
+                return new ObjectListing();
+            }
+        });
+    }
+
+    private List<COSObjectSummary> getSummaries(List<String> keys) {
+        List<COSObjectSummary> summaries = new ArrayList<>();
+        for (int i = 0; i < keys.size(); i++) {
+            COSObjectSummary summary = new COSObjectSummary();
+            summary.setKey(keys.get(i));
+            summary.setSize(100);
+            summary.setStorageClass("what");
+            summaries.add(summary);
+        }
+        return summaries;
+    }
+
+    @Test
+    public void testScan() {
+        mockDay(cosClient);
+        doTest(1, "some/YYYYMMDD_[0-9]+/test_[0-9]+.txt", CycleUnitType.DAY,
+                Arrays.asList("some/20230928_0/test_0.txt", 
"some/20230928_0/test_1.txt", "some/20230929_1/test_0.txt",
+                        "some/20230929_1/test_1.txt"),
+                Arrays.asList("20230928", "20230928", "20230929", "20230929"),
+                "20230928",
+                "20230930");
+        mockHour(cosClient);
+        doTest(2, "some/YYYYMMDDhh_[0-9]+/test_[0-9]+.txt", CycleUnitType.HOUR,
+                Arrays.asList("some/2023092800_0/test_0.txt", 
"some/2023092800_0/test_1.txt",
+                        "some/2023092901_1/test_0.txt",
+                        "some/2023092901_1/test_1.txt"),
+                Arrays.asList("2023092800", "2023092800", "2023092901", 
"2023092901"), "2023092800",
+                "2023093023");
+    }
+
+    private void doTest(int taskId, String pattern, String cycle, List<String> 
srcKeys, List<String> srcDataTimes,
+            String startTime, String endTime) {
+        TaskProfile taskProfile = helper.getCOSTaskProfile(taskId, pattern, 
"csv", true, startTime, endTime,
+                TaskStateEnum.RUNNING,
+                cycle, "GMT+8:00", null);
+        COSTask task = null;
+        final List<String> fileName = new ArrayList();
+        final List<String> dataTime = new ArrayList();
+        try {
+            task = PowerMockito.spy(new COSTask());
+            PowerMockito.doAnswer(invocation -> {
+                fileName.add(invocation.getArgument(0));
+                dataTime.add(invocation.getArgument(1));
+                return null;
+            }).when(task, "addToEvenMap", Mockito.anyString(), 
Mockito.anyString());
+            Assert.assertTrue(task.isProfileValid(taskProfile));
+            manager.getTaskStore().storeTask(taskProfile);
+            task.init(manager, taskProfile, manager.getInstanceBasicStore());
+            EXECUTOR_SERVICE.submit(task);
+        } catch (Exception e) {
+            LOGGER.error("source init error", e);
+            Assert.assertTrue("source init error", false);
+        }
+        await().atMost(10, TimeUnit.SECONDS)
+                .until(() -> fileName.size() == srcDataTimes.size() && 
dataTime.size() == srcDataTimes.size());
+        for (int i = 0; i < fileName.size(); i++) {
+            Assert.assertEquals(0, fileName.get(i).compareTo(srcKeys.get(i)));
+            Assert.assertEquals(0, 
dataTime.get(i).compareTo(srcDataTimes.get(i)));
+        }
+        task.destroy();
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
index 440c4a5208..7d1962c314 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
@@ -20,7 +20,6 @@ package org.apache.inlong.agent.plugin.task;
 import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.constant.CycleUnitType;
-import org.apache.inlong.agent.constant.TaskConstants;
 import org.apache.inlong.agent.core.task.TaskManager;
 import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
 import org.apache.inlong.agent.plugin.task.file.LogFileTask;
@@ -101,14 +100,13 @@ public class TestLogFileTask {
         for (int i = 0; i < resources.size(); i++) {
             resourceName.add(LOADER.getResource(resources.get(i)).getPath());
         }
-        TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, 
"csv", true, "", "", TaskStateEnum.RUNNING,
-                cycle, "GMT+8:00", null);
+        TaskProfile taskProfile =
+                helper.getFileTaskProfile(taskId, pattern, "csv", true, 
startTime, endTime, TaskStateEnum.RUNNING,
+                        cycle, "GMT+8:00", null);
         LogFileTask dayTask = null;
         final List<String> fileName = new ArrayList();
         final List<String> dataTime = new ArrayList();
         try {
-            taskProfile.set(TaskConstants.FILE_TASK_TIME_FROM, startTime);
-            taskProfile.set(TaskConstants.FILE_TASK_TIME_TO, endTime);
             dayTask = PowerMockito.spy(new LogFileTask());
             PowerMockito.doAnswer(invocation -> {
                 fileName.add(invocation.getArgument(0));
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java
index 014c5ce4e2..4cab9fa8f0 100755
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java
@@ -58,8 +58,9 @@ public class TestTaskManager {
             manager = new TaskManager();
             TaskStore taskStore = manager.getTaskStore();
             for (int i = 1; i <= 10; i++) {
-                TaskProfile taskProfile = helper.getTaskProfile(i, pattern, 
"csv", false, "", "", TaskStateEnum.RUNNING,
-                        "D", "GMT+8:00", null);
+                TaskProfile taskProfile =
+                        helper.getFileTaskProfile(i, pattern, "csv", false, 
"", "", TaskStateEnum.RUNNING,
+                                "D", "GMT+8:00", null);
                 taskProfile.setTaskClass(MockTask.class.getCanonicalName());
                 taskStore.storeTask(taskProfile);
             }
@@ -74,7 +75,7 @@ public class TestTaskManager {
             Assert.assertTrue("manager start error", false);
         }
 
-        TaskProfile taskProfile1 = helper.getTaskProfile(100, pattern, "csv", 
false, "", "", TaskStateEnum.RUNNING,
+        TaskProfile taskProfile1 = helper.getFileTaskProfile(100, pattern, 
"csv", false, "", "", TaskStateEnum.RUNNING,
                 "D", "GMT+8:00", null);
         String taskId1 = taskProfile1.getTaskId();
         taskProfile1.setTaskClass(MockTask.class.getCanonicalName());
@@ -99,7 +100,7 @@ public class TestTaskManager {
         Assert.assertTrue(manager.getTaskProfile(taskId1).getState() == 
TaskStateEnum.RUNNING);
 
         // test delete
-        TaskProfile taskProfile2 = helper.getTaskProfile(200, pattern, "csv", 
false, "", "", TaskStateEnum.RUNNING,
+        TaskProfile taskProfile2 = helper.getFileTaskProfile(200, pattern, 
"csv", false, "", "", TaskStateEnum.RUNNING,
                 "D", "GMT+8:00", null);
         taskProfile2.setTaskClass(MockTask.class.getCanonicalName());
         List<TaskProfile> taskProfiles2 = new ArrayList<>();


Reply via email to