ingdex commented on code in PR #540: URL: https://github.com/apache/rocketmq-connect/pull/540#discussion_r1769477033
########## connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkTask.java: ########## @@ -0,0 +1,278 @@ +package org.apache.rocketmq.connect.oss.sink; + + +import com.aliyuncs.DefaultAcsClient; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.aliyuncs.IAcsClient; +import com.aliyuncs.http.FormatType; +import com.aliyuncs.profile.DefaultProfile; +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.data.Struct; +import io.openmessaging.connector.api.component.task.sink.SinkTask; +import io.openmessaging.connector.api.component.task.sink.SinkTaskContext; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.errors.ConnectException; +import io.openmessaging.connector.api.errors.RetriableException; +import org.apache.rocketmq.connect.oss.sink.constant.OssConstant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.aliyun.oss.ClientException; +import com.aliyun.oss.OSS; +import com.aliyun.oss.OSSClientBuilder; +import com.aliyun.oss.ClientBuilderConfiguration; +import com.aliyun.oss.OSSException; +import com.aliyun.oss.common.auth.CredentialsProviderFactory; +import com.aliyun.oss.common.auth.DefaultCredentialProvider; +import com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider; +import com.aliyun.oss.common.auth.*; +import com.aliyun.oss.common.comm.SignVersion; +import com.aliyun.oss.model.AppendObjectRequest; +import com.aliyun.oss.model.AppendObjectResult; +import com.aliyun.oss.model.OSSObject; +import com.aliyun.oss.model.PutObjectRequest; +import com.aliyun.oss.model.PutObjectResult; + +import java.util.stream.Collectors; +import java.util.HashMap; +import java.net.URI; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.io.IOException; + +public class OssSinkTask extends SinkTask { + private static final Logger log = LoggerFactory.getLogger(OssSinkTask.class); + + private String accessKeyId; + + private String accessKeySecret; + + private String accountEndpoint; + + private String bucketName; + + private String fileUrlPrefix; + + private String region; + + private OSS ossClient; + + private String objectName; + + private String partitionMethod; + + private String compressType; + + private boolean enableBatchPut; + + private String taskId; + + private long lastOffset; + + private long lastTimeStamp; + + private String lastPrefix; + + private HashMap<String, List<String>> recordMap = new HashMap<>(); + + private void processMap() throws ConnectException, IOException { + recordMap.forEach((key, values) -> { + String joinedString = values.stream().collect(Collectors.joining("\n")); + String absolutePath = key + objectName; + boolean exists = ossClient.doesObjectExist(bucketName, absolutePath); + long offset = 0; + // If the object does not exist, create it and set offset to 0, otherwise read the offset of the current object + if (exists) { + try { + OSSObject ossObject = ossClient.getObject(bucketName, absolutePath); + InputStream inputStream = ossObject.getObjectContent(); + offset = inputStream.available(); Review Comment: Roll the file once its size exceeds the threshold. For example, when the file size becomes larger than 200 MB, create a new file for writing and archive the old ones. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org