limbo-24 commented on code in PR #540:
URL: https://github.com/apache/rocketmq-connect/pull/540#discussion_r1769608786


##########
connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkTask.java:
##########
@@ -0,0 +1,278 @@
+package org.apache.rocketmq.connect.oss.sink;
+
+
+import com.aliyuncs.DefaultAcsClient;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.aliyuncs.IAcsClient;
+import com.aliyuncs.http.FormatType;
+import com.aliyuncs.profile.DefaultProfile;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import io.openmessaging.connector.api.errors.RetriableException;
+import org.apache.rocketmq.connect.oss.sink.constant.OssConstant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.aliyun.oss.ClientException;
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.OSSClientBuilder;
+import com.aliyun.oss.ClientBuilderConfiguration;
+import com.aliyun.oss.OSSException;
+import com.aliyun.oss.common.auth.CredentialsProviderFactory;
+import com.aliyun.oss.common.auth.DefaultCredentialProvider;
+import com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider;
+import com.aliyun.oss.common.auth.*;
+import com.aliyun.oss.common.comm.SignVersion;
+import com.aliyun.oss.model.AppendObjectRequest;
+import com.aliyun.oss.model.AppendObjectResult;
+import com.aliyun.oss.model.OSSObject;
+import com.aliyun.oss.model.PutObjectRequest;
+import com.aliyun.oss.model.PutObjectResult;
+
+import java.util.stream.Collectors;
+import java.util.HashMap; 
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.IOException;
+
+public class OssSinkTask extends SinkTask {
+    private static final Logger log = 
LoggerFactory.getLogger(OssSinkTask.class);
+
+    private String accessKeyId;
+
+    private String accessKeySecret;
+
+    private String accountEndpoint;
+
+    private String bucketName;
+
+    private String fileUrlPrefix;
+
+    private String region;
+
+    private OSS ossClient;
+
+    private String objectName;
+
+    private String partitionMethod;
+
+    private String compressType;
+
+    private boolean enableBatchPut;
+
+    private String taskId;
+
+    private long lastOffset;
+
+    private long lastTimeStamp;
+
+    private String lastPrefix;
+
+    private HashMap<String, List<String>> recordMap = new HashMap<>();
+
+    private void processMap() throws ConnectException, IOException {
+        recordMap.forEach((key, values) -> {  
+            String joinedString = 
values.stream().collect(Collectors.joining("\n"));
+            String absolutePath = key + objectName;
+            boolean exists = ossClient.doesObjectExist(bucketName, 
absolutePath);
+            long offset = 0;
+            // If the object does not exist, create it and set offset to 0, 
otherwise read the offset of the current object
+            if (exists) {
+                try {
+                    OSSObject ossObject = ossClient.getObject(bucketName, 
absolutePath);
+                    InputStream inputStream = ossObject.getObjectContent();
+                    offset = inputStream.available();

Review Comment:
   Before writing to OSS, check the object's current size (the offset). If it 
exceeds OBJECT_SIZE_THRESHOLD, append the current timestamp as a suffix to the 
specified file name, e.g. `test.txt -> test.txt_2024-09-21-22:33:00`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to