limbo-24 commented on code in PR #540:
URL: https://github.com/apache/rocketmq-connect/pull/540#discussion_r1769608870


##########
connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkTask.java:
##########
@@ -0,0 +1,278 @@
+package org.apache.rocketmq.connect.oss.sink;
+
+
+import com.aliyuncs.DefaultAcsClient;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.aliyuncs.IAcsClient;
+import com.aliyuncs.http.FormatType;
+import com.aliyuncs.profile.DefaultProfile;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import io.openmessaging.connector.api.errors.RetriableException;
+import org.apache.rocketmq.connect.oss.sink.constant.OssConstant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.aliyun.oss.ClientException;
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.OSSClientBuilder;
+import com.aliyun.oss.ClientBuilderConfiguration;
+import com.aliyun.oss.OSSException;
+import com.aliyun.oss.common.auth.CredentialsProviderFactory;
+import com.aliyun.oss.common.auth.DefaultCredentialProvider;
+import com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider;
+import com.aliyun.oss.common.auth.*;
+import com.aliyun.oss.common.comm.SignVersion;
+import com.aliyun.oss.model.AppendObjectRequest;
+import com.aliyun.oss.model.AppendObjectResult;
+import com.aliyun.oss.model.OSSObject;
+import com.aliyun.oss.model.PutObjectRequest;
+import com.aliyun.oss.model.PutObjectResult;
+
+import java.util.stream.Collectors;
+import java.util.HashMap; 
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.IOException;
+
+public class OssSinkTask extends SinkTask {
+    private static final Logger log = 
LoggerFactory.getLogger(OssSinkTask.class);
+
+    private String accessKeyId;
+
+    private String accessKeySecret;
+
+    private String accountEndpoint;
+
+    // Bucket that every existence check, read and append in this task targets.
+    private String bucketName;
+
+    // Base key prefix; with the "Time" partition method a year/month/day/hour/ path is appended to it.
+    private String fileUrlPrefix;
+
+    private String region;
+
+    // Client used for all OSS calls in this task.
+    private OSS ossClient;
+
+    // File name appended to the partition prefix to form the full object key.
+    private String objectName;
+
+    // Either "Normal" (fixed prefix) or "Time" (hourly prefix); anything else is rejected.
+    private String partitionMethod;
+
+    // NOTE(review): not referenced by any method visible in this diff excerpt.
+    private String compressType;
+
+    // When true, records are buffered in recordMap and written by processMap() instead of one append per record.
+    private boolean enableBatchPut;
+
+    private String taskId;
+
+    // Position for the next append; updated from AppendObjectResult.getNextPosition() and by genObjectOffset().
+    private long lastOffset;
+
+    // Timestamp of the last record appended in non-batch mode; used to decide when the offset/prefix must be recomputed.
+    private long lastTimeStamp;
+
+    // Most recently computed "Time" partition prefix.
+    private String lastPrefix;
+
+    // Batch buffer: serialized records grouped by their partition prefix.
+    private HashMap<String, List<String>> recordMap = new HashMap<>();
+
+    /**
+     * Writes the batched records in {@code recordMap} to OSS. For each partition prefix, the
+     * buffered records are joined with {@code "\n"} and appended to {@code prefix + objectName}.
+     *
+     * <p>An OSS append must start at the object's current length, so that length is read from the
+     * object's metadata (0 when the object does not exist yet). If the length cannot be read, the
+     * key is logged and skipped rather than appended at a wrong position.
+     *
+     * <p>NOTE(review): recordMap is not cleared here; confirm the caller clears it after a flush,
+     * otherwise the same records are appended again on the next call. Also note no trailing
+     * newline is written, so two flushes to one object are not newline-separated.
+     */
+    private void processMap() throws ConnectException, IOException {
+        recordMap.forEach((key, values) -> {
+            String joinedString = String.join("\n", values);
+            String absolutePath = key + objectName;
+            long offset = 0;
+            if (ossClient.doesObjectExist(bucketName, absolutePath)) {
+                try {
+                    // Use the real object length. InputStream.available() is only an estimate of
+                    // non-blocking bytes, and the previously opened OSSObject was never closed.
+                    offset = ossClient.getObjectMetadata(bucketName, absolutePath).getContentLength();
+                } catch (Exception e) {
+                    log.error("OSSSinkTask | getObjectMetadata | error => ", e);
+                    // Appending at offset 0 to an existing object cannot succeed; skip this key.
+                    return;
+                }
+            }
+            putOss(absolutePath, offset, joinedString);
+        });
+    }
+
+    /**
+     * Returns the object-key prefix for {@code record} according to {@code partitionMethod}.
+     *
+     * <ul>
+     *   <li>{@code "Normal"}: the configured {@code fileUrlPrefix}.</li>
+     *   <li>{@code "Time"}: {@code fileUrlPrefix} followed by {@code YYYY/MM/DD/HH/} formatted from
+     *       the record timestamp (JVM default time zone); the result is also stored in
+     *       {@code lastPrefix}.</li>
+     * </ul>
+     *
+     * @param record the record being written; only its timestamp is read
+     * @return the prefix to which {@code objectName} is appended
+     * @throws RetriableException if the partition method is neither "Normal" nor "Time" (including null)
+     */
+    private String genFilePrefixByPartition(ConnectRecord record) throws ConnectException {
+        if ("Normal".equals(partitionMethod)) {
+            return fileUrlPrefix;
+        }
+        if ("Time".equals(partitionMethod)) {
+            long nowTimeStamp = record.getTimestamp();
+            // Always derive the prefix from this record. The former cache compared against
+            // lastTimeStamp and could return a null/stale lastPrefix when the first record's
+            // timestamp happened to equal lastTimeStamp before any prefix was computed.
+            lastPrefix = fileUrlPrefix + String.format("%1$tY/%1$tm/%1$td/%1$tH/", nowTimeStamp);
+            return lastPrefix;
+        }
+        throw new RetriableException("Illegal partition method.");
+    }
+
+    /**
+     * Returns the position at which {@code record} should be appended to {@code objectUrl}.
+     *
+     * <p>In "Normal" mode this is the cached {@code lastOffset}. In "Time" mode the cache is
+     * reused while the record timestamp equals {@code lastTimeStamp}; otherwise the current
+     * object length is read from OSS metadata (0 if the object does not exist) and cached.
+     *
+     * @param record the record being written; only its timestamp is read
+     * @param objectUrl full object key inside {@code bucketName}
+     * @return the append position
+     * @throws RetriableException if the partition method is neither "Normal" nor "Time" (including null)
+     */
+    private long genObjectOffset(ConnectRecord record, String objectUrl) throws ConnectException, IOException {
+        if ("Normal".equals(partitionMethod)) {
+            return lastOffset;
+        }
+        if (!"Time".equals(partitionMethod)) {
+            throw new RetriableException("Illegal partition method.");
+        }
+        if (lastTimeStamp == record.getTimestamp()) {
+            return lastOffset;
+        }
+        // Read the object length from metadata instead of downloading the object: the old
+        // InputStream.available() value is only an estimate and the stream was never closed.
+        lastOffset = ossClient.doesObjectExist(bucketName, objectUrl)
+                ? ossClient.getObjectMetadata(bucketName, objectUrl).getContentLength()
+                : 0;
+        return lastOffset;
+    }
+
+    /**
+     * Appends {@code context}, encoded as UTF-8, to object {@code absolutePath} in
+     * {@code bucketName} starting at {@code offset}. On success, {@code lastOffset} is set to the
+     * next append position reported by OSS.
+     *
+     * <p>OSS and client errors are logged and not rethrown; {@code lastOffset} is left unchanged
+     * in that case. NOTE(review): because failures are swallowed, callers cannot retry and the
+     * content is dropped; consider propagating a RetriableException.
+     *
+     * @param absolutePath full object key
+     * @param offset append position; must match the object's current length
+     * @param context text to append
+     */
+    private void putOss(String absolutePath, long offset, String context) throws ConnectException {
+        try {
+            // Encode explicitly: String.getBytes() without a charset uses the platform default,
+            // which corrupts non-ASCII JSON on hosts whose default is not UTF-8.
+            AppendObjectRequest appendObjectRequest = new AppendObjectRequest(bucketName, absolutePath,
+                    new ByteArrayInputStream(context.getBytes(StandardCharsets.UTF_8)));
+            appendObjectRequest.setPosition(offset);
+            AppendObjectResult appendObjectResult = ossClient.appendObject(appendObjectRequest);
+            lastOffset = appendObjectResult.getNextPosition();
+        } catch (OSSException oe) {
+            // The request reached OSS but was rejected with an error response.
+            log.error("OSSSinkTask | putOss | OSS rejected append to {} at position {}, errorCode={}, "
+                            + "errorMessage={}, requestId={}, hostId={}",
+                    absolutePath, offset, oe.getErrorCode(), oe.getErrorMessage(), oe.getRequestId(),
+                    oe.getHostId(), oe);
+        } catch (ClientException ce) {
+            // The client could not communicate with OSS (e.g. network unavailable).
+            log.error("OSSSinkTask | putOss | client error while appending to {}", absolutePath, ce);
+        }
+    }
+
+    /**
+     * Serializes {@code record} as a JSON object {@code {"data": <record data>}} and routes it by
+     * partition prefix: buffered into {@code recordMap} when batch mode is on, otherwise appended
+     * to {@code prefix + objectName} immediately.
+     *
+     * @param record the record to write
+     */
+    private void handleRecord(ConnectRecord record) throws ConnectException, IOException {
+        JSONObject payload = new JSONObject();
+        payload.put("data", record.getData());
+        String line = JSON.toJSONString(payload);
+        String prefix = genFilePrefixByPartition(record);
+        if (!enableBatchPut) {
+            // Immediate mode: append now, then remember the timestamp so genObjectOffset can reuse the cached offset.
+            String objectKey = prefix + objectName;
+            putOss(objectKey, genObjectOffset(record, objectKey), line);
+            lastTimeStamp = record.getTimestamp();
+            return;
+        }
+        // Batch mode: group by prefix; the buffered lines are written out by processMap().
+        recordMap.computeIfAbsent(prefix, ignored -> new ArrayList<>()).add(line);
+    }
+
+    @Override
+    public void put(List<ConnectRecord> sinkRecords) throws ConnectException {
+        try {
+            sinkRecords.forEach(sinkRecord -> {
+                try {
+                    handleRecord(sinkRecord);
+                } catch (OSSException oe) {
+                    System.out.println("Caught an OSSException, which means 
your request made it to OSS, "
+                            + "but was rejected with an error response for 
some reason.");
+                    System.out.println("Error Message:" + 
oe.getErrorMessage());
+                    System.out.println("Error Code:" + oe.getErrorCode());
+                    System.out.println("Request ID:" + oe.getRequestId());
+                    System.out.println("Host ID:" + oe.getHostId());
+                } catch (ClientException ce) {
+                    System.out.println("Caught an ClientException, which means 
the client encountered "
+                            + "a serious internal problem while trying to 
communicate with OSS, "
+                            + "such as not being able to access the network.");
+                    System.out.println("Error Message:" + ce.getMessage());
+                } catch (Exception e) {
+                    log.error("OSSSinkTask | genObjectOffset | error => ", e);
+                }
+            });
+            if (enableBatchPut && !recordMap.isEmpty()) {  

Review Comment:
   Added a boolean variable `hasException` to ensure `processMap` is called 
only if all records are processed successfully.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to