This is an automated email from the ASF dual-hosted git repository. shenlin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push: new 0da281f5 [OSPP2024]feat: support aliyun OSS Sink Connector (#540) 0da281f5 is described below commit 0da281f5b7c7a300ed1871312658a9d4c873e60c Author: limbo <78575131+limbo...@users.noreply.github.com> AuthorDate: Wed Oct 30 21:02:30 2024 +0800 [OSPP2024]feat: support aliyun OSS Sink Connector (#540) feat: support rocketmq-connect-oss sink --- connectors/aliyun/rocketmq-connect-oss/README.md | 48 ++++ connectors/aliyun/rocketmq-connect-oss/pom.xml | 218 ++++++++++++++++ .../connect/oss/sink/OssSinkConnector.java | 83 ++++++ .../rocketmq/connect/oss/sink/OssSinkTask.java | 281 +++++++++++++++++++++ .../connect/oss/sink/constant/OssConstant.java | 14 + .../connect/eventbridge/sink/OssSinkTest.java | 156 ++++++++++++ 6 files changed, 800 insertions(+) diff --git a/connectors/aliyun/rocketmq-connect-oss/README.md b/connectors/aliyun/rocketmq-connect-oss/README.md new file mode 100644 index 00000000..16160840 --- /dev/null +++ b/connectors/aliyun/rocketmq-connect-oss/README.md @@ -0,0 +1,48 @@ +# rocketmq-connect-oss +* **rocketmq-connect-oss** 说明 +``` +Be responsible for consuming messages from producer and writing data to oss. +``` + +## rocketmq-connect-oss 打包 +``` +mvn clean install -Dmaven.test.skip=true +``` + +## rocketmq-connect-oss 启动 + +* **rocketmq-connect-oss** 启动 + +``` +http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-oss-sink-connector-name} +?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.oss.sink.OssSinkConnector","connect-topicname":"${connect-topicname}","accessKeyId":"${accessKeyId}","accessKeySecret":"${accessKeySecret}","accountEndpoint":"${accountEndpoint}","bucketName":"${bucketName}","fileUrlPrefix":"${fileUrlPrefix}","objectName":"${objectName}","region":"${region}","partitionMethod":"${partitionMethod}"} +``` + +例子 +``` +http://localhost:8081/connectors/ossConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster", +"connector-class":"org.apache.rocketmq.connect.oss.sink.OssSinkConnector","connect-topicname":"oss-topic","accountEndpoint":"xxxx","accessKeyId":"xxxx","accessKeySecret":"xxxx", +"bucketName":"xxxx","objectName":"xxxx","region":"xxxx","partitionMethod":"xxxx","fileUrlPrefix":"xxxx"} +``` + +>**注:** `rocketmq-oss-connect` 的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中 + +## rocketmq-connect-oss 停止 + +``` +http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-oss-connector-name}/stop +``` + +## rocketmq-connect-oss 参数说明 +* **oss-sink-connector 参数说明** + +| KEY | TYPE | Must be filled | Description | Example +|-----------------|--------|----------------|-------------------------------|----------------------------| +| accountEndpoint | String | YES | OSS endpoint | oss-cn-beijing.aliyuncs.com | +| accessKeyId | String | YES | 阿里云授信账号的AK | xxxx | +| accessKeySecret | String | YES | 阿里云授信账号的SK | xxx | +| bucketName | String | YES | OSS bucketName | test_bucket | +| objectName | String | YES | 上传目的object名字 | test.txt | +| region | String | YES | OSS region | cn-beijing | +| partitionMethod | String | YES | 分区模式,Normal表示不分区,Time表示按时间分区 | Time | +| fileUrlPrefix | String | YES | 到object的URL前缀 | file1/ | diff --git a/connectors/aliyun/rocketmq-connect-oss/pom.xml b/connectors/aliyun/rocketmq-connect-oss/pom.xml new file mode 100644 index 00000000..9ac09327 --- /dev/null +++ b/connectors/aliyun/rocketmq-connect-oss/pom.xml @@ -0,0 +1,218 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-connect-oss</artifactId> + <version>1.0.0</version> + + <name>connect-oss</name> + <properties> + <maven.compiler.source>8</maven.compiler.source> + <maven.compiler.target>8</maven.compiler.target> + <slf4j.version>1.7.36</slf4j.version> + <logback.version>1.2.11</logback.version> + <junit.version>4.13.2</junit.version> + <fastjson.version>1.2.83</fastjson.version> + <assertj.version>3.22.0</assertj.version> + <mockito.version>4.5.1</mockito.version> + <openmessaging-connector.version>0.1.4</openmessaging-connector.version> + <commons-lang3.version>3.12.0</commons-lang3.version> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <aliyun-sdk-oss.version>3.17.2</aliyun-sdk-oss.version> + <aliyun-java-sdk-sts.version>3.1.0</aliyun-java-sdk-sts.version> + <aliyun-java-sdk-core.version>4.6.0</aliyun-java-sdk-core.version> + <gson.version>2.9.0</gson.version> + </properties> + + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>versions-maven-plugin</artifactId> + <version>2.3</version> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>clirr-maven-plugin</artifactId> + <version>2.7</version> + </plugin> + <plugin> + <artifactId>maven-dependency-plugin</artifactId> + <configuration> + <outputDirectory>${project.build.directory}/lib</outputDirectory> + <excludeTransitive>false</excludeTransitive> + <stripVersion>true</stripVersion> + </configuration> + </plugin> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.6.1</version> + <configuration> + <source>${maven.compiler.source}</source> + <target>${maven.compiler.target}</target> + <compilerVersion>${maven.compiler.source}</compilerVersion> + <showDeprecation>true</showDeprecation> + <showWarnings>true</showWarnings> + </configuration> + </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.19.1</version> + <configuration> + <argLine>-Xms512m -Xmx1024m</argLine> + <forkMode>always</forkMode> + <includes> + <include>**/*Test.java</include> + </includes> + </configuration> + </plugin> + <plugin> + <artifactId>maven-site-plugin</artifactId> + <version>3.6</version> + <configuration> + <locales>en_US</locales> + <outputEncoding>UTF-8</outputEncoding> + <inputEncoding>UTF-8</inputEncoding> + </configuration> + </plugin> + <plugin> + <artifactId>maven-source-plugin</artifactId> + <version>3.0.1</version> + <executions> + <execution> + <id>attach-sources</id> + <goals> + <goal>jar</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-javadoc-plugin</artifactId> + <version>2.10.4</version> + <configuration> + <charset>UTF-8</charset> + <locale>en_US</locale> + <excludePackageNames>io.openmessaging.internal</excludePackageNames> + </configuration> + <executions> + <execution> + <id>aggregate</id> + <goals> + <goal>aggregate</goal> + </goals> + <phase>site</phase> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <version>3.0.2</version> + <configuration> + <encoding>${project.build.sourceEncoding}</encoding> + </configuration> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>findbugs-maven-plugin</artifactId> + <version>3.0.4</version> + </plugin> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <version>3.0.0</version> + <configuration> + <archive> + <manifest> + <mainClass>org.apache.rocketmq.connect.oss.sink.OssSinkConnector</mainClass> + </manifest> + </archive> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + <executions> + <execution> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + <dependencies> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>${slf4j.version}</version> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + <version>${logback.version}</version> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-core</artifactId> + <version>${logback.version}</version> + </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <version>${assertj.version}</version> + </dependency> + <dependency> + <groupId>com.alibaba</groupId> + <artifactId>fastjson</artifactId> + <version>${fastjson.version}</version> + </dependency> + <dependency> + <groupId>io.openmessaging</groupId> + <artifactId>openmessaging-connector</artifactId> + <version>${openmessaging-connector.version}</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>${junit.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <version>${mockito.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>${commons-lang3.version}</version> + </dependency> + <dependency> + <groupId>com.aliyun.oss</groupId> + <artifactId>aliyun-sdk-oss</artifactId> + <version>${aliyun-sdk-oss.version}</version> + </dependency> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + <version>${gson.version}</version> + </dependency> + <dependency> + <groupId>com.aliyun</groupId> + <artifactId>aliyun-java-sdk-core</artifactId> + <version>${aliyun-java-sdk-core.version}</version> + </dependency> + <dependency> + <groupId>com.aliyun</groupId> + <artifactId>aliyun-java-sdk-sts</artifactId> + <version>${aliyun-java-sdk-sts.version}</version> + </dependency> + </dependencies> + +</project> diff --git a/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkConnector.java b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkConnector.java new file mode 100644 index 00000000..3c1d7387 --- /dev/null +++ b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkConnector.java @@ -0,0 +1,83 @@ +package org.apache.rocketmq.connect.oss.sink; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.Task; +import io.openmessaging.connector.api.component.task.sink.SinkConnector; +import io.openmessaging.internal.DefaultKeyValue; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.connect.oss.sink.constant.OssConstant; + +import java.util.ArrayList; +import java.util.List; + +public class OssSinkConnector extends SinkConnector { + + private String accessKeyId; + + private String accessKeySecret; + + private String accountEndpoint; + + private String bucketName; + + private String fileUrlPrefix; + + private String objectName; + + private String region; + + private String partitionMethod; + + @Override + public List<KeyValue> taskConfigs(int maxTasks) { + List<KeyValue> keyValueList = new ArrayList<>(); + for (int i = 0; i < maxTasks; ++i) { + KeyValue keyValue = new DefaultKeyValue(); + keyValue.put(OssConstant.ACCESS_KEY_ID, accessKeyId); + keyValue.put(OssConstant.ACCESS_KEY_SECRET, accessKeySecret); + keyValue.put(OssConstant.ACCOUNT_ENDPOINT, accountEndpoint); + keyValue.put(OssConstant.BUCKET_NAME, bucketName); + keyValue.put(OssConstant.FILE_URL_PREFIX, fileUrlPrefix); + keyValue.put(OssConstant.OBJECT_NAME, objectName); + keyValue.put(OssConstant.REGION, region); + keyValue.put(OssConstant.PARTITION_METHOD, partitionMethod); + keyValueList.add(keyValue); + } + return keyValueList; + } + + @Override + public Class<? extends Task> taskClass() { + return OssSinkTask.class; + } + + @Override + public void validate(KeyValue config) { + if (StringUtils.isBlank(config.getString(OssConstant.ACCESS_KEY_ID)) + || StringUtils.isBlank(config.getString(OssConstant.ACCESS_KEY_SECRET)) + || StringUtils.isBlank(config.getString(OssConstant.ACCOUNT_ENDPOINT)) + || StringUtils.isBlank(config.getString(OssConstant.BUCKET_NAME)) + || StringUtils.isBlank(config.getString(OssConstant.OBJECT_NAME)) + || StringUtils.isBlank(config.getString(OssConstant.REGION)) + || StringUtils.isBlank(config.getString(OssConstant.PARTITION_METHOD))) { + throw new RuntimeException("Oss required parameter is null !"); + } + } + + @Override + public void start(KeyValue config) { + accessKeyId = config.getString(OssConstant.ACCESS_KEY_ID); + accessKeySecret = config.getString(OssConstant.ACCESS_KEY_SECRET); + accountEndpoint = config.getString(OssConstant.ACCOUNT_ENDPOINT); + bucketName = config.getString(OssConstant.BUCKET_NAME); + fileUrlPrefix = config.getString(OssConstant.FILE_URL_PREFIX); + objectName = config.getString(OssConstant.OBJECT_NAME); + region = config.getString(OssConstant.REGION); + partitionMethod = config.getString(OssConstant.PARTITION_METHOD); + } + + @Override + public void stop() { + + } +} diff --git a/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkTask.java b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkTask.java new file mode 100644 index 00000000..002bc30b --- /dev/null +++ b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkTask.java @@ -0,0 +1,281 @@ +package org.apache.rocketmq.connect.oss.sink; + + +import com.aliyuncs.DefaultAcsClient; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.aliyuncs.IAcsClient; +import com.aliyuncs.http.FormatType; +import com.aliyuncs.profile.DefaultProfile; +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.data.Struct; +import io.openmessaging.connector.api.component.task.sink.SinkTask; +import io.openmessaging.connector.api.component.task.sink.SinkTaskContext; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.errors.ConnectException; +import io.openmessaging.connector.api.errors.RetriableException; +import org.apache.rocketmq.connect.oss.sink.constant.OssConstant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.aliyun.oss.ClientException; +import com.aliyun.oss.OSS; +import com.aliyun.oss.OSSClientBuilder; +import com.aliyun.oss.ClientBuilderConfiguration; +import com.aliyun.oss.OSSException; +import com.aliyun.oss.common.auth.CredentialsProviderFactory; +import com.aliyun.oss.common.auth.DefaultCredentialProvider; +import com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider; +import com.aliyun.oss.common.auth.*; +import com.aliyun.oss.common.comm.SignVersion; +import com.aliyun.oss.model.AppendObjectRequest; +import com.aliyun.oss.model.AppendObjectResult; +import com.aliyun.oss.model.OSSObject; +import com.aliyun.oss.model.PutObjectRequest; +import com.aliyun.oss.model.PutObjectResult; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import java.util.HashMap; +import java.net.URI; +import java.util.ArrayList; +import java.util.Date; +import java.text.SimpleDateFormat; +import java.util.List; +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.io.IOException; + +public class OssSinkTask extends SinkTask { + private static final Logger log = LoggerFactory.getLogger(OssSinkTask.class); + + private String accessKeyId; + + private String accessKeySecret; + + private String accountEndpoint; + + private String bucketName; + + private String fileUrlPrefix; + + private String region; + + private OSS ossClient; + + private String objectName; + + private String partitionMethod; + + private String compressType; + + private String taskId; + + private long lastOffset; + + private long lastTimeStamp; + + private String lastPrefix; + + private HashMap<String, List<String>> recordMap = new HashMap<>(); + + private static final long OBJECT_SIZE_THRESHOLD = 200 * 1024 * 1024; + + private void processMap() throws ConnectException, IOException { + recordMap.forEach((key, values) -> { + String joinedString = values.stream().collect(Collectors.joining("\n")); + String absolutePath = key + objectName; + boolean exists = ossClient.doesObjectExist(bucketName, absolutePath); + long offset = 0; + // If the object does not exist, create it and set offset to 0, otherwise read the offset of the current object + if (exists) { + try { + OSSObject ossObject = ossClient.getObject(bucketName, absolutePath); + InputStream inputStream = ossObject.getObjectContent(); + offset = inputStream.available(); + if (offset > OBJECT_SIZE_THRESHOLD) { + SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + String sufix = formatter.format(new Date()); + ossClient.copyObject(bucketName, absolutePath, bucketName, absolutePath + sufix); + ossClient.deleteObject(bucketName, absolutePath); + ossClient.doesObjectExist(bucketName, absolutePath); + offset = 0; + } + } catch (Exception e) { + log.error("OSSSinkTask | getObjectContent | error => ", e); + } + } else { + offset = 0; + } + putOss(absolutePath, offset, joinedString); + }); + } + + private String genFilePrefixByPartition(ConnectRecord record) throws ConnectException { + if (partitionMethod.equals("Normal")) { + return fileUrlPrefix; + } else if (partitionMethod.equals("Time")) { + long nowTimeStamp = record.getTimestamp(); + if (lastTimeStamp != nowTimeStamp) { + Date myDate = new Date(nowTimeStamp); + String year = String.format("%tY", myDate); + String month = String.format("%tm", myDate); + String day = String.format("%td", myDate); + String hour = String.format("%tH", myDate); + lastPrefix = fileUrlPrefix + year + "/" + month + "/" + day + "/" + hour + "/"; + return lastPrefix; + } + return lastPrefix; + } else { + throw new RetriableException("Illegal partition method."); + } + } + + private long genObjectOffset(ConnectRecord record, String objectUrl) throws ConnectException, IOException { + if (partitionMethod.equals("Normal")) { + return lastOffset; + } else if (partitionMethod.equals("Time")) { + if (lastTimeStamp != record.getTimestamp()) { + boolean exists = ossClient.doesObjectExist(bucketName, objectUrl); + // If the object does not exist, create it and set offset to 0, otherwise read the offset of the current object + if (exists) { + OSSObject ossObject = ossClient.getObject(bucketName, objectUrl); + InputStream inputStream = ossObject.getObjectContent(); + lastOffset = inputStream.available(); + return lastOffset; + } else { + lastOffset = 0; + return lastOffset; + } + } else { + return lastOffset; + } + } else { + throw new RetriableException("Illegal partition method."); + } + } + + private void putOss(String absolutePath, long offset, String context) throws ConnectException { + try { + // Create an append write request and send it + AppendObjectRequest appendObjectRequest = new AppendObjectRequest(bucketName, absolutePath, new ByteArrayInputStream(context.getBytes())); + appendObjectRequest.setPosition(offset); + AppendObjectResult appendObjectResult = ossClient.appendObject(appendObjectRequest); + + // Update + lastOffset = appendObjectResult.getNextPosition(); + } catch (OSSException oe) { + log.error("Caught an OSSException, which means your request made it to OSS, but was rejected with an error response for some reason." + + " Error Message: {}, Error Code: {}, Request ID: {}, Host ID: {}.", oe.getErrorMessage(),oe.getErrorCode(), oe.getRequestId(), oe.getHostId()); + } catch (ClientException ce) { + log.error("Caught an ClientException, which means the client encountered a serious internal problem while trying to communicate with OSS," + + " such as not being able to access the network. Error Message: {}.", ce.getMessage()); + } + } + + private void handleRecord(ConnectRecord record) throws ConnectException, IOException { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("data", record.getData()); + String context = JSON.toJSONString(jsonObject); + String prefix = genFilePrefixByPartition(record); + + String absolutePath = prefix + objectName; + long appendOffset = genObjectOffset(record, absolutePath); + if (appendOffset > OBJECT_SIZE_THRESHOLD) { + SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + String sufix = formatter.format(new Date()); + ossClient.copyObject(bucketName, absolutePath, bucketName, absolutePath + sufix); + ossClient.deleteObject(bucketName, absolutePath); + ossClient.doesObjectExist(bucketName, absolutePath); + lastOffset = 0; + } + putOss(absolutePath, appendOffset, context); + lastTimeStamp = record.getTimestamp(); + } + + @Override + public void put(List<ConnectRecord> sinkRecords) throws ConnectException { + try { + AtomicBoolean hasException = new AtomicBoolean(false); + sinkRecords.forEach(sinkRecord -> { + try { + handleRecord(sinkRecord); + } catch (OSSException oe) { + log.error("Caught an OSSException, which means your request made it to OSS, but was rejected with an error response for some reason." + + " Error Message: {}, Error Code: {}, Request ID: {}, Host ID: {}.", oe.getErrorMessage(),oe.getErrorCode(), oe.getRequestId(), oe.getHostId()); + hasException.set(true); + } catch (ClientException ce) { + log.error("Caught an ClientException, which means the client encountered a serious internal problem while trying to communicate with OSS," + + " such as not being able to access the network. Error Message: {}.", ce.getMessage()); + hasException.set(true); + } catch (Exception e) { + log.error("OSSSinkTask | genObjectOffset | error => ", e); + hasException.set(true); + } + }); + if (!hasException.get() && !recordMap.isEmpty()) { + processMap(); + } + } catch (Exception e) { + log.error("OSSSinkTask | put | error => ", e); + } + } + + @Override + public void validate(KeyValue config) { + + } + + @Override + public void init(SinkTaskContext sinkTaskContext) { + taskId = sinkTaskContext.getConnectorName() + "-" + sinkTaskContext.getTaskName(); + } + + @Override + public void start(KeyValue config) { + accessKeyId = config.getString(OssConstant.ACCESS_KEY_ID); + accessKeySecret = config.getString(OssConstant.ACCESS_KEY_SECRET); + accountEndpoint = config.getString(OssConstant.ACCOUNT_ENDPOINT); + bucketName = config.getString(OssConstant.BUCKET_NAME); + fileUrlPrefix = config.getString(OssConstant.FILE_URL_PREFIX); + objectName = config.getString(OssConstant.OBJECT_NAME); + region = config.getString(OssConstant.REGION); + partitionMethod = config.getString(OssConstant.PARTITION_METHOD); + compressType = config.getString(OssConstant.COMPRESS_TYPE); + + fileUrlPrefix = fileUrlPrefix + taskId + "/"; + try { + DefaultCredentialProvider credentialsProvider = CredentialsProviderFactory.newDefaultCredentialProvider(accessKeyId, accessKeySecret); + + ClientBuilderConfiguration clientBuilderConfiguration = new ClientBuilderConfiguration(); + clientBuilderConfiguration.setSignatureVersion(SignVersion.V4); + ossClient = OSSClientBuilder.create() + .endpoint(accountEndpoint) + .credentialsProvider(credentialsProvider) + .clientConfiguration(clientBuilderConfiguration) + .region(region) + .build(); + if (partitionMethod.equals("Normal")) { + boolean exists = ossClient.doesObjectExist(bucketName, fileUrlPrefix + objectName); + // If the object does not exist, create it and set offset to 0, otherwise read the offset of the current object + if (exists) { + OSSObject ossObject = ossClient.getObject(bucketName, fileUrlPrefix + objectName); + InputStream inputStream = ossObject.getObjectContent(); + long offset_now = inputStream.available(); + lastOffset = offset_now; + } else { + lastOffset = 0; + } + } + } catch (Exception e) { + log.error("OssSinkTask | start | error => ", e); + throw new RuntimeException(e); + } + } + + @Override + public void stop() { + ossClient.shutdown(); + } + +} diff --git a/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/constant/OssConstant.java b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/constant/OssConstant.java new file mode 100644 index 00000000..3c064876 --- /dev/null +++ b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/constant/OssConstant.java @@ -0,0 +1,14 @@ +package org.apache.rocketmq.connect.oss.sink.constant; + +public class OssConstant { + + public static final String ACCESS_KEY_ID = "accessKeyId"; + public static final String ACCESS_KEY_SECRET = "accessKeySecret"; + public static final String ACCOUNT_ENDPOINT = "accountEndpoint"; + public static final String BUCKET_NAME = "bucketName"; + public static final String FILE_URL_PREFIX = "fileUrlPrefix"; + public static final String OBJECT_NAME = "objectName"; + public static final String REGION = "region"; + public static final String PARTITION_METHOD = "partitionMethod"; + public static final String COMPRESS_TYPE = "compressType"; +} diff --git a/connectors/aliyun/rocketmq-connect-oss/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/OssSinkTest.java b/connectors/aliyun/rocketmq-connect-oss/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/OssSinkTest.java new file mode 100644 index 00000000..1e21169b --- /dev/null +++ b/connectors/aliyun/rocketmq-connect-oss/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/OssSinkTest.java @@ -0,0 +1,156 @@ +package org.apache.rocketmq.connect.oss.sink; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.sink.SinkTaskContext; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.data.RecordOffset; +import io.openmessaging.connector.api.data.RecordPartition; +import io.openmessaging.internal.DefaultKeyValue; +import org.apache.rocketmq.connect.oss.sink.constant.OssConstant; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + + +public class OssSinkTest { + @Test + public void testTaskConfigs() { + OssSinkConnector ossSinkConnector = new OssSinkConnector(); + Assert.assertEquals(ossSinkConnector.taskConfigs(1).size(), 1); + } + + @Test + public void testNormalPut() { + OssSinkTask ossSinkTask = new OssSinkTask(); + KeyValue keyValue = new DefaultKeyValue(); + // Replace KV pair with your own message + keyValue.put(OssConstant.ACCESS_KEY_ID, "xxx"); // Input yuor accesskey id + keyValue.put(OssConstant.ACCESS_KEY_SECRET, "xxx"); // Input your accesskey secret + keyValue.put(OssConstant.ACCOUNT_ENDPOINT, "oss-cn-beijing.aliyuncs.com"); + keyValue.put(OssConstant.BUCKET_NAME, "rocketmqoss"); + keyValue.put(OssConstant.FILE_URL_PREFIX, "test/"); + keyValue.put(OssConstant.OBJECT_NAME, "oss_new.txt"); + keyValue.put(OssConstant.REGION, "cn-beijing"); + keyValue.put(OssConstant.PARTITION_METHOD, "Normal"); + + List<ConnectRecord> connectRecordList = new ArrayList<>(); + ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis()); + connectRecord.setData("{\n" + + "\t\"test\" : \"test\"\n" + + "}"); + connectRecordList.add(connectRecord); + ossSinkTask.init(new SinkTaskContext() { + @Override + public String getConnectorName() { + return "test_connect"; + } + + @Override + public String getTaskName() { + return "test_task"; + } + + @Override public KeyValue configs() { + return null; + } + + @Override + public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) { + + } + + @Override + public void resetOffset(Map<RecordPartition, RecordOffset> map) { + + } + + @Override + public void pause(List<RecordPartition> list) { + + } + + @Override + public void resume(List<RecordPartition> list) { + + } + + @Override + public Set<RecordPartition> assignment() { + return null; + } + }); + ossSinkTask.start(keyValue); + ossSinkTask.put(connectRecordList); + } + + @Test + public void testTimePut() { + OssSinkTask ossSinkTask = new OssSinkTask(); + KeyValue keyValue = new DefaultKeyValue(); + // Replace KV pair with your own message + keyValue.put(OssConstant.ACCESS_KEY_ID, "xxx"); // Input yuor accesskey id + keyValue.put(OssConstant.ACCESS_KEY_SECRET, "xxx"); // Input your accesskey secret + keyValue.put(OssConstant.ACCOUNT_ENDPOINT, "oss-cn-beijing.aliyuncs.com"); + keyValue.put(OssConstant.BUCKET_NAME, "rocketmqoss"); + keyValue.put(OssConstant.FILE_URL_PREFIX, "test/"); + keyValue.put(OssConstant.OBJECT_NAME, "oss_new.txt"); + keyValue.put(OssConstant.REGION, "cn-beijing"); + keyValue.put(OssConstant.PARTITION_METHOD, "Time"); + + List<ConnectRecord> connectRecordList = new ArrayList<>(); + ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis()); + connectRecord.setData("{\n" + + "\t\"test\" : \"test\"\n" + + "}"); + connectRecordList.add(connectRecord); + ossSinkTask.init(new SinkTaskContext() { + @Override + public String getConnectorName() { + return "test_connect"; + } + + @Override + public String getTaskName() { + return "test_task"; + } + + @Override public KeyValue configs() { + return null; + } + + @Override + public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) { + + } + + @Override + public void resetOffset(Map<RecordPartition, RecordOffset> map) { + + } + + @Override + public void pause(List<RecordPartition> list) { + + } + + @Override + public void resume(List<RecordPartition> list) { + + } + + @Override + public Set<RecordPartition> assignment() { + return null; + } + }); + ossSinkTask.start(keyValue); + ossSinkTask.put(connectRecordList); + } + + +}