This is an automated email from the ASF dual-hosted git repository. wenweihuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 7ccbd97a6e [INLONG-11571][Agent] Add classes for actual collection of COS source (#11572) 7ccbd97a6e is described below commit 7ccbd97a6e20312d9ff0debdde18dd482b065442 Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Wed Dec 4 16:56:46 2024 +0800 [INLONG-11571][Agent] Add classes for actual collection of COS source (#11572) * [INLONG-11571][Agent] Add classes for actual collection of COS source * [INLONG-11571][Agent] Delete invalid comments * Update inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/COSTask.java Co-authored-by: AloysZhang <lofterzh...@gmail.com> * [INLONG-11571][Agent] Modify code based on comments --------- Co-authored-by: AloysZhang <lofterzh...@gmail.com> --- inlong-agent/agent-plugins/pom.xml | 5 + .../agent/plugin/fetcher/ManagerFetcher.java | 3 +- .../inlong/agent/plugin/instance/COSInstance.java | 35 ++ .../agent/plugin/instance/CommonInstance.java | 2 +- .../inlong/agent/plugin/sinks/ProxySink.java | 12 +- .../inlong/agent/plugin/sources/COSSource.java | 356 +++++++++++++++++++++ .../inlong/agent/plugin/sources/LogFileSource.java | 2 +- .../agent/plugin/sources/file/AbstractSource.java | 14 +- .../inlong/agent/plugin/task/cos/COSTask.java | 338 +++++++++++++++++++ .../inlong/agent/plugin/task/cos/FileScanner.java | 165 ++++++++++ .../agent/plugin/utils/cos/COSConfigHandler.java | 26 ++ .../inlong/agent/plugin/utils/cos/COSUtils.java | 33 ++ .../plugin/utils/cos/DefaultCOSConfigHandler.java | 28 ++ licenses/inlong-agent/LICENSE | 1 + .../inlong-agent/licenses/LICENSE-cos-java-sdk.txt | 21 ++ pom.xml | 1 + 16 files changed, 1025 insertions(+), 17 deletions(-) diff --git a/inlong-agent/agent-plugins/pom.xml b/inlong-agent/agent-plugins/pom.xml index 4c77f84dc3..863e285ab2 100644 --- a/inlong-agent/agent-plugins/pom.xml +++ b/inlong-agent/agent-plugins/pom.xml @@ -92,6 +92,11 @@ <artifactId>kafka-clients</artifactId> 
<version>${kafka.clients.version}</version> </dependency> + <dependency> + <groupId>com.qcloud</groupId> + <artifactId>cos_api</artifactId> + <version>${cos.sdk.version}</version> + </dependency> <dependency> <groupId>io.debezium</groupId> <artifactId>debezium-connector-mysql</artifactId> diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java index 01d7128f8b..3ab63b0118 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java @@ -28,6 +28,7 @@ import org.apache.inlong.agent.utils.AgentUtils; import org.apache.inlong.agent.utils.HttpManager; import org.apache.inlong.agent.utils.ThreadUtils; import org.apache.inlong.common.enums.PullJobTypeEnum; +import org.apache.inlong.common.enums.TaskTypeEnum; import org.apache.inlong.common.pojo.agent.AgentConfigInfo; import org.apache.inlong.common.pojo.agent.AgentConfigRequest; import org.apache.inlong.common.pojo.agent.AgentResponseCode; @@ -237,7 +238,7 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher { dataConfig.setInlongGroupId("devcloud_group_id"); dataConfig.setInlongStreamId("devcloud_stream_id"); dataConfig.setDataReportType(0); - dataConfig.setTaskType(3); + dataConfig.setTaskType(TaskTypeEnum.FILE.getType()); dataConfig.setTaskId(taskId); dataConfig.setState(state); dataConfig.setTimeZone("GMT+8:00"); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/COSInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/COSInstance.java new file mode 100644 index 0000000000..6f40f727dd --- /dev/null +++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/COSInstance.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.instance; + +import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.constant.TaskConstants; + +import java.io.IOException; + +/** + * cos instance contains source and sink. 
+ * main job is to read from source and write to sink + */ +public class COSInstance extends CommonInstance { + + @Override + public void setInodeInfo(InstanceProfile profile) throws IOException { + profile.set(TaskConstants.INODE_INFO, ""); + } +} diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java index 7267066aee..6cc97a159c 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java @@ -122,7 +122,7 @@ public abstract class CommonInstance extends Instance { @Override public void run() { - Thread.currentThread().setName("file-instance-core-" + getTaskId() + "-" + getInstanceId()); + Thread.currentThread().setName("instance-core-" + getTaskId() + "-" + getInstanceId()); running = true; try { doRun(); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java index 2f54ec59de..e00ad65cba 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java @@ -196,18 +196,18 @@ public class ProxySink extends AbstractSink { } Long start = AgentUtils.getCurrentTime(); shutdown = true; + senderManager.Stop(); + LOGGER.info("destroy proxySink, wait for sender close {} ms instance {}", AgentUtils.getCurrentTime() - start, + profile.getInstanceId()); + start = AgentUtils.getCurrentTime(); while (running || offsetRunning) { AgentUtils.silenceSleepInMs(LOOP_WAIT_TIME_MS); } - LOGGER.info("destroy proxySink wait run elapse {} ms instance {}", AgentUtils.getCurrentTime() - start, - 
profile.getInstanceId()); - start = AgentUtils.getCurrentTime(); - senderManager.Stop(); - LOGGER.info("destroy proxySink wait sender elapse {} ms instance {}", AgentUtils.getCurrentTime() - start, + LOGGER.info("destroy proxySink, wait for run close {} ms instance {}", AgentUtils.getCurrentTime() - start, profile.getInstanceId()); start = AgentUtils.getCurrentTime(); clearOffset(); - LOGGER.info("destroy proxySink wait offset elapse {} ms instance {}", AgentUtils.getCurrentTime() - start, + LOGGER.info("destroy proxySink, wait for offset clear {} ms instance {}", AgentUtils.getCurrentTime() - start, profile.getInstanceId()); LOGGER.info("destroy sink {} end", sourceName); } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/COSSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/COSSource.java new file mode 100755 index 0000000000..b6792ac82d --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/COSSource.java @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.agent.plugin.sources; + +import org.apache.inlong.agent.common.AgentThreadFactory; +import org.apache.inlong.agent.conf.AgentConfiguration; +import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.conf.OffsetProfile; +import org.apache.inlong.agent.constant.TaskConstants; +import org.apache.inlong.agent.core.FileStaticManager; +import org.apache.inlong.agent.core.FileStaticManager.FileStatic; +import org.apache.inlong.agent.core.task.MemoryManager; +import org.apache.inlong.agent.core.task.OffsetManager; +import org.apache.inlong.agent.except.FileException; +import org.apache.inlong.agent.metrics.audit.AuditUtils; +import org.apache.inlong.agent.plugin.sources.extend.DefaultExtendedHandler; +import org.apache.inlong.agent.plugin.sources.file.AbstractSource; +import org.apache.inlong.agent.plugin.utils.cos.COSUtils; +import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.agent.utils.ThreadUtils; + +import com.qcloud.cos.COSClient; +import com.qcloud.cos.model.COSObject; +import com.qcloud.cos.model.GetObjectRequest; +import com.qcloud.cos.model.ObjectMetadata; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import static org.apache.inlong.agent.constant.TaskConstants.COS_CONTENT_STYLE; + +/** + * Read COS files + */ +public class COSSource extends AbstractSource { + + public static final int LEN_OF_FILE_OFFSET_ARRAY = 2; + public static final String 
AGENT_GLOBAL_COS_SOURCE_PERMIT = "agent.global.cos.source.permit"; + public static final int DEFAULT_AGENT_GLOBAL_COS_SOURCE_PERMIT = 128 * 1000 * 1000; + + @Data + @AllArgsConstructor + @NoArgsConstructor + protected class FileOffset { + + private Long lineOffset; + private Long byteOffset; + private boolean hasByteOffset; + } + + private static final Logger LOGGER = LoggerFactory.getLogger(COSSource.class); + public static final String OFFSET_SEP = ":"; + protected final Integer WAIT_TIMEOUT_MS = 10; + private final Integer SIZE_OF_BUFFER_TO_READ_FILE = 1024 * 1024; + private final Long META_UPDATE_INTERVAL_MS = 10000L; + private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + + private String fileName; + private byte[] bufferToReadFile; + public volatile long linePosition = 0; + public volatile long bytePosition = 0; + private volatile boolean fileExist = true; + private volatile long lastInodeUpdateTime = 0; + private COSClient cosClient; + private String bucketName; + private String secretId; + private String secretKey; + private String strRegion; + private ObjectMetadata metadata; + protected BlockingQueue<SourceData> queue; + private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor( + 0, Integer.MAX_VALUE, + 1L, TimeUnit.SECONDS, + new SynchronousQueue<>(), + new AgentThreadFactory("cos-source-pool")); + private volatile boolean running = false; + + public COSSource() { + } + + @Override + protected void initExtendClass() { + extendClass = DefaultExtendedHandler.class.getCanonicalName(); + } + + @Override + protected void initSource(InstanceProfile profile) { + try { + String offset = ""; + if (offsetProfile != null) { + offset = offsetProfile.toJsonStr(); + } + LOGGER.info("LogFileSource init: {} offset: {}", profile.toJsonStr(), offset); + AgentConfiguration conf = AgentConfiguration.getAgentConf(); + int permit = conf.getInt(AGENT_GLOBAL_COS_SOURCE_PERMIT, DEFAULT_AGENT_GLOBAL_COS_SOURCE_PERMIT); + 
MemoryManager.getInstance().addSemaphore(AGENT_GLOBAL_COS_SOURCE_PERMIT, permit); + fileName = profile.getInstanceId(); + bucketName = profile.get(TaskConstants.COS_TASK_BUCKET_NAME); + secretId = profile.get(TaskConstants.COS_TASK_SECRET_ID); + secretKey = profile.get(TaskConstants.COS_TASK_SECRET_KEY); + strRegion = profile.get(TaskConstants.COS_TASK_REGION); + cosClient = COSUtils.createCli(secretId, secretKey, strRegion); + metadata = cosClient.getObjectMetadata(bucketName, fileName); + queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE); + bufferToReadFile = new byte[SIZE_OF_BUFFER_TO_READ_FILE]; + lastInodeUpdateTime = AgentUtils.getCurrentTime(); + initOffset(taskId); + EXECUTOR_SERVICE.execute(run()); + } catch (Exception ex) { + stopRunning(); + throw new FileException("error init stream for " + fileName, ex); + } + } + + @Override + protected boolean doPrepareToRead() { + if (AgentUtils.getCurrentTime() - lastInodeUpdateTime > META_UPDATE_INTERVAL_MS) { + metadata = cosClient.getObjectMetadata(bucketName, fileName); + lastInodeUpdateTime = AgentUtils.getCurrentTime(); + } + if (metadata.getContentLength() < bytePosition) { + fileExist = false; + LOGGER.info("file rotate, instance will restart and offset will be clean, file {}", + fileName); + return false; + } + return true; + } + + @Override + protected List<SourceData> readFromSource() { + if (queue.isEmpty()) { + return null; + } + int count = 0; + int len = 0; + List<SourceData> lines = new ArrayList<>(); + while (!queue.isEmpty() && count < BATCH_READ_LINE_COUNT && len < BATCH_READ_LINE_TOTAL_LEN) { + if (len + queue.peek().getData().length > BATCH_READ_LINE_TOTAL_LEN) { + break; + } + len += queue.peek().getData().length; + count++; + lines.add(queue.poll()); + } + MemoryManager.getInstance().release(AGENT_GLOBAL_COS_SOURCE_PERMIT, len); + return lines; + } + + @Override + protected void printCurrentState() { + LOGGER.info("path is {}, linePosition {}, bytePosition is {} file len {}", fileName, 
linePosition, + bytePosition, metadata.getContentLength()); + } + + @Override + protected String getThreadName() { + return "cos-file-source-" + taskId + "-" + fileName; + } + + private void initOffset(String taskId) { + long lineOffset; + long byteOffset; + if (offsetProfile != null) { + FileOffset fileOffset = parseFIleOffset(offsetProfile.getOffset()); + lineOffset = fileOffset.lineOffset; + byteOffset = fileOffset.byteOffset; + LOGGER.info("initOffset inode no change taskId {} restore lineOffset {} byteOffset {}, file {}", taskId, + lineOffset, byteOffset, fileName); + } else { + lineOffset = 0; + byteOffset = 0; + LOGGER.info("initOffset taskId {} for new all read lineOffset {} byteOffset {} file {}", taskId, + lineOffset, byteOffset, fileName); + } + linePosition = lineOffset; + bytePosition = byteOffset; + } + + private Runnable run() { + return () -> { + AgentThreadFactory.nameThread(getThreadName()); + running = true; + try { + doRun(); + } catch (Throwable e) { + LOGGER.error("do run error maybe file deleted: ", e); + ThreadUtils.threadThrowableHandler(Thread.currentThread(), e); + } + running = false; + }; + } + + /** + * Read new lines. + * + * @return The new position after the lines have been read + * @throws IOException if an I/O error occurs. 
+ */ + protected void doRun() throws IOException { + GetObjectRequest getObjectRequest = new GetObjectRequest(bucketName, fileName); + getObjectRequest.setRange(bytePosition, metadata.getContentLength()); + COSObject cosObject = cosClient.getObject(getObjectRequest); + InputStream inputStream = cosObject.getObjectContent(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + int num; + boolean overLen = false; + while ((num = inputStream.read(bufferToReadFile)) != -1) { + LOGGER.debug("read size {}", num); + for (int i = 0; i < num; i++) { + byte ch = bufferToReadFile[i]; + bytePosition++; + switch (ch) { + case '\n': + linePosition++; + boolean suc = false; + while (isRunnable() && !suc) { + SourceData sourceData = new SourceData(baos.toByteArray(), + getOffsetString(linePosition, bytePosition)); + boolean suc4Queue = waitForPermit(AGENT_GLOBAL_COS_SOURCE_PERMIT, + sourceData.getData().length); + if (!suc4Queue) { + break; + } + suc = queue.offer(sourceData); + if (!suc) { + MemoryManager.getInstance() + .release(AGENT_GLOBAL_COS_SOURCE_PERMIT, sourceData.getData().length); + AgentUtils.silenceSleepInMs(WAIT_TIMEOUT_MS); + } + } + if (overLen) { + LOGGER.warn("readLines over len finally string len {}", + new String(baos.toByteArray()).length()); + long auditTime = 0; + auditTime = profile.getSinkDataTime(); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_FAILED, inlongGroupId, inlongStreamId, + auditTime, 1, maxPackSize, auditVersion); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_FAILED_REAL_TIME, inlongGroupId, + inlongStreamId, AgentUtils.getCurrentTime(), 1, maxPackSize, auditVersion); + } + baos.reset(); + overLen = false; + break; + case '\r': + break; + default: + if (baos.size() < maxPackSize) { + baos.write(ch); + } else { + overLen = true; + } + } + } + } + baos.close(); + inputStream.close(); + cosObject.close(); + } + + private String getOffsetString(Long lineOffset, Long byteOffset) { + return lineOffset + OFFSET_SEP + byteOffset; + } + + 
private FileOffset parseFIleOffset(String offset) { + String[] offsetArray = offset.split(OFFSET_SEP); + if (offsetArray.length == LEN_OF_FILE_OFFSET_ARRAY) { + return new FileOffset(Long.parseLong(offsetArray[0]), Long.parseLong(offsetArray[1]), true); + } else { + return new FileOffset(Long.parseLong(offsetArray[0]), null, false); + } + } + + @Override + protected boolean isRunnable() { + return runnable && fileExist; + } + + @Override + public boolean sourceExist() { + return fileExist; + } + + @Override + protected void releaseSource() { + while (running) { + AgentUtils.silenceSleepInMs(1); + } + if (cosClient != null) { + FileStatic data = new FileStatic(); + data.setTaskId(taskId); + data.setRetry(String.valueOf(profile.isRetry())); + data.setContentType(profile.get(COS_CONTENT_STYLE)); + data.setGroupId(profile.getInlongGroupId()); + data.setStreamId(profile.getInlongStreamId()); + data.setDataTime(format.format(profile.getSinkDataTime())); + data.setFileName(profile.getInstanceId()); + data.setFileLen(String.valueOf(metadata.getContentLength())); + data.setReadBytes(String.valueOf(bytePosition)); + data.setReadLines(String.valueOf(linePosition)); + OffsetProfile offsetProfile = OffsetManager.getInstance().getOffset(taskId, instanceId); + if (offsetProfile != null) { + data.setSendLines(offsetProfile.getOffset()); + } + FileStaticManager.putStaticMsg(data); + cosClient.shutdown(); + } + while (!queue.isEmpty()) { + MemoryManager.getInstance().release(AGENT_GLOBAL_COS_SOURCE_PERMIT, queue.poll().getData().length); + } + } + + private boolean waitForPermit(String permitName, int permitLen) { + boolean suc = false; + while (!suc) { + suc = MemoryManager.getInstance().tryAcquire(permitName, permitLen); + if (!suc) { + MemoryManager.getInstance().printDetail(permitName, "cos_source"); + if (!isRunnable()) { + return false; + } + AgentUtils.silenceSleepInMs(WAIT_TIMEOUT_MS); + } + } + return true; + } +} diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java index 300c16168f..ecaa53196b 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java @@ -71,7 +71,7 @@ public class LogFileSource extends AbstractSource { public static final String OFFSET_SEP = ":"; private final Integer SIZE_OF_BUFFER_TO_READ_FILE = 64 * 1024; private final Long INODE_UPDATE_INTERVAL_MS = 1000L; - private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // 设置格式 + private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); private String fileName; private File file; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java index 3ce1378fa7..72d14327a2 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java @@ -165,7 +165,7 @@ public abstract class AbstractSource implements Source { break; } List<SourceData> lines = readFromSource(); - if (lines != null && lines.isEmpty()) { + if (lines == null || lines.isEmpty()) { if (queue.isEmpty()) { emptyCount++; } else { @@ -176,14 +176,12 @@ public abstract class AbstractSource implements Source { continue; } emptyCount = 0; - if (lines != null) { - for (int i = 0; i < lines.size(); i++) { - boolean suc4Queue = waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, lines.get(i).getData().length); - if (!suc4Queue) { - break; - } - putIntoQueue(lines.get(i)); + for (int i = 
0; i < lines.size(); i++) { + boolean suc4Queue = waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, lines.get(i).getData().length); + if (!suc4Queue) { + break; } + putIntoQueue(lines.get(i)); } MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN); if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_INTERVAL_MS) { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/COSTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/COSTask.java new file mode 100644 index 0000000000..eceb38e8be --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/COSTask.java @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.agent.plugin.task.cos; + +import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.constant.TaskConstants; +import org.apache.inlong.agent.core.task.TaskAction; +import org.apache.inlong.agent.plugin.task.AbstractTask; +import org.apache.inlong.agent.plugin.task.cos.FileScanner.BasicFileInfo; +import org.apache.inlong.agent.plugin.utils.cos.COSUtils; +import org.apache.inlong.agent.plugin.utils.regex.NewDateUtils; +import org.apache.inlong.agent.plugin.utils.regex.Scanner; +import org.apache.inlong.agent.state.State; +import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.agent.utils.DateTransUtils; + +import com.qcloud.cos.COSClient; +import com.qcloud.cos.model.ObjectMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TimeZone; +import java.util.TreeSet; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * Watch directory, if new valid files are created, create instance correspondingly. 
+ */ +public class COSTask extends AbstractTask { + + private static final Logger LOGGER = LoggerFactory.getLogger(COSTask.class); + public static final String DEFAULT_COS_INSTANCE = "org.apache.inlong.agent.plugin.instance.COSInstance"; + private static final int INSTANCE_QUEUE_CAPACITY = 10; + private final Map<String/* dataTime */, Map<String/* fileName */, InstanceProfile>> eventMap = + new ConcurrentHashMap<>(); + public static final long DAY_TIMEOUT_INTERVAL = 2 * 24 * 3600 * 1000; + private boolean retry; + private long startTime; + private long endTime; + private String originPattern; + private long lastScanTime = 0; + public final long SCAN_INTERVAL = 1 * 60 * 1000; + private volatile boolean runAtLeastOneTime = false; + private BlockingQueue<InstanceProfile> instanceQueue; + private COSClient cosClient; + private String bucketName; + private String secretId; + private String secretKey; + private String strRegion; + private String timeOffset = ""; + + @Override + protected int getInstanceLimit() { + return taskProfile.getInt(TaskConstants.COS_MAX_NUM); + } + + @Override + protected void initTask() { + timeOffset = taskProfile.get(TaskConstants.TASK_COS_TIME_OFFSET, ""); + instanceQueue = new LinkedBlockingQueue<>(INSTANCE_QUEUE_CAPACITY); + retry = taskProfile.getBoolean(TaskConstants.COS_TASK_RETRY, false); + originPattern = taskProfile.get(TaskConstants.COS_TASK_PATTERN); + bucketName = taskProfile.get(TaskConstants.COS_TASK_BUCKET_NAME); + secretId = taskProfile.get(TaskConstants.COS_TASK_SECRET_ID); + secretKey = taskProfile.get(TaskConstants.COS_TASK_SECRET_KEY); + strRegion = taskProfile.get(TaskConstants.COS_TASK_REGION); + cosClient = COSUtils.createCli(secretId, secretKey, strRegion); + if (retry) { + initRetryTask(taskProfile); + } + } + + private boolean initRetryTask(TaskProfile profile) { + String dataTimeFrom = profile.get(TaskConstants.COS_TASK_TIME_FROM, ""); + String dataTimeTo = profile.get(TaskConstants.COS_TASK_TIME_TO, ""); + try { + 
startTime = DateTransUtils.timeStrConvertToMillSec(dataTimeFrom, profile.getCycleUnit()); + endTime = DateTransUtils.timeStrConvertToMillSec(dataTimeTo, profile.getCycleUnit()); + } catch (ParseException e) { + LOGGER.error("retry task time error start {} end {}", dataTimeFrom, dataTimeTo, e); + return false; + } + return true; + } + + @Override + protected List<InstanceProfile> getNewInstanceList() { + if (retry) { + runForRetry(); + } else { + runForNormal(); + } + List<InstanceProfile> list = new ArrayList<>(); + while (list.size() < INSTANCE_QUEUE_CAPACITY && !instanceQueue.isEmpty()) { + InstanceProfile profile = instanceQueue.poll(); + if (profile != null) { + list.add(profile); + } + } + return list; + } + + @Override + public boolean isProfileValid(TaskProfile profile) { + if (!profile.allRequiredKeyExist()) { + LOGGER.error("task profile needs all required key"); + return false; + } + if (!profile.hasKey(TaskConstants.COS_TASK_CYCLE_UNIT)) { + LOGGER.error("task profile needs cos cycle unit"); + return false; + } + if (!profile.hasKey(TaskConstants.TASK_CYCLE_UNIT)) { + LOGGER.error("task profile needs cycle unit"); + return false; + } + if (profile.get(TaskConstants.TASK_CYCLE_UNIT) + .compareTo(profile.get(TaskConstants.COS_TASK_CYCLE_UNIT)) != 0) { + LOGGER.error("task profile cycle unit must be consistent"); + return false; + } + if (!profile.hasKey(TaskConstants.TASK_TIME_ZONE)) { + LOGGER.error("task profile needs time zone"); + return false; + } + boolean ret = profile.hasKey(TaskConstants.COS_TASK_PATTERN) + && profile.hasKey(TaskConstants.COS_MAX_NUM); + if (!ret) { + LOGGER.error("task profile needs file keys"); + return false; + } + if (!profile.hasKey(TaskConstants.TASK_COS_TIME_OFFSET)) { + LOGGER.error("task profile needs time offset"); + return false; + } + if (profile.getBoolean(TaskConstants.COS_TASK_RETRY, false)) { + if (!initRetryTask(profile)) { + return false; + } + } + return true; + } + + @Override + protected void releaseTask() { + 
cosClient.shutdown();
+    }
+
+    /*
+     * One round of a retry task: scan the existing files on the first call only, deal with the event map, and
+     * once allInstanceFinished() is true submit a FINISH action to the task manager and change the state to
+     * SUCCEEDED.
+     */
+    private void runForRetry() {
+        if (!runAtLeastOneTime) {
+            scanExistingFile();
+            runAtLeastOneTime = true;
+        }
+        dealWithEventMap();
+        if (allInstanceFinished()) {
+            LOGGER.info("retry task finished, send action to task manager, taskId {}", getTaskId());
+            TaskAction action = new TaskAction(org.apache.inlong.agent.core.task.ActionType.FINISH, taskProfile);
+            taskManager.submitAction(action);
+            doChangeState(State.SUCCEEDED);
+        }
+    }
+
+    /*
+     * One round of a normal task: rescan the files when more than SCAN_INTERVAL has passed since the last
+     * scan, then deal with the event map.
+     */
+    private void runForNormal() {
+        if (AgentUtils.getCurrentTime() - lastScanTime > SCAN_INTERVAL) {
+            scanExistingFile();
+            lastScanTime = AgentUtils.getCurrentTime();
+        }
+        dealWithEventMap();
+    }
+
+    /*
+     * Scan the bucket for files matching the origin pattern between startTime and endTime and add each of
+     * them to the event map. For a retry task every scanned file is counted in instanceCount.
+     */
+    private void scanExistingFile() {
+        List<BasicFileInfo> fileInfos = FileScanner.scanTaskBetweenTimes(cosClient, bucketName, originPattern,
+                taskProfile.getCycleUnit(), timeOffset, startTime, endTime, retry);
+        LOGGER.info("taskId {} scan {} get file count {}", getTaskId(), originPattern, fileInfos.size());
+        for (BasicFileInfo fileInfo : fileInfos) {
+            addToEvenMap(fileInfo.fileName, fileInfo.dataTime);
+            if (retry) {
+                instanceCount++;
+            }
+        }
+    }
+
+    /*
+     * Whether the event map already holds an instance profile for the file under the given data time.
+     */
+    private boolean isInEventMap(String fileName, String dataTime) {
+        Map<String, InstanceProfile> fileToProfile = eventMap.get(dataTime);
+        if (fileToProfile == null) {
+            return false;
+        }
+        return fileToProfile.get(fileName) != null;
+    }
+
+    /*
+     * Drop the timed out events first, then hand the remaining events to the instance queue.
+     */
+    private void dealWithEventMap() {
+        removeTimeoutEvent(eventMap, retry);
+        dealWithEventMapWithCycle();
+    }
+
+    /*
+     * Deal with the events data time by data time. A normal task handles the current (last) data time first,
+     * then the other data times of the list, and finally the data times that only exist in the event map.
+     */
+    private void dealWithEventMapWithCycle() {
+        List<String> dataTimeList = Scanner.getDataTimeList(startTime, endTime, taskProfile.getCycleUnit(),
+                timeOffset, retry);
+        if (dataTimeList.isEmpty()) {
+            LOGGER.error("get dataTimeList return empty list");
+            return;
+        }
+        Set<String> dealtDataTime = new HashSet<>();
+        // normal task first handle current data time
+        if (!retry) {
+            String current = dataTimeList.remove(dataTimeList.size() - 1);
+            dealtDataTime.add(current);
+            if (!dealEventMapByDataTime(current, true)) {
+                return;
+            }
+        }
+        // A plain loop is required here: a "return" inside a forEach lambda only ends the lambda call for one
+        // element and never stops the iteration, so the early exit has to be a real method return.
+        for (String dataTime : dataTimeList) {
+            dealtDataTime.add(dataTime);
+            if (!dealEventMapByDataTime(dataTime, false)) {
+                return;
+            }
+        }
+        for (String dataTime : eventMap.keySet()) {
+            if (!dealtDataTime.contains(dataTime)) {
+                dealEventMapByDataTime(dataTime, false);
+            }
+        }
+    }
+
+    /*
+     * Offer the events of one data time to the instance queue, ordered by instance id. Returns false as soon
+     * as an event can not be queued (isFull() for a non current data time, or the queue rejects the offer),
+     * true otherwise, including when there is nothing to do or it is not yet time to start.
+     */
+    private boolean dealEventMapByDataTime(String dataTime, boolean isCurrentDataTime) {
+        Map<String, InstanceProfile> sameDataTimeEvents = eventMap.get(dataTime);
+        if (sameDataTimeEvents == null || sameDataTimeEvents.isEmpty()) {
+            return true;
+        }
+        if (shouldStartNow(dataTime)) {
+            Set<InstanceProfile> sortedEvents = new TreeSet<>(Comparator.comparing(InstanceProfile::getInstanceId));
+            sortedEvents.addAll(sameDataTimeEvents.values());
+            for (InstanceProfile sortEvent : sortedEvents) {
+                String fileName = sortEvent.getInstanceId();
+                InstanceProfile profile = sameDataTimeEvents.get(fileName);
+                if (!isCurrentDataTime && isFull()) {
+                    return false;
+                }
+                if (!instanceQueue.offer(profile)) {
+                    return false;
+                }
+                // only forget the event once it has really been queued
+                sameDataTimeEvents.remove(fileName);
+            }
+        }
+        return true;
+    }
+
+    /*
+     * Calculate whether the event needs to be processed at the current time based on its data time, business cycle, and
+     * offset
+     */
+    private boolean shouldStartNow(String dataTime) {
+        String shouldStartTime =
+                NewDateUtils.getShouldStartTime(dataTime, taskProfile.getCycleUnit(), timeOffset);
+        String currentTime = getCurrentTime();
+        return currentTime.compareTo(shouldStartTime) >= 0;
+    }
+
+    /*
+     * Remove the data times that are no longer valid from the event map. Retry tasks never time out. The
+     * removal goes through keySet().removeIf so that it does not depend on the map implementation tolerating
+     * modification while it is being iterated.
+     */
+    private void removeTimeoutEvent(Map<String, Map<String, InstanceProfile>> eventMap, boolean isRetry) {
+        if (isRetry) {
+            return;
+        }
+        eventMap.keySet().removeIf(dataTime -> {
+            /* If the data time of the event is within 2 days before (after) the current time, it is valid */
+            if (NewDateUtils.isValidCreationTime(dataTime, DAY_TIMEOUT_INTERVAL)) {
+                return false;
+            }
+            /* Remove it from memory map. */
+            LOGGER.warn("remove too old event from event map. dataTime {}", dataTime);
+            return true;
+        });
+    }
+
+    /*
+     * Current time formatted with NewDateUtils.DEFAULT_FORMAT in NewDateUtils.DEFAULT_TIME_ZONE. A new
+     * formatter is created on every call because SimpleDateFormat is not thread safe.
+     */
+    private String getCurrentTime() {
+        SimpleDateFormat dateFormat = new SimpleDateFormat(NewDateUtils.DEFAULT_FORMAT);
+        TimeZone timeZone = TimeZone.getTimeZone(NewDateUtils.DEFAULT_TIME_ZONE);
+        dateFormat.setTimeZone(timeZone);
+        return dateFormat.format(new Date(System.currentTimeMillis()));
+    }
+
+    /*
+     * Create an instance profile for the file and put it into the event map under its data time. The file is
+     * skipped when it is already in the event map or when shouldAddAgain() rejects it for its last modified
+     * time.
+     */
+    private void addToEvenMap(String fileName, String dataTime) {
+        if (isInEventMap(fileName, dataTime)) {
+            LOGGER.info("add to evenMap isInEventMap returns true skip taskId {} dataTime {} fileName {}",
+                    taskProfile.getTaskId(), dataTime, fileName);
+            return;
+        }
+        ObjectMetadata meta = cosClient.getObjectMetadata(bucketName, fileName);
+        Long fileUpdateTime = meta.getLastModified().getTime();
+        if (!shouldAddAgain(fileName, fileUpdateTime)) {
+            LOGGER.info("add to evenMap shouldAddAgain returns false skip taskId {} dataTime {} fileName {}",
+                    taskProfile.getTaskId(), dataTime, fileName);
+            return;
+        }
+        Map<String, InstanceProfile> sameDataTimeEvents = eventMap.computeIfAbsent(dataTime,
+                mapKey -> new ConcurrentHashMap<>());
+        boolean containsInMemory = sameDataTimeEvents.containsKey(fileName);
+        if (containsInMemory) {
+            LOGGER.error("should not happen! may be {} has been deleted and add again", fileName);
+            return;
+        }
+        String cycleUnit = taskProfile.getCycleUnit();
+        InstanceProfile instanceProfile = taskProfile.createInstanceProfile(DEFAULT_COS_INSTANCE,
+                fileName, cycleUnit, dataTime, fileUpdateTime);
+        sameDataTimeEvents.put(fileName, instanceProfile);
+        LOGGER.info("add to eventMap taskId {} dataTime {} fileName {}", taskProfile.getTaskId(), dataTime, fileName);
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/FileScanner.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/FileScanner.java new file mode 100644 index 0000000000..4eac0eeff2 --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/FileScanner.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.inlong.agent.plugin.task.cos; + +import org.apache.inlong.agent.plugin.utils.regex.PatternUtil; +import org.apache.inlong.agent.plugin.utils.regex.Scanner; +import org.apache.inlong.agent.plugin.utils.regex.Scanner.FinalPatternInfo; +import org.apache.inlong.agent.utils.DateTransUtils; + +import com.qcloud.cos.COSClient; +import com.qcloud.cos.exception.CosClientException; +import com.qcloud.cos.exception.CosServiceException; +import com.qcloud.cos.model.COSObjectSummary; +import com.qcloud.cos.model.ListObjectsRequest; +import com.qcloud.cos.model.ObjectListing; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/* + * This class is mainly used for scanning the log files that we want to read. It is used in the + * inlong_agent recovery process, in the do and redo tasks, and for accessing the current log files when a + * new data source is deployed.
+ */
+public class FileScanner {
+
+    public static final int DEFAULT_KEY_COUNT = 100;
+    public static final String DEFAULT_DELIMITER = "/";
+    public static final char PATH_SEP = '/';
+
+    /* Flags used for every pattern compiled while scanning. */
+    private static final int PATTERN_FLAGS = Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE;
+
+    /* A scanned object key together with the data time (formatted by cycle unit) it belongs to. */
+    public static class BasicFileInfo {
+
+        public String fileName;
+        public String dataTime;
+
+        public BasicFileInfo(String fileName, String dataTime) {
+            this.fileName = fileName;
+            this.dataTime = dataTime;
+        }
+    }
+
+    private static final Logger logger = LoggerFactory.getLogger(FileScanner.class);
+
+    /*
+     * Scan the bucket once for every final pattern that Scanner.getFinalPatternInfos() derives from the origin
+     * pattern between startTime and endTime, and return all matched objects.
+     */
+    public static List<BasicFileInfo> scanTaskBetweenTimes(COSClient cosClient, String bucketName, String originPattern,
+            String cycleUnit, String timeOffset, long startTime, long endTime, boolean isRetry) {
+        List<FinalPatternInfo> finalPatternInfos = Scanner.getFinalPatternInfos(originPattern, cycleUnit, timeOffset,
+                startTime, endTime, isRetry);
+        List<BasicFileInfo> infos = new ArrayList<>();
+        for (FinalPatternInfo finalPatternInfo : finalPatternInfos) {
+            String prefix = PatternUtil.getBeforeFirstWildcard(finalPatternInfo.finalPattern);
+            Pattern pattern = Pattern.compile(finalPatternInfo.finalPattern, PATTERN_FLAGS);
+            infos.addAll(scanTaskInOneCycle(cosClient, bucketName, pattern, prefix,
+                    finalPatternInfo.dataTime, cycleUnit));
+        }
+        return infos;
+    }
+
+    /*
+     * List the objects under the prefix page by page, using "/" as delimiter. Common prefixes (sub directories)
+     * that match the pattern truncated to their depth are scanned recursively, and object keys that match the
+     * pattern from their beginning are collected. When listing fails, the error is logged and the objects
+     * collected so far are returned.
+     */
+    public static List<BasicFileInfo> scanTaskInOneCycle(COSClient cosClient, String bucketName, Pattern pattern,
+            String prefix, Long dataTime, String cycleUnit) {
+        List<BasicFileInfo> infos = new ArrayList<>();
+        // only the marker changes between two pages, so the rest of the request is prepared once
+        ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
+        listObjectsRequest.setBucketName(bucketName);
+        listObjectsRequest.setPrefix(prefix);
+        listObjectsRequest.setDelimiter(DEFAULT_DELIMITER);
+        listObjectsRequest.setMaxKeys(DEFAULT_KEY_COUNT);
+        ObjectListing objectListing;
+        do {
+            try {
+                objectListing = cosClient.listObjects(listObjectsRequest);
+            } catch (CosServiceException e) {
+                logger.error("scanTaskInOneCycle finalPattern {} CosServiceException", pattern.pattern(), e);
+                return infos;
+            } catch (CosClientException e) {
+                logger.error("scanTaskInOneCycle finalPattern {} CosClientException", pattern.pattern(), e);
+                return infos;
+            }
+            List<String> commonPrefixes = objectListing.getCommonPrefixes();
+            if (!commonPrefixes.isEmpty()) {
+                // the depth is taken from the first common prefix and applied to all of them
+                int depth = countCharacterOccurrences(commonPrefixes.get(0), PATH_SEP);
+                String patternToDepth = findNthOccurrenceSubstring(pattern.pattern(), PATH_SEP, depth);
+                if (patternToDepth == null) {
+                    // The pattern has fewer path levels than the listed directories. Compiling null would
+                    // throw a NullPointerException, so the sub directories are skipped instead.
+                    logger.warn("scanTaskInOneCycle finalPattern {} has less than {} path levels, skip sub directories"
+                            + " of prefix {}", pattern.pattern(), depth, prefix);
+                } else {
+                    Pattern patternByDepth = Pattern.compile(patternToDepth, PATTERN_FLAGS);
+                    for (String commonPrefix : commonPrefixes) {
+                        if (patternByDepth.matcher(commonPrefix).matches()) {
+                            infos.addAll(scanTaskInOneCycle(cosClient, bucketName, pattern, commonPrefix, dataTime,
+                                    cycleUnit));
+                        }
+                    }
+                }
+            }
+            for (COSObjectSummary cosObjectSummary : objectListing.getObjectSummaries()) {
+                String key = cosObjectSummary.getKey();
+                Matcher matcher = pattern.matcher(key);
+                if (matcher.lookingAt()) {
+                    long fileSize = cosObjectSummary.getSize();
+                    String storageClasses = cosObjectSummary.getStorageClass();
+                    String strDataTime = DateTransUtils.millSecConvertToTimeStr(dataTime, cycleUnit);
+                    infos.add(new BasicFileInfo(key, strDataTime));
+                    logger.info("list key {} dataTime {} size {}, storageClasses {}", key, strDataTime, fileSize,
+                            storageClasses);
+                }
+            }
+            listObjectsRequest.setMarker(objectListing.getNextMarker());
+        } while (objectListing.isTruncated());
+        return infos;
+    }
+
+    /*
+     * Count how many times targetChar occurs in input. Throws IllegalArgumentException when input is null.
+     */
+    public static int countCharacterOccurrences(String input, char targetChar) {
+        if (input == null) {
+            throw new IllegalArgumentException("Input string cannot be null");
+        }
+        int count = 0;
+        for (int i = 0; i < input.length(); i++) {
+            if (input.charAt(i) == targetChar) {
+                count++;
+            }
+        }
+        return count;
+    }
+
+    /*
+     * Return the beginning of input up to and including the n-th occurrence of targetChar, or null when input
+     * contains fewer than n occurrences (or n is not positive).
+     */
+    public static String findNthOccurrenceSubstring(String input, char targetChar, int n) {
+        int endIndex = findNthOccurrence(input, targetChar, n);
+        if (endIndex == -1) {
+            return null;
+        }
+        return input.substring(0, endIndex + 1);
+    }
+
+    /*
+     * Return the index of the n-th occurrence of targetChar in input, or -1 when there is no such occurrence
+     * (or n is not positive).
+     */
+    public static int findNthOccurrence(String input, char targetChar, int n) {
+        int currentIndex = -1;
+        for (int i = 0; i < n; i++) {
+            currentIndex = input.indexOf(targetChar, currentIndex + 1);
+            if (currentIndex == -1) {
+                return -1;
+            }
+        }
+        return currentIndex;
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/cos/COSConfigHandler.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/cos/COSConfigHandler.java new file mode 100644 index 0000000000..259d330776 --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/cos/COSConfigHandler.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.utils.cos;
+
+import com.qcloud.cos.ClientConfig;
+
+/*
+ * For some private, customized extension processing
+ */
+public abstract class COSConfigHandler {
+
+    /* Provide the client configuration to use for the given region. */
+    public abstract ClientConfig getClientConfig(String region);
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/cos/COSUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/cos/COSUtils.java new file mode 100644 index 0000000000..111e94212f --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/cos/COSUtils.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.utils.cos;
+
+import com.qcloud.cos.COSClient;
+import com.qcloud.cos.ClientConfig;
+import com.qcloud.cos.auth.BasicCOSCredentials;
+import com.qcloud.cos.auth.COSCredentials;
+
+public class COSUtils {
+
+    /*
+     * Create a COS client from the secret id / secret key pair, with the client configuration that
+     * DefaultCOSConfigHandler provides for the region.
+     */
+    public static COSClient createCli(String secretId, String secretKey, String region) {
+        COSCredentials credentials = new BasicCOSCredentials(secretId, secretKey);
+        ClientConfig regionConfig = new DefaultCOSConfigHandler().getClientConfig(region);
+        return new COSClient(credentials, regionConfig);
+    }
+}
\ No newline at end of file diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/cos/DefaultCOSConfigHandler.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/cos/DefaultCOSConfigHandler.java new file mode 100644 index 0000000000..a9dda478a2 --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/cos/DefaultCOSConfigHandler.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.utils.cos;
+
+import com.qcloud.cos.ClientConfig;
+import com.qcloud.cos.region.Region;
+
+/*
+ * Default handler: the client configuration only carries the region.
+ */
+public class DefaultCOSConfigHandler extends COSConfigHandler {
+
+    /* Build a client configuration for the region with the given name. */
+    @Override
+    public ClientConfig getClientConfig(String region) {
+        return new ClientConfig(new Region(region));
+    }
+}
diff --git a/licenses/inlong-agent/LICENSE b/licenses/inlong-agent/LICENSE index 33a51c916a..cd2f529072 100644 --- a/licenses/inlong-agent/LICENSE +++ b/licenses/inlong-agent/LICENSE @@ -541,6 +541,7 @@ MIT licenses The following components are provided under MIT license. See project link for details. The text of each license is also included at licenses/LICENSE-[project].txt. + com.qcloud:cos_api:jar:5.6.54 - cos-java-sdk (https://github.com/tencentyun/cos-java-sdk-v5), (MIT License) net.sourceforge.argparse4j:argparse4j:0.7.0 - argparse4j (https://github.com/argparse4j/argparse4j/tree/argparse4j-0.7.0), (MIT) org.bouncycastle:bcpkix-jdk15on:1.69 - Bouncy Castle PKIX, CMS, EAC, TSP, PKCS, OCSP, CMP, and CRMF APIs (https://www.bouncycastle.org/java.html), (MIT License) org.bouncycastle:bcprov-ext-jdk15on:1.69 - Bouncy Castle Provider (https://www.bouncycastle.org/java.html), (MIT License) diff --git a/licenses/inlong-agent/licenses/LICENSE-cos-java-sdk.txt b/licenses/inlong-agent/licenses/LICENSE-cos-java-sdk.txt new file mode 100644 index 0000000000..4b32321f98 --- /dev/null +++ b/licenses/inlong-agent/licenses/LICENSE-cos-java-sdk.txt @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2017 腾讯云 + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and
this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/pom.xml b/pom.xml index 4330360033..0562af4dd7 100644 --- a/pom.xml +++ b/pom.xml @@ -190,6 +190,7 @@ <HikariCP.version>4.0.3</HikariCP.version> <caffeine.version>2.9.3</caffeine.version> <kafka.clients.version>3.0.0</kafka.clients.version> + <cos.sdk.version>5.6.54</cos.sdk.version> <paho.client.version>1.2.5</paho.client.version> <kubernetes.client.version>6.0.0</kubernetes.client.version>