This is an automated email from the ASF dual-hosted git repository.

wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new 1f8a7fffb3 [INLONG-11569][Agent] Add COS Task (#11570)
1f8a7fffb3 is described below

commit 1f8a7fffb35bbfb53d1ada9ee4a1ea7005e1d1cd
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Mon Dec 2 17:00:23 2024 +0800

    [INLONG-11569][Agent] Add COS Task (#11570)
    
    * [INLONG-11569][Agent] Add COS Task
    
    * [INLONG-11569][Agent] Modify code based on comments
---
 .../apache/inlong/agent/conf/InstanceProfile.java  |  4 +-
 .../org/apache/inlong/agent/conf/TaskProfile.java  |  8 +--
 .../inlong/agent/constant/TaskConstants.java       | 24 +++++--
 .../java/org/apache/inlong/agent/pojo/COSTask.java | 73 ++++++++++++++++++++++
 .../apache/inlong/agent/pojo/TaskProfileDto.java   | 54 +++++++++++++++-
 .../inlong/agent/plugin/sources/LogFileSource.java |  9 ++-
 .../inlong/agent/plugin/task/file/LogFileTask.java | 18 +++---
 .../apache/inlong/common/enums/TaskTypeEnum.java   |  6 +-
 8 files changed, 165 insertions(+), 31 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
index c9a3d6a022..9e85872ff5 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
@@ -36,10 +36,10 @@ import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INL
 import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
-import static org.apache.inlong.agent.constant.TaskConstants.FILE_TASK_RETRY;
 import static org.apache.inlong.agent.constant.TaskConstants.INSTANCE_STATE;
 import static org.apache.inlong.agent.constant.TaskConstants.TASK_MQ_CLUSTERS;
 import static org.apache.inlong.agent.constant.TaskConstants.TASK_MQ_TOPIC;
+import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY;
 
 /**
  * job profile which contains details describing properties of one job.
@@ -200,6 +200,6 @@ public class InstanceProfile extends AbstractConfiguration 
implements Comparable
     }
 
     public boolean isRetry() {
-        return getBoolean(FILE_TASK_RETRY, false);
+        return getBoolean(TASK_RETRY, false);
     }
 }
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
index 32450735e4..8c509e4240 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
@@ -36,7 +36,7 @@ import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INL
 import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
-import static org.apache.inlong.agent.constant.TaskConstants.FILE_TASK_RETRY;
+import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY;
 import static org.apache.inlong.agent.constant.TaskConstants.TASK_STATE;
 
 /**
@@ -65,10 +65,6 @@ public class TaskProfile extends AbstractConfiguration {
         return get(TaskConstants.TASK_CYCLE_UNIT);
     }
 
-    public String getTimeOffset() {
-        return get(TaskConstants.TASK_FILE_TIME_OFFSET, "");
-    }
-
     public String getTimeZone() {
         return get(TaskConstants.TASK_TIME_ZONE);
     }
@@ -82,7 +78,7 @@ public class TaskProfile extends AbstractConfiguration {
     }
 
     public boolean isRetry() {
-        return getBoolean(FILE_TASK_RETRY, false);
+        return getBoolean(TASK_RETRY, false);
     }
 
     public String getTaskClass() {
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index 22fb87e6e5..a4f9156577 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -59,10 +59,10 @@ public class TaskConstants extends CommonConstants {
     public static final String TASK_CYCLE_UNIT = "task.cycleUnit";
     public static final String FILE_TASK_CYCLE_UNIT = 
"task.fileTask.cycleUnit";
     public static final String TASK_FILE_CONTENT_COLLECT_TYPE = 
"task.fileTask.contentCollectType";
-    public static final String SOURCE_DATA_CONTENT_STYLE = 
"task.fileTask.dataContentStyle";
-    public static final String SOURCE_DATA_SEPARATOR = 
"task.fileTask.dataSeparator";
-    public static final String SOURCE_FILTER_STREAMS = 
"task.fileTask.filterStreams";
-    public static final String FILE_TASK_RETRY = "task.fileTask.retry";
+    public static final String FILE_CONTENT_STYLE = 
"task.fileTask.dataContentStyle";
+    public static final String FILE_DATA_SEPARATOR = 
"task.fileTask.dataSeparator";
+    public static final String FILE_FILTER_STREAMS = 
"task.fileTask.filterStreams";
+    public static final String TASK_RETRY = "task.retry";
     public static final String FILE_TASK_TIME_FROM = 
"task.fileTask.dataTimeFrom";
     public static final String FILE_TASK_TIME_TO = "task.fileTask.dataTimeTo";
     public static final String FILE_MAX_NUM = "task.fileTask.maxFileCount";
@@ -75,6 +75,22 @@ public class TaskConstants extends CommonConstants {
     public static final String TASK_KAFKA_OFFSET = 
"task.kafkaTask.partition.offset";
     public static final String TASK_KAFKA_AUTO_COMMIT_OFFSET_RESET = 
"task.kafkaTask.autoOffsetReset";
 
+    // COS task
+    public static final String COS_TASK_CYCLE_UNIT = "task.cosTask.cycleUnit";
+    public static final String COS_CONTENT_STYLE = "task.cosTask.contentStyle";
+    public static final String COS_MAX_NUM = "task.cosTask.maxFileCount";
+    public static final String COS_TASK_PATTERN = "task.cosTask.pattern";
+    public static final String TASK_COS_TIME_OFFSET = 
"task.cosTask.timeOffset";
+    public static final String COS_TASK_RETRY = "task.cosTask.retry";
+    public static final String COS_TASK_TIME_FROM = 
"task.cosTask.dataTimeFrom";
+    public static final String COS_TASK_TIME_TO = "task.cosTask.dataTimeTo";
+    public static final String COS_TASK_BUCKET_NAME = 
"task.cosTask.bucketName";
+    public static final String COS_TASK_SECRET_ID = "task.cosTask.secretId";
+    public static final String COS_TASK_SECRET_KEY = "task.cosTask.secretKey";
+    public static final String COS_TASK_REGION = "task.cosTask.region";
+    public static final String COS_DATA_SEPARATOR = 
"task.cosTask.dataSeparator";
+    public static final String COS_FILTER_STREAMS = 
"task.cosTask.filterStreams";
+
     /**
      * delimiter to split offset for different task
      */
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/COSTask.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/COSTask.java
new file mode 100644
index 0000000000..83d43c6af3
--- /dev/null
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/COSTask.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.pojo;
+
+import lombok.Data;
+
+import java.util.List;
+
+@Data
+public class COSTask {
+
+    private Integer id;
+    private String pattern;
+    private String cycleUnit;
+    private Boolean retry;
+    private String dataTimeFrom;
+    private String dataTimeTo;
+    private String timeOffset;
+    private Integer maxFileCount;
+    private String collectType;
+    private String contentStyle;
+    private String dataSeparator;
+    private String filterStreams;
+    private String bucketName;
+    private String secretId;
+    private String secretKey;
+    private String region;
+
+    @Data
+    public static class COSTaskConfig {
+
+        private String pattern;
+        private String cycleUnit;
+        private Boolean retry;
+        private String dataTimeFrom;
+        private String dataTimeTo;
+        // '1m' means one minute after, '-1m' means one minute before
+        // '1h' means one hour after, '-1h' means one hour before
+        // '1d' means one day after, '-1d' means one day before
+        // Null means from current timestamp
+        private String timeOffset;
+        private Integer maxFileCount;
+        // Collect type, for example: FULL, INCREMENT
+        private String collectType;
+        // Type of data result for column separator
+        // CSV format, set this parameter to a custom separator: , | :
+        // Json format, set this parameter to json
+        private String contentStyle;
+        // Column separator of data source
+        private String dataSeparator;
+        // The streamIds to be filtered out
+        private List<String> filterStreams;
+        private String bucketName;
+        private String credentialsId;
+        private String credentialsKey;
+        private String region;
+    }
+}
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index 2d4a6a32ae..a9134de3a8 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -21,6 +21,7 @@ import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.constant.CycleUnitType;
 import org.apache.inlong.agent.pojo.BinlogTask.BinlogTaskConfig;
+import org.apache.inlong.agent.pojo.COSTask.COSTaskConfig;
 import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig;
 import org.apache.inlong.agent.pojo.FileTask.Line;
 import org.apache.inlong.agent.pojo.KafkaTask.KafkaTaskConfig;
@@ -37,6 +38,8 @@ import org.apache.inlong.common.pojo.agent.DataConfig;
 
 import com.google.gson.Gson;
 import lombok.Data;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.stream.Collectors;
 
@@ -48,6 +51,8 @@ import static 
org.apache.inlong.common.enums.DataReportTypeEnum.NORMAL_SEND_TO_D
 @Data
 public class TaskProfileDto {
 
+    private static final Logger logger = 
LoggerFactory.getLogger(TaskProfileDto.class);
+
     public static final String DEFAULT_FILE_TASK = 
"org.apache.inlong.agent.plugin.task.file.LogFileTask";
     public static final String DEFAULT_KAFKA_TASK = 
"org.apache.inlong.agent.plugin.task.KafkaTask";
     public static final String DEFAULT_PULSAR_TASK = 
"org.apache.inlong.agent.plugin.task.PulsarTask";
@@ -62,7 +67,7 @@ public class TaskProfileDto {
     public static final String DEFAULT_DATA_PROXY_SINK = 
"org.apache.inlong.agent.plugin.sinks.ProxySink";
     public static final String PULSAR_SINK = 
"org.apache.inlong.agent.plugin.sinks.PulsarSink";
     public static final String KAFKA_SINK = 
"org.apache.inlong.agent.plugin.sinks.KafkaSink";
-
+    public static final String DEFAULT_COS_TASK = 
"org.apache.inlong.agent.plugin.task.cos.COSTask";
     /**
      * file source
      */
@@ -101,6 +106,10 @@ public class TaskProfileDto {
      * sqlserver source
      */
     public static final String SQLSERVER_SOURCE = 
"org.apache.inlong.agent.plugin.sources.SQLServerSource";
+    /**
+     * cos source
+     */
+    public static final String COS_SOURCE = 
"org.apache.inlong.agent.plugin.sources.COSSource";
 
     private static final Gson GSON = new Gson();
 
@@ -197,6 +206,35 @@ public class TaskProfileDto {
         return fileTask;
     }
 
+    private static COSTask getCOSTask(DataConfig dataConfig) {
+        COSTask cosTask = new COSTask();
+        cosTask.setId(dataConfig.getTaskId());
+        COSTaskConfig taskConfig = GSON.fromJson(dataConfig.getExtParams(),
+                COSTaskConfig.class);
+        cosTask.setPattern(taskConfig.getPattern());
+        cosTask.setCollectType(taskConfig.getCollectType());
+        cosTask.setContentStyle(taskConfig.getContentStyle());
+        cosTask.setDataSeparator(taskConfig.getDataSeparator());
+        cosTask.setMaxFileCount(taskConfig.getMaxFileCount());
+        cosTask.setRetry(taskConfig.getRetry());
+        cosTask.setCycleUnit(taskConfig.getCycleUnit());
+        cosTask.setDataTimeFrom(taskConfig.getDataTimeFrom());
+        cosTask.setDataTimeTo(taskConfig.getDataTimeTo());
+        cosTask.setBucketName(taskConfig.getBucketName());
+        cosTask.setSecretId(taskConfig.getCredentialsId());
+        cosTask.setSecretKey(taskConfig.getCredentialsKey());
+        cosTask.setRegion(taskConfig.getRegion());
+        if (taskConfig.getFilterStreams() != null) {
+            
cosTask.setFilterStreams(GSON.toJson(taskConfig.getFilterStreams()));
+        }
+        if (taskConfig.getTimeOffset() != null) {
+            cosTask.setTimeOffset(taskConfig.getTimeOffset());
+        } else {
+            cosTask.setTimeOffset(deafult_time_offset + 
cosTask.getCycleUnit());
+        }
+        return cosTask;
+    }
+
     private static KafkaTask getKafkaTask(DataConfig dataConfigs) {
 
         KafkaTaskConfig kafkaTaskConfig = 
GSON.fromJson(dataConfigs.getExtParams(),
@@ -468,6 +506,7 @@ public class TaskProfileDto {
                 throw new IllegalArgumentException("invalid mq type " + mqType 
+ " please check");
             }
         }
+        task.setRetry(false);
         TaskTypeEnum taskType = 
TaskTypeEnum.getTaskType(dataConfig.getTaskType());
         switch (requireNonNull(taskType)) {
             case SQL:
@@ -483,6 +522,7 @@ public class TaskProfileDto {
                 task.setCycleUnit(fileTask.getCycleUnit());
                 task.setFileTask(fileTask);
                 task.setSource(DEFAULT_SOURCE);
+                task.setRetry(fileTask.getRetry());
                 profileDto.setTask(task);
                 break;
             case KAFKA:
@@ -544,7 +584,17 @@ public class TaskProfileDto {
             case MOCK:
                 profileDto.setTask(task);
                 break;
+            case COS:
+                task.setTaskClass(DEFAULT_COS_TASK);
+                COSTask cosTask = getCOSTask(dataConfig);
+                task.setCycleUnit(cosTask.getCycleUnit());
+                task.setCosTask(cosTask);
+                task.setSource(COS_SOURCE);
+                task.setRetry(cosTask.getRetry());
+                profileDto.setTask(task);
+                break;
             default:
+                logger.error("invalid task type {}", taskType);
         }
         return TaskProfile.parseJsonStr(GSON.toJson(profileDto));
     }
@@ -574,6 +624,7 @@ public class TaskProfileDto {
         private String cycleUnit;
         private String timeZone;
         private String auditVersion;
+        private boolean retry;
 
         private FileTask fileTask;
         private BinlogTask binlogTask;
@@ -585,6 +636,7 @@ public class TaskProfileDto {
         private RedisTask redisTask;
         private MqttTask mqttTask;
         private SqlServerTask sqlserverTask;
+        private COSTask cosTask;
     }
 
     @Data
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index bd53b4530f..300c16168f 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -48,7 +48,7 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.List;
 
-import static 
org.apache.inlong.agent.constant.TaskConstants.SOURCE_DATA_CONTENT_STYLE;
+import static 
org.apache.inlong.agent.constant.TaskConstants.FILE_CONTENT_STYLE;
 
 /**
  * Read text files
@@ -355,7 +355,7 @@ public class LogFileSource extends AbstractSource {
                 FileStatic data = new FileStatic();
                 data.setTaskId(taskId);
                 data.setRetry(String.valueOf(profile.isRetry()));
-                data.setContentType(profile.get(SOURCE_DATA_CONTENT_STYLE));
+                data.setContentType(profile.get(FILE_CONTENT_STYLE));
                 data.setGroupId(profile.getInlongGroupId());
                 data.setStreamId(profile.getInlongStreamId());
                 data.setDataTime(format.format(profile.getSinkDataTime()));
@@ -364,10 +364,9 @@ public class LogFileSource extends AbstractSource {
                 data.setReadBytes(String.valueOf(bytePosition));
                 data.setReadLines(String.valueOf(linePosition));
                 OffsetProfile offsetProfile = 
OffsetManager.getInstance().getOffset(taskId, instanceId);
-                if (offsetProfile == null) {
-                    return;
+                if (offsetProfile != null) {
+                    data.setSendLines(offsetProfile.getOffset());
                 }
-                data.setSendLines(offsetProfile.getOffset());
                 FileStaticManager.putStaticMsg(data);
                 randomAccessFile.close();
             } catch (IOException e) {
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
index 0c104956d7..4eb3b69525 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
@@ -87,6 +87,7 @@ public class LogFileTask extends AbstractTask {
     public final long SCAN_INTERVAL = 1 * 60 * 1000;
     private volatile boolean runAtLeastOneTime = false;
     private volatile long coreThreadUpdateTime = 0;
+    private String timeOffset = "";
     private BlockingQueue<InstanceProfile> instanceQueue;
 
     @Override
@@ -96,8 +97,9 @@ public class LogFileTask extends AbstractTask {
 
     @Override
     protected void initTask() {
+        timeOffset = taskProfile.get(TaskConstants.TASK_FILE_TIME_OFFSET, "");
         instanceQueue = new LinkedBlockingQueue<>(INSTANCE_QUEUE_CAPACITY);
-        retry = taskProfile.getBoolean(TaskConstants.FILE_TASK_RETRY, false);
+        retry = taskProfile.isRetry();
         originPatterns = 
Stream.of(taskProfile.get(TaskConstants.FILE_DIR_FILTER_PATTERNS).split(","))
                 .collect(Collectors.toSet());
         if 
(taskProfile.getCycleUnit().compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
@@ -134,7 +136,6 @@ public class LogFileTask extends AbstractTask {
         while (list.size() < INSTANCE_QUEUE_CAPACITY && 
!instanceQueue.isEmpty()) {
             InstanceProfile profile = instanceQueue.poll();
             if (profile != null) {
-                LOGGER.info("test123 2 taskid {} {}", getTaskId(), 
profile.getInstanceId());
                 list.add(profile);
             }
         }
@@ -175,7 +176,7 @@ public class LogFileTask extends AbstractTask {
             LOGGER.error("task profile needs time offset");
             return false;
         }
-        if (profile.getBoolean(TaskConstants.FILE_TASK_RETRY, false)) {
+        if (profile.isRetry()) {
             if (!initRetryTask(profile)) {
                 return false;
             }
@@ -292,11 +293,11 @@ public class LogFileTask extends AbstractTask {
 
     private List<BasicFileInfo> scanExistingFileByPattern(String 
originPattern) {
         if (realTime) {
-            return FileScanner.scanTaskBetweenTimes(originPattern, 
CycleUnitType.HOUR, taskProfile.getTimeOffset(),
+            return FileScanner.scanTaskBetweenTimes(originPattern, 
CycleUnitType.HOUR, timeOffset,
                     startTime, endTime, retry);
         } else {
             return FileScanner.scanTaskBetweenTimes(originPattern, 
taskProfile.getCycleUnit(),
-                    taskProfile.getTimeOffset(), startTime, endTime, retry);
+                    timeOffset, startTime, endTime, retry);
         }
     }
 
@@ -328,7 +329,7 @@ public class LogFileTask extends AbstractTask {
         long startScanTime = startTime;
         long endScanTime = endTime;
         List<String> dataTimeList = Scanner.getDataTimeList(startScanTime, 
endScanTime, taskProfile.getCycleUnit(),
-                taskProfile.getTimeOffset(), retry);
+                timeOffset, retry);
         if (dataTimeList.isEmpty()) {
             LOGGER.error("getDataTimeList get empty list");
             return;
@@ -390,7 +391,7 @@ public class LogFileTask extends AbstractTask {
      */
     private boolean shouldStartNow(String dataTime) {
         String shouldStartTime =
-                NewDateUtils.getShouldStartTime(dataTime, 
taskProfile.getCycleUnit(), taskProfile.getTimeOffset());
+                NewDateUtils.getShouldStartTime(dataTime, 
taskProfile.getCycleUnit(), timeOffset);
         String currentTime = getCurrentTime();
         return currentTime.compareTo(shouldStartTime) >= 0;
     }
@@ -531,8 +532,7 @@ public class LogFileTask extends AbstractTask {
         if (dateExpression.getLongestDatePattern().length() != 0) {
             String dataTime = getDataTimeFromFileName(newFileName, 
entity.getOriginPattern(), dateExpression);
             LOGGER.info("file {}, fileTime {}", newFileName, dataTime);
-            if (!NewDateUtils.isValidCreationTime(dataTime, 
entity.getCycleUnit(),
-                    taskProfile.getTimeOffset())) {
+            if (!NewDateUtils.isValidCreationTime(dataTime, 
entity.getCycleUnit(), timeOffset)) {
                 return false;
             }
         }
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java 
b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
index c84ea142db..3a032e6511 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
@@ -44,11 +44,9 @@ public enum TaskTypeEnum {
     REDIS(11),
     MQTT(12),
     HUDI(13),
-
+    COS(14),
     // only used for unit test
-    MOCK(201)
-
-    ;
+    MOCK(201);
 
     private static final Map<Integer, TaskTypeEnum> TASK_TYPE_ENUM_MAP = 
Maps.newHashMap();
 

Reply via email to