This is an automated email from the ASF dual-hosted git repository. wenweihuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 1f8a7fffb3 [INLONG-11569][Agent] Add COS Task (#11570) 1f8a7fffb3 is described below commit 1f8a7fffb35bbfb53d1ada9ee4a1ea7005e1d1cd Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Mon Dec 2 17:00:23 2024 +0800 [INLONG-11569][Agent] Add COS Task (#11570) * [INLONG-11569][Agent] Add COS Task * [INLONG-11569][Agent] Modify code based on comments --- .../apache/inlong/agent/conf/InstanceProfile.java | 4 +- .../org/apache/inlong/agent/conf/TaskProfile.java | 8 +-- .../inlong/agent/constant/TaskConstants.java | 24 +++++-- .../java/org/apache/inlong/agent/pojo/COSTask.java | 73 ++++++++++++++++++++++ .../apache/inlong/agent/pojo/TaskProfileDto.java | 54 +++++++++++++++- .../inlong/agent/plugin/sources/LogFileSource.java | 9 ++- .../inlong/agent/plugin/task/file/LogFileTask.java | 18 +++--- .../apache/inlong/common/enums/TaskTypeEnum.java | 6 +- 8 files changed, 165 insertions(+), 31 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java index c9a3d6a022..9e85872ff5 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java @@ -36,10 +36,10 @@ import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INL import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID; -import static org.apache.inlong.agent.constant.TaskConstants.FILE_TASK_RETRY; import static org.apache.inlong.agent.constant.TaskConstants.INSTANCE_STATE; import static org.apache.inlong.agent.constant.TaskConstants.TASK_MQ_CLUSTERS; import static org.apache.inlong.agent.constant.TaskConstants.TASK_MQ_TOPIC; +import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY; /** * job profile which contains details describing properties of one job. @@ -200,6 +200,6 @@ public class InstanceProfile extends AbstractConfiguration implements Comparable } public boolean isRetry() { - return getBoolean(FILE_TASK_RETRY, false); + return getBoolean(TASK_RETRY, false); } } diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java index 32450735e4..8c509e4240 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java @@ -36,7 +36,7 @@ import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INL import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID; -import static org.apache.inlong.agent.constant.TaskConstants.FILE_TASK_RETRY; +import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY; import static org.apache.inlong.agent.constant.TaskConstants.TASK_STATE; /** @@ -65,10 +65,6 @@ public class TaskProfile extends AbstractConfiguration { return get(TaskConstants.TASK_CYCLE_UNIT); } - public String getTimeOffset() { - return get(TaskConstants.TASK_FILE_TIME_OFFSET, ""); - } - public String getTimeZone() { return get(TaskConstants.TASK_TIME_ZONE); } @@ -82,7 +78,7 @@ public class TaskProfile extends AbstractConfiguration { } public boolean isRetry() { - return getBoolean(FILE_TASK_RETRY, false); + return getBoolean(TASK_RETRY, false); } public String getTaskClass() { diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java index 22fb87e6e5..a4f9156577 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java @@ -59,10 +59,10 @@ public class TaskConstants extends CommonConstants { public static final String TASK_CYCLE_UNIT = "task.cycleUnit"; public static final String FILE_TASK_CYCLE_UNIT = "task.fileTask.cycleUnit"; public static final String TASK_FILE_CONTENT_COLLECT_TYPE = "task.fileTask.contentCollectType"; - public static final String SOURCE_DATA_CONTENT_STYLE = "task.fileTask.dataContentStyle"; - public static final String SOURCE_DATA_SEPARATOR = "task.fileTask.dataSeparator"; - public static final String SOURCE_FILTER_STREAMS = "task.fileTask.filterStreams"; - public static final String FILE_TASK_RETRY = "task.fileTask.retry"; + public static final String FILE_CONTENT_STYLE = "task.fileTask.dataContentStyle"; + public static final String FILE_DATA_SEPARATOR = "task.fileTask.dataSeparator"; + public static final String FILE_FILTER_STREAMS = "task.fileTask.filterStreams"; + public static final String TASK_RETRY = "task.retry"; public static final String FILE_TASK_TIME_FROM = "task.fileTask.dataTimeFrom"; public static final String FILE_TASK_TIME_TO = "task.fileTask.dataTimeTo"; public static final String FILE_MAX_NUM = "task.fileTask.maxFileCount"; @@ -75,6 +75,22 @@ public class TaskConstants extends CommonConstants { public static final String TASK_KAFKA_OFFSET = "task.kafkaTask.partition.offset"; public static final String TASK_KAFKA_AUTO_COMMIT_OFFSET_RESET = "task.kafkaTask.autoOffsetReset"; + // COS task + public static final String COS_TASK_CYCLE_UNIT = "task.cosTask.cycleUnit"; + public static final String COS_CONTENT_STYLE = "task.cosTask.contentStyle"; + public static final String COS_MAX_NUM = "task.cosTask.maxFileCount"; + public static final String COS_TASK_PATTERN = "task.cosTask.pattern"; + public static final String TASK_COS_TIME_OFFSET = "task.cosTask.timeOffset"; + public static final String COS_TASK_RETRY = "task.cosTask.retry"; + public static final String COS_TASK_TIME_FROM = "task.cosTask.dataTimeFrom"; + public static final String COS_TASK_TIME_TO = "task.cosTask.dataTimeTo"; + public static final String COS_TASK_BUCKET_NAME = "task.cosTask.bucketName"; + public static final String COS_TASK_SECRET_ID = "task.cosTask.secretId"; + public static final String COS_TASK_SECRET_KEY = "task.cosTask.secretKey"; + public static final String COS_TASK_REGION = "task.cosTask.region"; + public static final String COS_DATA_SEPARATOR = "task.cosTask.dataSeparator"; + public static final String COS_FILTER_STREAMS = "task.cosTask.filterStreams"; + /** * delimiter to split offset for different task */ diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/COSTask.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/COSTask.java new file mode 100644 index 0000000000..83d43c6af3 --- /dev/null +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/COSTask.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.pojo; + +import lombok.Data; + +import java.util.List; + +@Data +public class COSTask { + + private Integer id; + private String pattern; + private String cycleUnit; + private Boolean retry; + private String dataTimeFrom; + private String dataTimeTo; + private String timeOffset; + private Integer maxFileCount; + private String collectType; + private String contentStyle; + private String dataSeparator; + private String filterStreams; + private String bucketName; + private String secretId; + private String secretKey; + private String region; + + @Data + public static class COSTaskConfig { + + private String pattern; + private String cycleUnit; + private Boolean retry; + private String dataTimeFrom; + private String dataTimeTo; + // '1m' means one minute after, '-1m' means one minute before + // '1h' means one hour after, '-1h' means one hour before + // '1d' means one day after, '-1d' means one day before + // Null means from current timestamp + private String timeOffset; + private Integer maxFileCount; + // Collect type, for example: FULL, INCREMENT + private String collectType; + // Type of data result for column separator + // CSV format, set this parameter to a custom separator: , | : + // Json format, set this parameter to json + private String contentStyle; + // Column separator of data source + private String dataSeparator; + // The streamIds to be filtered out + private List<String> filterStreams; + private String bucketName; + private String credentialsId; + private String credentialsKey; + private String region; + } +} diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java index 2d4a6a32ae..a9134de3a8 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java @@ -21,6 +21,7 @@ import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.conf.TaskProfile; import org.apache.inlong.agent.constant.CycleUnitType; import org.apache.inlong.agent.pojo.BinlogTask.BinlogTaskConfig; +import org.apache.inlong.agent.pojo.COSTask.COSTaskConfig; import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig; import org.apache.inlong.agent.pojo.FileTask.Line; import org.apache.inlong.agent.pojo.KafkaTask.KafkaTaskConfig; @@ -37,6 +38,8 @@ import org.apache.inlong.common.pojo.agent.DataConfig; import com.google.gson.Gson; import lombok.Data; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.stream.Collectors; @@ -48,6 +51,8 @@ import static org.apache.inlong.common.enums.DataReportTypeEnum.NORMAL_SEND_TO_D @Data public class TaskProfileDto { + private static final Logger logger = LoggerFactory.getLogger(TaskProfileDto.class); + public static final String DEFAULT_FILE_TASK = "org.apache.inlong.agent.plugin.task.file.LogFileTask"; public static final String DEFAULT_KAFKA_TASK = "org.apache.inlong.agent.plugin.task.KafkaTask"; public static final String DEFAULT_PULSAR_TASK = "org.apache.inlong.agent.plugin.task.PulsarTask"; @@ -62,7 +67,7 @@ public class TaskProfileDto { public static final String DEFAULT_DATA_PROXY_SINK = "org.apache.inlong.agent.plugin.sinks.ProxySink"; public static final String PULSAR_SINK = "org.apache.inlong.agent.plugin.sinks.PulsarSink"; public static final String KAFKA_SINK = "org.apache.inlong.agent.plugin.sinks.KafkaSink"; - + public static final String DEFAULT_COS_TASK = "org.apache.inlong.agent.plugin.task.cos.COSTask"; /** * file source */ @@ -101,6 +106,10 @@ public class TaskProfileDto { * sqlserver source */ public static final String SQLSERVER_SOURCE = "org.apache.inlong.agent.plugin.sources.SQLServerSource"; + /** + * cos source + */ + public static final String COS_SOURCE = "org.apache.inlong.agent.plugin.sources.COSSource"; private static final Gson GSON = new Gson(); @@ -197,6 +206,35 @@ public class TaskProfileDto { return fileTask; } + private static COSTask getCOSTask(DataConfig dataConfig) { + COSTask cosTask = new COSTask(); + cosTask.setId(dataConfig.getTaskId()); + COSTaskConfig taskConfig = GSON.fromJson(dataConfig.getExtParams(), + COSTaskConfig.class); + cosTask.setPattern(taskConfig.getPattern()); + cosTask.setCollectType(taskConfig.getCollectType()); + cosTask.setContentStyle(taskConfig.getContentStyle()); + cosTask.setDataSeparator(taskConfig.getDataSeparator()); + cosTask.setMaxFileCount(taskConfig.getMaxFileCount()); + cosTask.setRetry(taskConfig.getRetry()); + cosTask.setCycleUnit(taskConfig.getCycleUnit()); + cosTask.setDataTimeFrom(taskConfig.getDataTimeFrom()); + cosTask.setDataTimeTo(taskConfig.getDataTimeTo()); + cosTask.setBucketName(taskConfig.getBucketName()); + cosTask.setSecretId(taskConfig.getCredentialsId()); + cosTask.setSecretKey(taskConfig.getCredentialsKey()); + cosTask.setRegion(taskConfig.getRegion()); + if (taskConfig.getFilterStreams() != null) { + cosTask.setFilterStreams(GSON.toJson(taskConfig.getFilterStreams())); + } + if (taskConfig.getTimeOffset() != null) { + cosTask.setTimeOffset(taskConfig.getTimeOffset()); + } else { + cosTask.setTimeOffset(deafult_time_offset + cosTask.getCycleUnit()); + } + return cosTask; + } + private static KafkaTask getKafkaTask(DataConfig dataConfigs) { KafkaTaskConfig kafkaTaskConfig = GSON.fromJson(dataConfigs.getExtParams(), @@ -468,6 +506,7 @@ public class TaskProfileDto { throw new IllegalArgumentException("invalid mq type " + mqType + " please check"); } } + task.setRetry(false); TaskTypeEnum taskType = TaskTypeEnum.getTaskType(dataConfig.getTaskType()); switch (requireNonNull(taskType)) { case SQL: @@ -483,6 +522,7 @@ public class TaskProfileDto { task.setCycleUnit(fileTask.getCycleUnit()); task.setFileTask(fileTask); task.setSource(DEFAULT_SOURCE); + task.setRetry(fileTask.getRetry()); profileDto.setTask(task); break; case KAFKA: @@ -544,7 +584,17 @@ public class TaskProfileDto { case MOCK: profileDto.setTask(task); break; + case COS: + task.setTaskClass(DEFAULT_COS_TASK); + COSTask cosTask = getCOSTask(dataConfig); + task.setCycleUnit(cosTask.getCycleUnit()); + task.setCosTask(cosTask); + task.setSource(COS_SOURCE); + task.setRetry(cosTask.getRetry()); + profileDto.setTask(task); + break; default: + logger.error("invalid task type {}", taskType); } return TaskProfile.parseJsonStr(GSON.toJson(profileDto)); } @@ -574,6 +624,7 @@ public class TaskProfileDto { private String cycleUnit; private String timeZone; private String auditVersion; + private boolean retry; private FileTask fileTask; private BinlogTask binlogTask; @@ -585,6 +636,7 @@ public class TaskProfileDto { private RedisTask redisTask; private MqttTask mqttTask; private SqlServerTask sqlserverTask; + private COSTask cosTask; } @Data diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java index bd53b4530f..300c16168f 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java @@ -48,7 +48,7 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.List; -import static org.apache.inlong.agent.constant.TaskConstants.SOURCE_DATA_CONTENT_STYLE; +import static org.apache.inlong.agent.constant.TaskConstants.FILE_CONTENT_STYLE; /** * Read text files @@ -355,7 +355,7 @@ public class LogFileSource extends AbstractSource { FileStatic data = new FileStatic(); data.setTaskId(taskId); data.setRetry(String.valueOf(profile.isRetry())); - data.setContentType(profile.get(SOURCE_DATA_CONTENT_STYLE)); + data.setContentType(profile.get(FILE_CONTENT_STYLE)); data.setGroupId(profile.getInlongGroupId()); data.setStreamId(profile.getInlongStreamId()); data.setDataTime(format.format(profile.getSinkDataTime())); @@ -364,10 +364,9 @@ public class LogFileSource extends AbstractSource { data.setReadBytes(String.valueOf(bytePosition)); data.setReadLines(String.valueOf(linePosition)); OffsetProfile offsetProfile = OffsetManager.getInstance().getOffset(taskId, instanceId); - if (offsetProfile == null) { - return; + if (offsetProfile != null) { + data.setSendLines(offsetProfile.getOffset()); } - data.setSendLines(offsetProfile.getOffset()); FileStaticManager.putStaticMsg(data); randomAccessFile.close(); } catch (IOException e) { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java index 0c104956d7..4eb3b69525 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java @@ -87,6 +87,7 @@ public class LogFileTask extends AbstractTask { public final long SCAN_INTERVAL = 1 * 60 * 1000; private volatile boolean runAtLeastOneTime = false; private volatile long coreThreadUpdateTime = 0; + private String timeOffset = ""; private BlockingQueue<InstanceProfile> instanceQueue; @Override @@ -96,8 +97,9 @@ public class LogFileTask extends AbstractTask { @Override protected void initTask() { + timeOffset = taskProfile.get(TaskConstants.TASK_FILE_TIME_OFFSET, ""); instanceQueue = new LinkedBlockingQueue<>(INSTANCE_QUEUE_CAPACITY); - retry = taskProfile.getBoolean(TaskConstants.FILE_TASK_RETRY, false); + retry = taskProfile.isRetry(); originPatterns = Stream.of(taskProfile.get(TaskConstants.FILE_DIR_FILTER_PATTERNS).split(",")) .collect(Collectors.toSet()); if (taskProfile.getCycleUnit().compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) { @@ -134,7 +136,6 @@ public class LogFileTask extends AbstractTask { while (list.size() < INSTANCE_QUEUE_CAPACITY && !instanceQueue.isEmpty()) { InstanceProfile profile = instanceQueue.poll(); if (profile != null) { - LOGGER.info("test123 2 taskid {} {}", getTaskId(), profile.getInstanceId()); list.add(profile); } } @@ -175,7 +176,7 @@ public class LogFileTask extends AbstractTask { LOGGER.error("task profile needs time offset"); return false; } - if (profile.getBoolean(TaskConstants.FILE_TASK_RETRY, false)) { + if (profile.isRetry()) { if (!initRetryTask(profile)) { return false; } @@ -292,11 +293,11 @@ public class LogFileTask extends AbstractTask { private List<BasicFileInfo> scanExistingFileByPattern(String originPattern) { if (realTime) { - return FileScanner.scanTaskBetweenTimes(originPattern, CycleUnitType.HOUR, taskProfile.getTimeOffset(), + return FileScanner.scanTaskBetweenTimes(originPattern, CycleUnitType.HOUR, timeOffset, startTime, endTime, retry); } else { return FileScanner.scanTaskBetweenTimes(originPattern, taskProfile.getCycleUnit(), - taskProfile.getTimeOffset(), startTime, endTime, retry); + timeOffset, startTime, endTime, retry); } } @@ -328,7 +329,7 @@ public class LogFileTask extends AbstractTask { long startScanTime = startTime; long endScanTime = endTime; List<String> dataTimeList = Scanner.getDataTimeList(startScanTime, endScanTime, taskProfile.getCycleUnit(), - taskProfile.getTimeOffset(), retry); + timeOffset, retry); if (dataTimeList.isEmpty()) { LOGGER.error("getDataTimeList get empty list"); return; @@ -390,7 +391,7 @@ public class LogFileTask extends AbstractTask { */ private boolean shouldStartNow(String dataTime) { String shouldStartTime = - NewDateUtils.getShouldStartTime(dataTime, taskProfile.getCycleUnit(), taskProfile.getTimeOffset()); + NewDateUtils.getShouldStartTime(dataTime, taskProfile.getCycleUnit(), timeOffset); String currentTime = getCurrentTime(); return currentTime.compareTo(shouldStartTime) >= 0; } @@ -531,8 +532,7 @@ public class LogFileTask extends AbstractTask { if (dateExpression.getLongestDatePattern().length() != 0) { String dataTime = getDataTimeFromFileName(newFileName, entity.getOriginPattern(), dateExpression); LOGGER.info("file {}, fileTime {}", newFileName, dataTime); - if (!NewDateUtils.isValidCreationTime(dataTime, entity.getCycleUnit(), - taskProfile.getTimeOffset())) { + if (!NewDateUtils.isValidCreationTime(dataTime, entity.getCycleUnit(), timeOffset)) { return false; } } diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java index c84ea142db..3a032e6511 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java @@ -44,11 +44,9 @@ public enum TaskTypeEnum { REDIS(11), MQTT(12), HUDI(13), - + COS(14), // only used for unit test - MOCK(201) - - ; + MOCK(201); private static final Map<Integer, TaskTypeEnum> TASK_TYPE_ENUM_MAP = Maps.newHashMap();