This is an automated email from the ASF dual-hosted git repository.

shenlin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 0da281f5 [OSPP2024]feat: support aliyun OSS Sink Connector (#540)
0da281f5 is described below

commit 0da281f5b7c7a300ed1871312658a9d4c873e60c
Author: limbo <78575131+limbo...@users.noreply.github.com>
AuthorDate: Wed Oct 30 21:02:30 2024 +0800

    [OSPP2024]feat: support aliyun OSS Sink Connector (#540)
    
    feat: support rocketmq-connect-oss sink
---
 connectors/aliyun/rocketmq-connect-oss/README.md   |  48 ++++
 connectors/aliyun/rocketmq-connect-oss/pom.xml     | 218 ++++++++++++++++
 .../connect/oss/sink/OssSinkConnector.java         |  83 ++++++
 .../rocketmq/connect/oss/sink/OssSinkTask.java     | 281 +++++++++++++++++++++
 .../connect/oss/sink/constant/OssConstant.java     |  14 +
 .../connect/eventbridge/sink/OssSinkTest.java      | 156 ++++++++++++
 6 files changed, 800 insertions(+)

diff --git a/connectors/aliyun/rocketmq-connect-oss/README.md 
b/connectors/aliyun/rocketmq-connect-oss/README.md
new file mode 100644
index 00000000..16160840
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-oss/README.md
@@ -0,0 +1,48 @@
+# rocketmq-connect-oss
+* **rocketmq-connect-oss** 说明
+```
+Consumes messages from RocketMQ topics and writes their data to Aliyun OSS.
+```
+
+## rocketmq-connect-oss 打包
+```
+mvn clean install -Dmaven.test.skip=true
+```
+
+## rocketmq-connect-oss 启动
+
+* **rocketmq-connect-oss** 启动
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-oss-sink-connector-name}
+?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.oss.sink.OssSinkConnector","connect-topicname":"${connect-topicname}","accessKeyId":"${accessKeyId}","accessKeySecret":"${accessKeySecret}","accountEndpoint":"${accountEndpoint}","bucketName":"${bucketName}","fileUrlPrefix":"${fileUrlPrefix}","objectName":"${objectName}","region":"${region}","partitionMethod":"${partitionMethod}"}
+```
+
+例子 
+```
+http://localhost:8081/connectors/ossConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
+"connector-class":"org.apache.rocketmq.connect.oss.sink.OssSinkConnector","connect-topicname":"oss-topic","accountEndpoint":"xxxx","accessKeyId":"xxxx","accessKeySecret":"xxxx",
+"bucketName":"xxxx","objectName":"xxxx","region":"xxxx","partitionMethod":"xxxx","fileUrlPrefix":"xxxx"}
+```
+
+>**注:** `rocketmq-connect-oss` 的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中
+
+## rocketmq-connect-oss 停止
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-oss-sink-connector-name}/stop
+```
+
+## rocketmq-connect-oss 参数说明
+* **oss-sink-connector 参数说明**
+
+| KEY             | TYPE   | Must be filled | Description                   | Example                    |
+|-----------------|--------|----------------|-------------------------------|----------------------------|
+| accountEndpoint | String | YES            | OSS endpoint                  | 
oss-cn-beijing.aliyuncs.com |
+| accessKeyId     | String | YES            | 阿里云授信账号的AK                    | 
xxxx                       |
+| accessKeySecret | String | YES            | 阿里云授信账号的SK                    | 
xxx                        |
+| bucketName      | String | YES            | OSS bucketName                | 
test_bucket                |
+| objectName      | String | YES            | 上传目的object名字                  | 
test.txt                   |
+| region          | String | YES            | OSS region                    | 
cn-beijing                 |
+| partitionMethod | String | YES            | 分区模式,Normal表示不分区,Time表示按时间分区  | 
Time                       |
+| fileUrlPrefix   | String | YES            | 到object的URL前缀                 | 
file1/                     |
diff --git a/connectors/aliyun/rocketmq-connect-oss/pom.xml 
b/connectors/aliyun/rocketmq-connect-oss/pom.xml
new file mode 100644
index 00000000..9ac09327
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-oss/pom.xml
@@ -0,0 +1,218 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-connect-oss</artifactId>
+    <version>1.0.0</version>
+
+    <name>connect-oss</name>
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <slf4j.version>1.7.36</slf4j.version>
+        <logback.version>1.2.11</logback.version>
+        <junit.version>4.13.2</junit.version>
+        <fastjson.version>1.2.83</fastjson.version>
+        <assertj.version>3.22.0</assertj.version>
+        <mockito.version>4.5.1</mockito.version>
+        
<openmessaging-connector.version>0.1.4</openmessaging-connector.version>
+        <commons-lang3.version>3.12.0</commons-lang3.version>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <aliyun-sdk-oss.version>3.17.2</aliyun-sdk-oss.version>
+        <aliyun-java-sdk-sts.version>3.1.0</aliyun-java-sdk-sts.version>
+        <aliyun-java-sdk-core.version>4.6.0</aliyun-java-sdk-core.version>
+        <gson.version>2.9.0</gson.version>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>versions-maven-plugin</artifactId>
+                <version>2.3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+                <version>2.7</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <configuration>
+                    
<outputDirectory>${project.build.directory}/lib</outputDirectory>
+                    <excludeTransitive>false</excludeTransitive>
+                    <stripVersion>true</stripVersion>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.6.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.19.1</version>
+                <configuration>
+                    <argLine>-Xms512m -Xmx1024m</argLine>
+                    <forkMode>always</forkMode>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-site-plugin</artifactId>
+                <version>3.6</version>
+                <configuration>
+                    <locales>en_US</locales>
+                    <outputEncoding>UTF-8</outputEncoding>
+                    <inputEncoding>UTF-8</inputEncoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.0.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>2.10.4</version>
+                <configuration>
+                    <charset>UTF-8</charset>
+                    <locale>en_US</locale>
+                    
<excludePackageNames>io.openmessaging.internal</excludePackageNames>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>aggregate</id>
+                        <goals>
+                            <goal>aggregate</goal>
+                        </goals>
+                        <phase>site</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>3.0.2</version>
+                <configuration>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <version>3.0.4</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.0.0</version>
+                <configuration>
+                    <archive>
+                        <manifest>
+                            
<mainClass>org.apache.rocketmq.connect.oss.sink.OssSinkConnector</mainClass>
+                        </manifest>
+                    </archive>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+    <dependencies>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>${logback.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+            <version>${logback.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>${assertj.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>${fastjson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <version>${openmessaging-connector.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>${commons-lang3.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun.oss</groupId>
+            <artifactId>aliyun-sdk-oss</artifactId>
+            <version>${aliyun-sdk-oss.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+            <version>${gson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun</groupId>
+            <artifactId>aliyun-java-sdk-core</artifactId>
+            <version>${aliyun-java-sdk-core.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun</groupId>
+            <artifactId>aliyun-java-sdk-sts</artifactId>
+            <version>${aliyun-java-sdk-sts.version}</version>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git 
a/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkConnector.java
 
b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkConnector.java
new file mode 100644
index 00000000..3c1d7387
--- /dev/null
+++ 
b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkConnector.java
@@ -0,0 +1,83 @@
+package org.apache.rocketmq.connect.oss.sink;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.connect.oss.sink.constant.OssConstant;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class OssSinkConnector extends SinkConnector {
+
+    private String accessKeyId;
+
+    private String accessKeySecret;
+
+    private String accountEndpoint;
+
+    private String bucketName;
+
+    private String fileUrlPrefix;
+
+    private String objectName;
+
+    private String region;
+
+    private String partitionMethod;
+
+    @Override
+    public List<KeyValue> taskConfigs(int maxTasks) {
+        List<KeyValue> keyValueList = new ArrayList<>();
+        for (int i = 0; i < maxTasks; ++i) {
+            KeyValue keyValue = new DefaultKeyValue();
+            keyValue.put(OssConstant.ACCESS_KEY_ID, accessKeyId);
+            keyValue.put(OssConstant.ACCESS_KEY_SECRET, accessKeySecret);
+            keyValue.put(OssConstant.ACCOUNT_ENDPOINT, accountEndpoint);
+            keyValue.put(OssConstant.BUCKET_NAME, bucketName);
+            keyValue.put(OssConstant.FILE_URL_PREFIX, fileUrlPrefix);
+            keyValue.put(OssConstant.OBJECT_NAME, objectName);
+            keyValue.put(OssConstant.REGION, region);
+            keyValue.put(OssConstant.PARTITION_METHOD, partitionMethod);
+            keyValueList.add(keyValue);
+        }
+        return keyValueList;
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return OssSinkTask.class;
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+        if (StringUtils.isBlank(config.getString(OssConstant.ACCESS_KEY_ID))
+                || 
StringUtils.isBlank(config.getString(OssConstant.ACCESS_KEY_SECRET))
+                || 
StringUtils.isBlank(config.getString(OssConstant.ACCOUNT_ENDPOINT))
+                || 
StringUtils.isBlank(config.getString(OssConstant.BUCKET_NAME))
+                || 
StringUtils.isBlank(config.getString(OssConstant.OBJECT_NAME))
+                || StringUtils.isBlank(config.getString(OssConstant.REGION))
+                || 
StringUtils.isBlank(config.getString(OssConstant.PARTITION_METHOD))) {
+            throw new RuntimeException("Oss required parameter is null !");
+        }
+    }
+
+    @Override
+    public void start(KeyValue config) {
+        accessKeyId = config.getString(OssConstant.ACCESS_KEY_ID);
+        accessKeySecret = config.getString(OssConstant.ACCESS_KEY_SECRET);
+        accountEndpoint = config.getString(OssConstant.ACCOUNT_ENDPOINT);
+        bucketName = config.getString(OssConstant.BUCKET_NAME);
+        fileUrlPrefix = config.getString(OssConstant.FILE_URL_PREFIX);
+        objectName = config.getString(OssConstant.OBJECT_NAME);
+        region = config.getString(OssConstant.REGION);
+        partitionMethod = config.getString(OssConstant.PARTITION_METHOD);
+    }
+
+    @Override
+    public void stop() {
+
+    }
+}
diff --git 
a/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkTask.java
 
b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkTask.java
new file mode 100644
index 00000000..002bc30b
--- /dev/null
+++ 
b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkTask.java
@@ -0,0 +1,281 @@
+package org.apache.rocketmq.connect.oss.sink;
+
+
+import com.aliyuncs.DefaultAcsClient;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.aliyuncs.IAcsClient;
+import com.aliyuncs.http.FormatType;
+import com.aliyuncs.profile.DefaultProfile;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import io.openmessaging.connector.api.errors.RetriableException;
+import org.apache.rocketmq.connect.oss.sink.constant.OssConstant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.aliyun.oss.ClientException;
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.OSSClientBuilder;
+import com.aliyun.oss.ClientBuilderConfiguration;
+import com.aliyun.oss.OSSException;
+import com.aliyun.oss.common.auth.CredentialsProviderFactory;
+import com.aliyun.oss.common.auth.DefaultCredentialProvider;
+import com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider;
+import com.aliyun.oss.common.auth.*;
+import com.aliyun.oss.common.comm.SignVersion;
+import com.aliyun.oss.model.AppendObjectRequest;
+import com.aliyun.oss.model.AppendObjectResult;
+import com.aliyun.oss.model.OSSObject;
+import com.aliyun.oss.model.PutObjectRequest;
+import com.aliyun.oss.model.PutObjectResult;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.HashMap; 
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Date;
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.IOException;
+
+public class OssSinkTask extends SinkTask {
+    private static final Logger log = 
LoggerFactory.getLogger(OssSinkTask.class);
+
+    private String accessKeyId;
+
+    private String accessKeySecret;
+
+    private String accountEndpoint;
+
+    private String bucketName;
+
+    private String fileUrlPrefix;
+
+    private String region;
+
+    private OSS ossClient;
+
+    private String objectName;
+
+    private String partitionMethod;
+
+    private String compressType;
+
+    private String taskId;
+
+    private long lastOffset;
+
+    private long lastTimeStamp;
+
+    private String lastPrefix;
+
+    private HashMap<String, List<String>> recordMap = new HashMap<>();
+
+    private static final long OBJECT_SIZE_THRESHOLD = 200 * 1024 * 1024;  
+
+    private void processMap() throws ConnectException, IOException {
+        recordMap.forEach((key, values) -> {  
+            String joinedString = 
values.stream().collect(Collectors.joining("\n"));
+            String absolutePath = key + objectName;
+            boolean exists = ossClient.doesObjectExist(bucketName, 
absolutePath);
+            long offset = 0;
+            // If the object does not exist, create it and set offset to 0, 
otherwise read the offset of the current object
+            if (exists) {
+                try {
+                    OSSObject ossObject = ossClient.getObject(bucketName, 
absolutePath);
+                    InputStream inputStream = ossObject.getObjectContent();
+                    offset = inputStream.available();
+                    if (offset > OBJECT_SIZE_THRESHOLD) {
+                        SimpleDateFormat formatter = new 
SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+                        String sufix = formatter.format(new Date());
+                        ossClient.copyObject(bucketName, absolutePath, 
bucketName, absolutePath + sufix);
+                        ossClient.deleteObject(bucketName, absolutePath);
+                        ossClient.doesObjectExist(bucketName, absolutePath);
+                        offset = 0;
+                    }
+                } catch (Exception e) {
+                    log.error("OSSSinkTask | getObjectContent | error => ", e);
+                }
+            } else {
+                offset = 0;
+            }
+            putOss(absolutePath, offset, joinedString);
+        });
+    }
+
+    private String genFilePrefixByPartition(ConnectRecord record) throws 
ConnectException {
+        if (partitionMethod.equals("Normal")) {
+            return fileUrlPrefix;
+        } else if (partitionMethod.equals("Time")) {
+            long nowTimeStamp = record.getTimestamp();
+            if (lastTimeStamp != nowTimeStamp) {
+                Date myDate = new Date(nowTimeStamp);
+                String year = String.format("%tY", myDate);
+                String month = String.format("%tm", myDate);
+                String day = String.format("%td", myDate);
+                String hour = String.format("%tH", myDate);
+                lastPrefix = fileUrlPrefix + year + "/" + month + "/" + day + 
"/" + hour + "/";
+                return lastPrefix;
+            }
+            return lastPrefix;
+        } else {
+            throw new RetriableException("Illegal partition method.");
+        }
+    }
+
+    private long genObjectOffset(ConnectRecord record, String objectUrl) 
throws ConnectException, IOException {
+        if (partitionMethod.equals("Normal")) {
+            return lastOffset;
+        } else if (partitionMethod.equals("Time")) {
+            if (lastTimeStamp != record.getTimestamp()) {
+                boolean exists = ossClient.doesObjectExist(bucketName, 
objectUrl);
+                // If the object does not exist, create it and set offset to 
0, otherwise read the offset of the current object
+                if (exists) {
+                    OSSObject ossObject = ossClient.getObject(bucketName, 
objectUrl);
+                    InputStream inputStream = ossObject.getObjectContent();
+                    lastOffset = inputStream.available();
+                    return lastOffset;
+                } else {
+                    lastOffset = 0;
+                    return lastOffset;
+                }
+            } else {
+                return lastOffset;
+            }
+        } else {
+            throw new RetriableException("Illegal partition method.");
+        }
+    }
+
+    private void putOss(String absolutePath, long offset, String context) 
throws ConnectException {
+        try {
+            // Create an append write request and send it
+            AppendObjectRequest appendObjectRequest = new 
AppendObjectRequest(bucketName, absolutePath, new 
ByteArrayInputStream(context.getBytes()));
+            appendObjectRequest.setPosition(offset);
+            AppendObjectResult appendObjectResult = 
ossClient.appendObject(appendObjectRequest);
+
+            // Update
+            lastOffset = appendObjectResult.getNextPosition();
+        } catch (OSSException oe) {
+            log.error("Caught an OSSException, which means your request made 
it to OSS, but was rejected with an error response for some reason."
+            + " Error Message: {}, Error Code: {}, Request ID: {}, Host ID: 
{}.", oe.getErrorMessage(),oe.getErrorCode(), oe.getRequestId(), 
oe.getHostId());
+        } catch (ClientException ce) {
+            log.error("Caught an ClientException, which means the client 
encountered a serious internal problem while trying to communicate with OSS,"
+            + " such as not being able to access the network. Error Message: 
{}.", ce.getMessage());
+        }
+    }
+
+    private void handleRecord(ConnectRecord record) throws ConnectException, 
IOException {
+        JSONObject jsonObject = new JSONObject();
+        jsonObject.put("data", record.getData());
+        String context = JSON.toJSONString(jsonObject);
+        String prefix = genFilePrefixByPartition(record);
+
+        String absolutePath = prefix + objectName;
+        long appendOffset = genObjectOffset(record, absolutePath);
+        if (appendOffset > OBJECT_SIZE_THRESHOLD) {
+            SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss");
+            String sufix = formatter.format(new Date());
+            ossClient.copyObject(bucketName, absolutePath, bucketName, 
absolutePath + sufix);
+            ossClient.deleteObject(bucketName, absolutePath);
+            ossClient.doesObjectExist(bucketName, absolutePath);
+            lastOffset = 0;
+        }
+        putOss(absolutePath, appendOffset, context);
+        lastTimeStamp = record.getTimestamp();
+    }
+
+    @Override
+    public void put(List<ConnectRecord> sinkRecords) throws ConnectException {
+        try {
+            AtomicBoolean hasException = new AtomicBoolean(false); 
+            sinkRecords.forEach(sinkRecord -> {
+                try {
+                    handleRecord(sinkRecord);
+                } catch (OSSException oe) {
+                    log.error("Caught an OSSException, which means your 
request made it to OSS, but was rejected with an error response for some 
reason."
+                    + " Error Message: {}, Error Code: {}, Request ID: {}, 
Host ID: {}.", oe.getErrorMessage(),oe.getErrorCode(), oe.getRequestId(), 
oe.getHostId());
+                    hasException.set(true);
+                } catch (ClientException ce) {
+                    log.error("Caught an ClientException, which means the 
client encountered a serious internal problem while trying to communicate with 
OSS,"
+                    + " such as not being able to access the network. Error 
Message: {}.", ce.getMessage());
+                    hasException.set(true);
+                } catch (Exception e) {
+                    log.error("OSSSinkTask | genObjectOffset | error => ", e);
+                    hasException.set(true);
+                }
+            });
+            if (!hasException.get() && !recordMap.isEmpty()) {
+                processMap();
+            }
+        } catch (Exception e) {
+            log.error("OSSSinkTask | put | error => ", e);
+        }
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+
+    }
+
+    @Override
+    public void init(SinkTaskContext sinkTaskContext) {
+        taskId = sinkTaskContext.getConnectorName() + "-" +  
sinkTaskContext.getTaskName();
+    }
+
+    @Override
+    public void start(KeyValue config) {
+        accessKeyId = config.getString(OssConstant.ACCESS_KEY_ID);
+        accessKeySecret = config.getString(OssConstant.ACCESS_KEY_SECRET);
+        accountEndpoint = config.getString(OssConstant.ACCOUNT_ENDPOINT);
+        bucketName = config.getString(OssConstant.BUCKET_NAME);
+        fileUrlPrefix = config.getString(OssConstant.FILE_URL_PREFIX);
+        objectName = config.getString(OssConstant.OBJECT_NAME);
+        region = config.getString(OssConstant.REGION);
+        partitionMethod = config.getString(OssConstant.PARTITION_METHOD);
+        compressType = config.getString(OssConstant.COMPRESS_TYPE);
+        
+        fileUrlPrefix = fileUrlPrefix + taskId + "/";
+        try {
+            DefaultCredentialProvider credentialsProvider = 
CredentialsProviderFactory.newDefaultCredentialProvider(accessKeyId, 
accessKeySecret);
+
+            ClientBuilderConfiguration clientBuilderConfiguration = new 
ClientBuilderConfiguration();
+            clientBuilderConfiguration.setSignatureVersion(SignVersion.V4);
+            ossClient = OSSClientBuilder.create()
+                    .endpoint(accountEndpoint)
+                    .credentialsProvider(credentialsProvider)
+                    .clientConfiguration(clientBuilderConfiguration)
+                    .region(region)
+                    .build();
+            if (partitionMethod.equals("Normal")) {
+                boolean exists = ossClient.doesObjectExist(bucketName, 
fileUrlPrefix + objectName);
+                // If the object does not exist, create it and set offset to 
0, otherwise read the offset of the current object
+                if (exists) {
+                    OSSObject ossObject = ossClient.getObject(bucketName, 
fileUrlPrefix + objectName);
+                    InputStream inputStream = ossObject.getObjectContent();
+                    long offset_now = inputStream.available();
+                    lastOffset = offset_now;
+                } else {
+                    lastOffset = 0;
+                }
+            }
+        } catch (Exception e) {
+            log.error("OssSinkTask | start | error => ", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void stop() {
+        ossClient.shutdown();
+    }
+
+}
diff --git 
a/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/constant/OssConstant.java
 
b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/constant/OssConstant.java
new file mode 100644
index 00000000..3c064876
--- /dev/null
+++ 
b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/constant/OssConstant.java
@@ -0,0 +1,14 @@
+package org.apache.rocketmq.connect.oss.sink.constant;
+
+/**
+ * Names of the configuration keys used by the OSS sink connector and its tasks.
+ */
+public class OssConstant {
+
+    // Key for the access key id used to build the OSS credentials.
+    public static final String ACCESS_KEY_ID = "accessKeyId";
+    // Key for the access key secret used to build the OSS credentials.
+    public static final String ACCESS_KEY_SECRET = "accessKeySecret";
+    // Key for the OSS endpoint the client connects to.
+    public static final String ACCOUNT_ENDPOINT = "accountEndpoint";
+    // Key for the name of the target bucket.
+    public static final String BUCKET_NAME = "bucketName";
+    // Key for the prefix placed in front of the object key.
+    public static final String FILE_URL_PREFIX = "fileUrlPrefix";
+    // Key for the name of the object that records are appended to.
+    public static final String OBJECT_NAME = "objectName";
+    // Key for the OSS region passed to the client builder.
+    public static final String REGION = "region";
+    // Key for the partition method ("Normal" or "Time").
+    public static final String PARTITION_METHOD = "partitionMethod";
+    // Key for the compression type setting.
+    public static final String COMPRESS_TYPE = "compressType";
+}
diff --git 
a/connectors/aliyun/rocketmq-connect-oss/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/OssSinkTest.java
 
b/connectors/aliyun/rocketmq-connect-oss/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/OssSinkTest.java
new file mode 100644
index 00000000..1e21169b
--- /dev/null
+++ 
b/connectors/aliyun/rocketmq-connect-oss/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/OssSinkTest.java
@@ -0,0 +1,156 @@
+package org.apache.rocketmq.connect.oss.sink;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.oss.sink.constant.OssConstant;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+
+/**
+ * Tests for {@code OssSinkConnector} and {@code OssSinkTask}.
+ *
+ * <p>The put tests start a task with the endpoint, bucket and placeholder
+ * credentials configured below and then write one record. Replace the
+ * placeholder values with your own before running them.
+ */
+public class OssSinkTest {
+
+    /** A connector asked for one task must return exactly one task config. */
+    @Test
+    public void testTaskConfigs() {
+        OssSinkConnector ossSinkConnector = new OssSinkConnector();
+        // JUnit's contract is assertEquals(expected, actual).
+        Assert.assertEquals(1, ossSinkConnector.taskConfigs(1).size());
+    }
+
+    /** Writes one record using the "Normal" partition method. */
+    @Test
+    public void testNormalPut() {
+        startAndPut("Normal");
+    }
+
+    /** Writes one record using the "Time" partition method. */
+    @Test
+    public void testTimePut() {
+        startAndPut("Time");
+    }
+
+    /**
+     * Initializes and starts a fresh task, then puts a single sample record.
+     *
+     * @param partitionMethod value stored under {@link OssConstant#PARTITION_METHOD}
+     */
+    private static void startAndPut(String partitionMethod) {
+        OssSinkTask ossSinkTask = new OssSinkTask();
+        KeyValue keyValue = newTaskConfig(partitionMethod);
+        List<ConnectRecord> connectRecordList = newSampleRecords();
+        ossSinkTask.init(newStubContext());
+        ossSinkTask.start(keyValue);
+        ossSinkTask.put(connectRecordList);
+    }
+
+    /**
+     * Builds the task configuration shared by the put tests.
+     *
+     * @param partitionMethod value stored under {@link OssConstant#PARTITION_METHOD}
+     * @return the populated configuration
+     */
+    private static KeyValue newTaskConfig(String partitionMethod) {
+        KeyValue keyValue = new DefaultKeyValue();
+        // Replace the values below with your own settings.
+        // Input your access key id
+        keyValue.put(OssConstant.ACCESS_KEY_ID, "xxx");
+        // Input your access key secret
+        keyValue.put(OssConstant.ACCESS_KEY_SECRET, "xxx");
+        keyValue.put(OssConstant.ACCOUNT_ENDPOINT, "oss-cn-beijing.aliyuncs.com");
+        keyValue.put(OssConstant.BUCKET_NAME, "rocketmqoss");
+        keyValue.put(OssConstant.FILE_URL_PREFIX, "test/");
+        keyValue.put(OssConstant.OBJECT_NAME, "oss_new.txt");
+        keyValue.put(OssConstant.REGION, "cn-beijing");
+        keyValue.put(OssConstant.PARTITION_METHOD, partitionMethod);
+        return keyValue;
+    }
+
+    /**
+     * Builds a one-element record list whose payload is a small JSON document.
+     *
+     * @return a mutable list containing the sample record
+     */
+    private static List<ConnectRecord> newSampleRecords() {
+        List<ConnectRecord> connectRecordList = new ArrayList<>();
+        ConnectRecord connectRecord =
+                new ConnectRecord(null, null, System.currentTimeMillis());
+        connectRecord.setData("{\n" +
+                "\t\"test\" :  \"test\"\n" +
+                "}");
+        connectRecordList.add(connectRecord);
+        return connectRecordList;
+    }
+
+    /**
+     * Creates a stub task context: fixed connector and task names, no-op
+     * offset and pause/resume operations, and {@code null} for
+     * {@code configs()} and {@code assignment()}.
+     *
+     * @return the stub context
+     */
+    private static SinkTaskContext newStubContext() {
+        return new SinkTaskContext() {
+            @Override
+            public String getConnectorName() {
+                return "test_connect";
+            }
+
+            @Override
+            public String getTaskName() {
+                return "test_task";
+            }
+
+            @Override
+            public KeyValue configs() {
+                return null;
+            }
+
+            @Override
+            public void resetOffset(RecordPartition recordPartition,
+                    RecordOffset recordOffset) {
+                // no-op in tests
+            }
+
+            @Override
+            public void resetOffset(Map<RecordPartition, RecordOffset> map) {
+                // no-op in tests
+            }
+
+            @Override
+            public void pause(List<RecordPartition> list) {
+                // no-op in tests
+            }
+
+            @Override
+            public void resume(List<RecordPartition> list) {
+                // no-op in tests
+            }
+
+            @Override
+            public Set<RecordPartition> assignment() {
+                return null;
+            }
+        };
+    }
+}


Reply via email to