ingdex commented on code in PR #540: URL: https://github.com/apache/rocketmq-connect/pull/540#discussion_r1769478174
########## connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkTask.java: ########## @@ -0,0 +1,278 @@ +package org.apache.rocketmq.connect.oss.sink; + + +import com.aliyuncs.DefaultAcsClient; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.aliyuncs.IAcsClient; +import com.aliyuncs.http.FormatType; +import com.aliyuncs.profile.DefaultProfile; +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.data.Struct; +import io.openmessaging.connector.api.component.task.sink.SinkTask; +import io.openmessaging.connector.api.component.task.sink.SinkTaskContext; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.errors.ConnectException; +import io.openmessaging.connector.api.errors.RetriableException; +import org.apache.rocketmq.connect.oss.sink.constant.OssConstant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.aliyun.oss.ClientException; +import com.aliyun.oss.OSS; +import com.aliyun.oss.OSSClientBuilder; +import com.aliyun.oss.ClientBuilderConfiguration; +import com.aliyun.oss.OSSException; +import com.aliyun.oss.common.auth.CredentialsProviderFactory; +import com.aliyun.oss.common.auth.DefaultCredentialProvider; +import com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider; +import com.aliyun.oss.common.auth.*; +import com.aliyun.oss.common.comm.SignVersion; +import com.aliyun.oss.model.AppendObjectRequest; +import com.aliyun.oss.model.AppendObjectResult; +import com.aliyun.oss.model.OSSObject; +import com.aliyun.oss.model.PutObjectRequest; +import com.aliyun.oss.model.PutObjectResult; + +import java.util.stream.Collectors; +import java.util.HashMap; +import java.net.URI; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.io.IOException; + +public class OssSinkTask extends SinkTask 
{ + private static final Logger log = LoggerFactory.getLogger(OssSinkTask.class); + + private String accessKeyId; + + private String accessKeySecret; + + private String accountEndpoint; + + private String bucketName; + + private String fileUrlPrefix; + + private String region; + + private OSS ossClient; + + private String objectName; + + private String partitionMethod; + + private String compressType; + + private boolean enableBatchPut; + + private String taskId; + + private long lastOffset; + + private long lastTimeStamp; + + private String lastPrefix; + + private HashMap<String, List<String>> recordMap = new HashMap<>(); + + private void processMap() throws ConnectException, IOException { + recordMap.forEach((key, values) -> { + String joinedString = values.stream().collect(Collectors.joining("\n")); + String absolutePath = key + objectName; + boolean exists = ossClient.doesObjectExist(bucketName, absolutePath); + long offset = 0; + // If the object does not exist, create it and set offset to 0, otherwise read the offset of the current object + if (exists) { + try { + OSSObject ossObject = ossClient.getObject(bucketName, absolutePath); + InputStream inputStream = ossObject.getObjectContent(); + offset = inputStream.available(); + } catch (Exception e) { + log.error("OSSSinkTask | getObjectContent | error => ", e); + } + } else { + offset = 0; + } + putOss(absolutePath, offset, joinedString); + }); + } + + private String genFilePrefixByPartition(ConnectRecord record) throws ConnectException { + if (partitionMethod.equals("Normal")) { + return fileUrlPrefix; + } else if (partitionMethod.equals("Time")) { + long nowTimeStamp = record.getTimestamp(); + if (lastTimeStamp != nowTimeStamp) { + Date myDate = new Date(nowTimeStamp); + String year = String.format("%tY", myDate); + String month = String.format("%tm", myDate); + String day = String.format("%td", myDate); + String hour = String.format("%tH", myDate); + lastPrefix = fileUrlPrefix + year + "/" + month + "/" + 
day + "/" + hour + "/"; + return lastPrefix; + } + return lastPrefix; + } else { + throw new RetriableException("Illegal partition method."); + } + } + + private long genObjectOffset(ConnectRecord record, String objectUrl) throws ConnectException, IOException { + if (partitionMethod.equals("Normal")) { + return lastOffset; + } else if (partitionMethod.equals("Time")) { + if (lastTimeStamp != record.getTimestamp()) { + boolean exists = ossClient.doesObjectExist(bucketName, objectUrl); + // If the object does not exist, create it and set offset to 0, otherwise read the offset of the current object + if (exists) { + OSSObject ossObject = ossClient.getObject(bucketName, objectUrl); + InputStream inputStream = ossObject.getObjectContent(); + lastOffset = inputStream.available(); + return lastOffset; + } else { + lastOffset = 0; + return lastOffset; + } + } else { + return lastOffset; + } + } else { + throw new RetriableException("Illegal partition method."); + } + } + + private void putOss(String absolutePath, long offset, String context) throws ConnectException { + try { + // Create an append write request and send it + AppendObjectRequest appendObjectRequest = new AppendObjectRequest(bucketName, absolutePath, new ByteArrayInputStream(context.getBytes())); + appendObjectRequest.setPosition(offset); + AppendObjectResult appendObjectResult = ossClient.appendObject(appendObjectRequest); + + // Update + lastOffset = appendObjectResult.getNextPosition(); + } catch (OSSException oe) { + System.out.println("Caught an OSSException, which means your request made it to OSS, " + + "but was rejected with an error response for some reason."); + System.out.println("Error Message:" + oe.getErrorMessage()); + System.out.println("Error Code:" + oe.getErrorCode()); + System.out.println("Request ID:" + oe.getRequestId()); + System.out.println("Host ID:" + oe.getHostId()); + } catch (ClientException ce) { + System.out.println("Caught an ClientException, which means the client encountered 
" + + "a serious internal problem while trying to communicate with OSS, " + + "such as not being able to access the network."); + System.out.println("Error Message:" + ce.getMessage()); + } + } + + private void handleRecord(ConnectRecord record) throws ConnectException, IOException { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("data", record.getData()); + String context = JSON.toJSONString(jsonObject); + String prefix = genFilePrefixByPartition(record); + if (enableBatchPut) { + if (!recordMap.containsKey(prefix)) { + recordMap.put(prefix, new ArrayList<>()); + } + recordMap.get(prefix).add(context); + } else { + String absolutePath = prefix + objectName; + long appendOffset = genObjectOffset(record, absolutePath); + putOss(absolutePath, appendOffset, context); + lastTimeStamp = record.getTimestamp(); + } + } + + @Override + public void put(List<ConnectRecord> sinkRecords) throws ConnectException { + try { + sinkRecords.forEach(sinkRecord -> { + try { + handleRecord(sinkRecord); + } catch (OSSException oe) { + System.out.println("Caught an OSSException, which means your request made it to OSS, " + + "but was rejected with an error response for some reason."); + System.out.println("Error Message:" + oe.getErrorMessage()); + System.out.println("Error Code:" + oe.getErrorCode()); + System.out.println("Request ID:" + oe.getRequestId()); + System.out.println("Host ID:" + oe.getHostId()); + } catch (ClientException ce) { + System.out.println("Caught an ClientException, which means the client encountered " + + "a serious internal problem while trying to communicate with OSS, " + + "such as not being able to access the network."); + System.out.println("Error Message:" + ce.getMessage()); + } catch (Exception e) { + log.error("OSSSinkTask | genObjectOffset | error => ", e); + } + }); + if (enableBatchPut && !recordMap.isEmpty()) { Review Comment: When any exception happens above, I think processMap() should not be called. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org