This is an automated email from the ASF dual-hosted git repository. yukon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push: new d122bbd [ISSUE #26] Add RocketMQ Source Connect and RocketMQ Sink Connect (#27) d122bbd is described below commit d122bbdea910267967a5f103ad5255dbb7e7120e Author: zhaohai <33314633+zhaohai1299002...@users.noreply.github.com> AuthorDate: Fri Apr 15 13:57:13 2022 +0800 [ISSUE #26] Add RocketMQ Source Connect and RocketMQ Sink Connect (#27) * add RocketMQ Source Connect and RocketMQ Sink Connect * update pom Co-authored-by: zh378814 <wb-zh378...@alibaba-inc.com> --- .../aliyun/rocketmq-connect-rocketmq/README.md | 71 ++++++++ .../aliyun/rocketmq-connect-rocketmq/pom.xml | 194 +++++++++++++++++++++ .../connect/rocketmq/RocketMQSinkConnector.java | 77 ++++++++ .../connect/rocketmq/RocketMQSinkTask.java | 136 +++++++++++++++ .../connect/rocketmq/RocketMQSourceConnector.java | 81 +++++++++ .../connect/rocketmq/RocketMQSourceTask.java | 181 +++++++++++++++++++ .../connect/rocketmq/common/RocketMQConstant.java | 17 ++ .../rocketmq/connect/rocketmq/utils/OnsUtils.java | 9 + .../rocketmq/RocketMQSinkConnectorTest.java | 95 ++++++++++ .../rocketmq/RocketMQSourceConnectorTest.java | 64 +++++++ 10 files changed, 925 insertions(+) diff --git a/connectors/aliyun/rocketmq-connect-rocketmq/README.md b/connectors/aliyun/rocketmq-connect-rocketmq/README.md new file mode 100644 index 0000000..cb41006 --- /dev/null +++ b/connectors/aliyun/rocketmq-connect-rocketmq/README.md @@ -0,0 +1,71 @@ +# rocketmq-connect-rocketmq + +## rocketmq-connect-rocketmq 打包 +``` +mvn clean install -Dmaven.test.skip=true +``` + +## rocketmq-connect-rocketmq 启动 + +* **rocketmq-source-connector** 启动 + +``` +http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-rocketmq-source-connector-name} +?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.rocketmq.RocketMQSourceConnector",“accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",namesrvAddr”:"${namesrvAddr}","topic":"${topic}","instanceId":"${instanceId}","consumerGroup":"${consumerGroup}"} +``` + +例子 + +``` +http://localhost:8081/connectors/rocketmqConnectorSource?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster", +"connector-class":"org.apache.rocketmq.connect.rocketmq.RocketMQSourceConnector","accessKeyId":"xxxx","accessKeySecret":"xxxx","namesrvAddr":"http://127.0.0.1:9876","topic":"topic", +"instanceId":"xxxx", "consumerGroup":"xxxx"} +``` + +* **rocketmq-sink-connector** 启动 + +``` +http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-rocketmq-sink-connector-name} +?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.rocketmq.RocketMQSinkConnector",“accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",namesrvAddr”:"${namesrvAddr}","topic":"${topic}","instanceId":"${instanceId}"} +``` + +例子 +``` +http://localhost:8081/connectors/rocketmqConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster", +"connector-class":"org.apache.rocketmq.connect.rocketmq.RocketMQSinkConnector","accessKeyId":"xxxx","accessKeySecret":"xxxx","namesrvAddr":"http://127.0.0.1:9876","topic":"topic", +"instanceId":"xxxx"} +``` + +>**注:** `rocketmq-rocketmq-connect` 的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中 + +## rocketmq-connect-rocketmq 停止 + +``` +http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-rocketmq-connector-name}/stop +``` + +## rocketmq-connect-rocketmq 参数说明 +* **rocketmq-source-connector 参数说明** + +| KEY | TYPE | Must be filled | Description| Example +|------------------------|---------|----------------|------------|---| +| accessKeyId | String | YES | AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建 | xxxx | +| accessKeySecret | String | YES | AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建 | xxxx | +| namesrvAddr | String | YES | 设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看 | xxxx | +| topic | String | YES | 消息主题 | xxxx | +| instanceId | String | NO | 阿里云MQ控制台的实例Id | xxxx | +| consumerGroup | String | YES | 消息订阅者 | xxxx | + +``` +注:1. source/sink配置文件说明是以rocketmq-connect-rocketmq为demo,不同source/sink connector配置有差异,请以具体sourc/sink connector为准 +``` +* **rocketmq-sink-connector 参数说明** + +| KEY | TYPE | Must be filled | Description | Example +|-----------------------|---------|----------------|-----------|---------| +| accessKeyId | String | YES | AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建 | xxxx | +| accessKeySecret | String | YES | AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建 | xxxx | +| namesrvAddr | String | YES | 设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看 | xxxx | +| topic | String | YES | 消息主题 | xxxx | +| instanceId | String | NO | 阿里云MQ控制台的实例Id | xxxx | + diff --git a/connectors/aliyun/rocketmq-connect-rocketmq/pom.xml b/connectors/aliyun/rocketmq-connect-rocketmq/pom.xml new file mode 100644 index 0000000..f086c47 --- /dev/null +++ b/connectors/aliyun/rocketmq-connect-rocketmq/pom.xml @@ -0,0 +1,194 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-connect-rocketmq</artifactId> + <version>0.0.1-SNAPSHOT</version> + + <modelVersion>4.0.0</modelVersion> + + <name>connect-rocketmq</name> + <properties> + <maven.compiler.source>8</maven.compiler.source> + <maven.compiler.target>8</maven.compiler.target> + <slf4j.version>1.7.7</slf4j.version> + <logback.version>1.0.13</logback.version> + <openmessaging-connector.version>0.1.2-SNAPSHOT</openmessaging-connector.version> + <junit.version>4.12</junit.version> + <assertj.version>3.22.0</assertj.version> + <mockito.version>4.0.0</mockito.version> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <ons-client.version>1.8.8.3.Final</ons-client.version> + <ons20190214.version>1.0.0</ons20190214.version> + </properties> + + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>versions-maven-plugin</artifactId> + <version>2.3</version> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>clirr-maven-plugin</artifactId> + <version>2.7</version> + </plugin> + <plugin> + <artifactId>maven-dependency-plugin</artifactId> + <configuration> + <outputDirectory>${project.build.directory}/lib</outputDirectory> + <excludeTransitive>false</excludeTransitive> + <stripVersion>true</stripVersion> + </configuration> + </plugin> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.6.1</version> + <configuration> + <source>${maven.compiler.source}</source> + <target>${maven.compiler.target}</target> + <compilerVersion>${maven.compiler.source}</compilerVersion> + <showDeprecation>true</showDeprecation> + <showWarnings>true</showWarnings> + </configuration> + </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.19.1</version> + <configuration> + <argLine>-Xms512m -Xmx1024m</argLine> + <forkMode>always</forkMode> + <includes> + <include>**/*Test.java</include> + </includes> + </configuration> + </plugin> + <plugin> + <artifactId>maven-site-plugin</artifactId> + <version>3.6</version> + <configuration> + <locales>en_US</locales> + <outputEncoding>UTF-8</outputEncoding> + <inputEncoding>UTF-8</inputEncoding> + </configuration> + </plugin> + <plugin> + <artifactId>maven-source-plugin</artifactId> + <version>3.0.1</version> + <executions> + <execution> + <id>attach-sources</id> + <goals> + <goal>jar</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-javadoc-plugin</artifactId> + <version>2.10.4</version> + <configuration> + <charset>UTF-8</charset> + <locale>en_US</locale> + <excludePackageNames>io.openmessaging.internal</excludePackageNames> + </configuration> + <executions> + <execution> + <id>aggregate</id> + <goals> + <goal>aggregate</goal> + </goals> + <phase>site</phase> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <version>3.0.2</version> + <configuration> + <encoding>${project.build.sourceEncoding}</encoding> + </configuration> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>findbugs-maven-plugin</artifactId> + <version>3.0.4</version> + </plugin> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <version>3.0.0</version> + <configuration> + <archive> + <manifest> + <mainClass>org.apache.rocketmq.connect.rocketmq.RocketMQSourceConnector</mainClass> + </manifest> + </archive> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + <executions> + <execution> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>com.aliyun</groupId> + <artifactId>ons20190214</artifactId> + <version>${ons20190214.version}</version> + </dependency> + <dependency> + <groupId>com.aliyun.openservices</groupId> + <artifactId>ons-client</artifactId> + <version>${ons-client.version}</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>${slf4j.version}</version> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + <version>${logback.version}</version> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-core</artifactId> + <version>${logback.version}</version> + </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <version>${assertj.version}</version> + </dependency> + <dependency> + <groupId>io.openmessaging</groupId> + <artifactId>openmessaging-connector</artifactId> + <version>${openmessaging-connector.version}</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>${junit.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <version>${mockito.version}</version> + <scope>test</scope> + </dependency> + </dependencies> +</project> \ No newline at end of file diff --git a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnector.java b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnector.java new file mode 100644 index 0000000..790c6dd --- /dev/null +++ b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnector.java @@ -0,0 +1,77 @@ +package org.apache.rocketmq.connect.rocketmq; + + +import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils; +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.Task; +import io.openmessaging.connector.api.component.task.sink.SinkConnector; +import io.openmessaging.internal.DefaultKeyValue; +import org.apache.rocketmq.connect.rocketmq.common.RocketMQConstant; + +import java.util.ArrayList; +import java.util.List; + +public class RocketMQSinkConnector extends SinkConnector { + + private String accessKeyId; + + private String accessKeySecret; + + private String namesrvAddr; + + private String topic; + + private String instanceId; + + @Override + public void pause() { + + } + + @Override + public void resume() { + + } + + @Override + public List<KeyValue> taskConfigs(int maxTasks) { + List<KeyValue> keyValues = new ArrayList<>(); + KeyValue keyValue = new DefaultKeyValue(); + keyValue.put(RocketMQConstant.ACCESS_KEY_ID, accessKeyId); + keyValue.put(RocketMQConstant.ACCESS_KEY_SECRET, accessKeySecret); + keyValue.put(RocketMQConstant.TOPIC, topic); + keyValue.put(RocketMQConstant.INSTANCE_ID, instanceId); + keyValue.put(RocketMQConstant.NAMESRV_ADDR, namesrvAddr); + keyValues.add(keyValue); + return keyValues; + } + + @Override + public Class<? extends Task> taskClass() { + return RocketMQSinkTask.class; + } + + @Override + public void validate(KeyValue config) { + if (StringUtils.isBlank(config.getString(RocketMQConstant.ACCESS_KEY_ID)) + || StringUtils.isBlank(config.getString(RocketMQConstant.ACCESS_KEY_SECRET)) + || StringUtils.isBlank(config.getString(RocketMQConstant.NAMESRV_ADDR)) + || StringUtils.isBlank(config.getString(RocketMQConstant.TOPIC))) { + throw new RuntimeException("rocketmq required parameter is null !"); + } + } + + @Override + public void init(KeyValue config) { + accessKeyId = config.getString(RocketMQConstant.ACCESS_KEY_ID); + accessKeySecret = config.getString(RocketMQConstant.ACCESS_KEY_SECRET); + namesrvAddr = config.getString(RocketMQConstant.NAMESRV_ADDR); + topic = config.getString(RocketMQConstant.TOPIC); + instanceId = config.getString(RocketMQConstant.INSTANCE_ID); + } + + @Override + public void stop() { + + } +} diff --git a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkTask.java b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkTask.java new file mode 100644 index 0000000..6c8eb36 --- /dev/null +++ b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkTask.java @@ -0,0 +1,136 @@ +package org.apache.rocketmq.connect.rocketmq; + +import com.aliyun.ons20190214.Client; +import com.aliyun.ons20190214.models.OnsTopicListRequest; +import com.aliyun.ons20190214.models.OnsTopicListResponse; +import com.aliyun.openservices.ons.api.Message; +import com.aliyun.openservices.ons.api.ONSFactory; +import com.aliyun.openservices.ons.api.Producer; +import com.aliyun.openservices.ons.api.PropertyKeyConst; +import com.aliyun.openservices.shade.com.alibaba.fastjson.JSON; +import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils; +import com.aliyun.teaopenapi.models.Config; +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.sink.SinkTask; +import io.openmessaging.connector.api.component.task.sink.SinkTaskContext; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.errors.ConnectException; +import org.apache.rocketmq.connect.rocketmq.common.RocketMQConstant; +import org.apache.rocketmq.connect.rocketmq.utils.OnsUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Properties; + +public class RocketMQSinkTask extends SinkTask { + private static final Logger log = LoggerFactory.getLogger(RocketMQSinkTask.class); + + private String accessKeyId; + + private String accessKeySecret; + + private String namesrvAddr; + + private String topic; + + private Producer producer; + + private String instanceId; + + @Override + public void put(List<ConnectRecord> sinkRecords) throws ConnectException { + try { + sinkRecords.forEach(connectRecord -> { + Message message = new Message(); + message.setBody(JSON.toJSONString(connectRecord.getData()).getBytes(StandardCharsets.UTF_8)); + // TODO message.setKey(); + // TODO message.setTag(); + final KeyValue extensions = connectRecord.getExtensions(); + if (extensions != null) { + extensions.keySet().forEach(key -> message.putUserProperties(key, extensions.getString(key))); + } + message.setTopic(topic); + producer.send(message); + }); + } catch (Exception e) { + log.error("RocketMQSinkTask | put | error => ", e); + throw new ConnectException(e); + } + } + + @Override + public void pause() { + + } + + @Override + public void resume() { + + } + + @Override + public void validate(KeyValue config) { + if (StringUtils.isBlank(config.getString(RocketMQConstant.ACCESS_KEY_ID)) + || StringUtils.isBlank(config.getString(RocketMQConstant.ACCESS_KEY_SECRET)) + || StringUtils.isBlank(config.getString(RocketMQConstant.NAMESRV_ADDR)) + || StringUtils.isBlank(config.getString(RocketMQConstant.TOPIC))) { + throw new RuntimeException("rocketmq required parameter is null !"); + } + // 检查topic是否存在 + try { + Config onsConfig = new Config() + .setAccessKeyId(config.getString(RocketMQConstant.ACCESS_KEY_ID)) + .setAccessKeySecret(config.getString(RocketMQConstant.ACCESS_KEY_SECRET)); + onsConfig.endpoint = OnsUtils.parseEndpoint(config.getString(RocketMQConstant.NAMESRV_ADDR)); + final Client client = new Client(onsConfig); + OnsTopicListRequest onsTopicListRequest = new OnsTopicListRequest() + .setTopic(config.getString(RocketMQConstant.TOPIC)) + .setInstanceId(config.getString(RocketMQConstant.INSTANCE_ID)); + final OnsTopicListResponse onsTopicListResponse = client.onsTopicList(onsTopicListRequest); + if (onsTopicListResponse.getBody().getData().getPublishInfoDo().isEmpty()) { + throw new RuntimeException("rocketmq required parameter topic does not exist !"); + } + } catch (Exception e) { + log.error("RocketMQSinkTask | validate | error => ", e); + throw new RuntimeException(e.getMessage()); + } + } + + @Override + public void init(KeyValue config) { + accessKeyId = config.getString(RocketMQConstant.ACCESS_KEY_ID); + accessKeySecret = config.getString(RocketMQConstant.ACCESS_KEY_SECRET); + namesrvAddr = config.getString(RocketMQConstant.NAMESRV_ADDR); + topic = config.getString(RocketMQConstant.TOPIC); + instanceId = config.getString(RocketMQConstant.INSTANCE_ID); + } + + @Override + public void start(SinkTaskContext sinkTaskContext) { + try { + super.start(sinkTaskContext); + if (producer != null) { + producer.shutdown(); + } + Properties properties = new Properties(); + properties.put(PropertyKeyConst.AccessKey, accessKeyId); + properties.put(PropertyKeyConst.SecretKey, accessKeySecret); + if (StringUtils.isNotBlank(instanceId)) { + properties.put(PropertyKeyConst.INSTANCE_ID, instanceId); + } + properties.put(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr); + producer = ONSFactory.createProducer(properties); + producer.start(); + } catch (Exception e) { + log.error("RocketMQSinkTask | start | error =>", e); + throw new RuntimeException(e.getMessage()); + } + } + + @Override + public void stop() { + producer.shutdown(); + } +} diff --git a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnector.java b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnector.java new file mode 100644 index 0000000..778aa2b --- /dev/null +++ b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnector.java @@ -0,0 +1,81 @@ +package org.apache.rocketmq.connect.rocketmq; + +import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils; +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.Task; +import io.openmessaging.connector.api.component.task.source.SourceConnector; +import io.openmessaging.internal.DefaultKeyValue; +import org.apache.rocketmq.connect.rocketmq.common.RocketMQConstant; + +import java.util.ArrayList; +import java.util.List; + +public class RocketMQSourceConnector extends SourceConnector { + + private String accessKeyId; + + private String accessKeySecret; + + private String namesrvAddr; + + private String topic; + + private String instanceId; + + private String consumerGroup; + + @Override + public void pause() { + + } + + @Override + public void resume() { + + } + + @Override + public List<KeyValue> taskConfigs(int maxTasks) { + List<KeyValue> keyValues = new ArrayList<>(); + KeyValue keyValue = new DefaultKeyValue(); + keyValue.put(RocketMQConstant.ACCESS_KEY_ID, accessKeyId); + keyValue.put(RocketMQConstant.ACCESS_KEY_SECRET,accessKeySecret); + keyValue.put(RocketMQConstant.INSTANCE_ID, instanceId); + keyValue.put(RocketMQConstant.NAMESRV_ADDR, namesrvAddr); + keyValue.put(RocketMQConstant.TOPIC, topic); + keyValue.put(RocketMQConstant.CONSUMER_GROUP, consumerGroup); + keyValues.add(keyValue); + return keyValues; + } + + @Override + public Class<? extends Task> taskClass() { + return RocketMQSourceTask.class; + } + + @Override + public void validate(KeyValue config) { + if (StringUtils.isBlank(config.getString(RocketMQConstant.ACCESS_KEY_ID)) + || StringUtils.isBlank(config.getString(RocketMQConstant.ACCESS_KEY_SECRET)) + || StringUtils.isBlank(config.getString(RocketMQConstant.NAMESRV_ADDR)) + || StringUtils.isBlank(config.getString(RocketMQConstant.TOPIC)) + || StringUtils.isBlank(config.getString(RocketMQConstant.CONSUMER_GROUP))) { + throw new RuntimeException("rocketmq required parameter is null !"); + } + } + + @Override + public void init(KeyValue config) { + accessKeyId = config.getString(RocketMQConstant.ACCESS_KEY_ID); + accessKeySecret = config.getString(RocketMQConstant.ACCESS_KEY_SECRET); + namesrvAddr = config.getString(RocketMQConstant.NAMESRV_ADDR); + topic = config.getString(RocketMQConstant.TOPIC); + instanceId = config.getString(RocketMQConstant.INSTANCE_ID); + consumerGroup = config.getString(RocketMQConstant.CONSUMER_GROUP); + } + + @Override + public void stop() { + + } +} diff --git a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceTask.java b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceTask.java new file mode 100644 index 0000000..9de3ace --- /dev/null +++ b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceTask.java @@ -0,0 +1,181 @@ +package org.apache.rocketmq.connect.rocketmq; + +import com.aliyun.ons20190214.Client; +import com.aliyun.ons20190214.models.OnsGroupListRequest; +import com.aliyun.ons20190214.models.OnsGroupListResponse; +import com.aliyun.ons20190214.models.OnsTopicListRequest; +import com.aliyun.ons20190214.models.OnsTopicListResponse; +import com.aliyun.openservices.ons.api.Action; +import com.aliyun.openservices.ons.api.Consumer; +import com.aliyun.openservices.ons.api.ONSFactory; +import com.aliyun.openservices.ons.api.PropertyKeyConst; +import com.aliyun.openservices.shade.com.google.common.collect.Maps; +import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils; +import com.aliyun.teaopenapi.models.Config; +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.source.SourceTask; +import io.openmessaging.connector.api.component.task.source.SourceTaskContext; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.data.RecordOffset; +import io.openmessaging.connector.api.data.RecordPartition; +import org.apache.rocketmq.connect.rocketmq.common.RocketMQConstant; +import org.apache.rocketmq.connect.rocketmq.utils.OnsUtils; +import org.assertj.core.util.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; + +public class RocketMQSourceTask extends SourceTask { + private static final Logger log = LoggerFactory.getLogger(RocketMQSourceTask.class); + + private String accessKeyId; + + private String accessKeySecret; + + private String namesrvAddr; + + private String topic; + + private String instanceId; + + private String consumerGroup; + + private Consumer consumer; + + private final BlockingQueue<ConnectRecord> blockingQueue = new LinkedBlockingDeque<>(1000); + private static final int BATCH_POLL_SIZE = 10; + private static final int DEFAULT_CONSUMER_TIMEOUT_SECONDS = 20; + + @Override + public List<ConnectRecord> poll() throws InterruptedException { + if (consumer == null) { + initConsumer(); + } + List<ConnectRecord> connectRecords = Lists.newArrayList(); + blockingQueue.drainTo(connectRecords, BATCH_POLL_SIZE); + return connectRecords; + } + + @Override + public void pause() { + + } + + @Override + public void resume() { + + } + + @Override + public void validate(KeyValue config) { + if (StringUtils.isBlank(config.getString(RocketMQConstant.ACCESS_KEY_ID)) + || StringUtils.isBlank(config.getString(RocketMQConstant.ACCESS_KEY_SECRET)) + || StringUtils.isBlank(config.getString(RocketMQConstant.NAMESRV_ADDR)) + || StringUtils.isBlank(config.getString(RocketMQConstant.TOPIC)) + || StringUtils.isBlank(config.getString(RocketMQConstant.CONSUMER_GROUP))) { + throw new RuntimeException("rocketmq required parameter is null !"); + } + // 检查topic和consumer group是否存在 + try { + Config onsConfig = new Config() + .setAccessKeyId(config.getString(RocketMQConstant.ACCESS_KEY_ID)) + .setAccessKeySecret(config.getString(RocketMQConstant.ACCESS_KEY_SECRET)); + onsConfig.endpoint = OnsUtils.parseEndpoint(config.getString(RocketMQConstant.NAMESRV_ADDR)); + final Client client = new Client(onsConfig); + OnsTopicListRequest onsTopicListRequest = new OnsTopicListRequest() + .setTopic(config.getString(RocketMQConstant.TOPIC)) + .setInstanceId(config.getString(RocketMQConstant.INSTANCE_ID)); + final OnsTopicListResponse onsTopicListResponse = client.onsTopicList(onsTopicListRequest); + if (onsTopicListResponse.getBody().getData().getPublishInfoDo().isEmpty()) { + throw new RuntimeException("rocketmq required parameter topic does not exist !"); + } + OnsGroupListRequest onsGroupListRequest = new OnsGroupListRequest() + .setInstanceId(config.getString(RocketMQConstant.INSTANCE_ID)) + .setGroupId(config.getString(RocketMQConstant.CONSUMER_GROUP)); + final OnsGroupListResponse onsGroupListResponse = client.onsGroupList(onsGroupListRequest); + if (onsGroupListResponse.getBody().getData().getSubscribeInfoDo().isEmpty()) { + throw new RuntimeException("rocketmq required parameter consumerGroup does not exist !"); + } + } catch (Exception e) { + log.error("RocketMQSinkTask | validate | error => ", e); + throw new RuntimeException(e.getMessage()); + } + } + + @Override + public void init(KeyValue config) { + accessKeyId = config.getString(RocketMQConstant.ACCESS_KEY_ID); + accessKeySecret = config.getString(RocketMQConstant.ACCESS_KEY_SECRET); + namesrvAddr = config.getString(RocketMQConstant.NAMESRV_ADDR); + topic = config.getString(RocketMQConstant.TOPIC); + instanceId = config.getString(RocketMQConstant.INSTANCE_ID); + consumerGroup = config.getString(RocketMQConstant.CONSUMER_GROUP); + } + + @Override + public void start(SourceTaskContext sourceTaskContext) { + try { + super.start(sourceTaskContext); + initConsumer(); + consumer.start(); + } catch (Exception e) { + log.error("RocketMQSourceTask | start | error => ", e); + throw e; + } + } + + private void initConsumer() { + try { + Properties properties = new Properties(); + properties.put(PropertyKeyConst.GROUP_ID, consumerGroup); + properties.put(PropertyKeyConst.AccessKey, accessKeyId); + properties.put(PropertyKeyConst.SecretKey, accessKeySecret); + properties.put(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr); + if (StringUtils.isNotBlank(instanceId)) { + properties.put(PropertyKeyConst.INSTANCE_ID, instanceId); + } + consumer = ONSFactory.createConsumer(properties); + // TODO TAG先忽略 + consumer.subscribe(topic, "*", (message, consumeContext) -> { + try { + Map<String, String> sourceRecordPartition = Maps.newHashMap(); + sourceRecordPartition.put("topic", message.getTopic()); + sourceRecordPartition.put("brokerName", message.getBornHost()); + Map<String, String> sourceRecordOffset = Maps.newHashMap(); + sourceRecordOffset.put("queueOffset", Long.toString(message.getOffset())); + RecordPartition recordPartition = new RecordPartition(sourceRecordPartition); + RecordOffset recordOffset = new RecordOffset(sourceRecordOffset); + ConnectRecord connectRecord = new ConnectRecord(recordPartition, recordOffset, message.getBornTimestamp()); + connectRecord.setData(new String(message.getBody(), StandardCharsets.UTF_8)); + final Properties userProperties = message.getUserProperties(); + final Set<String> keys = userProperties.stringPropertyNames(); + keys.forEach(key -> connectRecord.addExtension(key, userProperties.get(key).toString())); + final boolean offer = blockingQueue.offer(connectRecord, DEFAULT_CONSUMER_TIMEOUT_SECONDS, TimeUnit.SECONDS); + if (!offer) { + return Action.ReconsumeLater; + } + return Action.CommitMessage; + } catch (Exception e) { + log.error("RocketMQSourceTask | initConsumer | error => ", e); + return Action.ReconsumeLater; + } + }); + } catch (Exception e) { + log.error("RocketMQSourceTask | initConsumer | error => ", e); + throw e; + } + } + + @Override + public void stop() { + consumer.shutdown(); + } +} diff --git a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/common/RocketMQConstant.java b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/common/RocketMQConstant.java new file mode 100644 index 0000000..cb1f352 --- /dev/null +++ b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/common/RocketMQConstant.java @@ -0,0 +1,17 @@ +package org.apache.rocketmq.connect.rocketmq.common; + +public class RocketMQConstant { + + public static final String ACCESS_KEY_ID = "accessKeyId"; + + public static final String ACCESS_KEY_SECRET = "accessKeySecret"; + + public static final String NAMESRV_ADDR = "namesrvAddr"; + + public static final String TOPIC = "topic"; + + public static final String INSTANCE_ID = "instanceId"; + + public static final String CONSUMER_GROUP = "consumerGroup"; + +} diff --git a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/utils/OnsUtils.java b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/utils/OnsUtils.java new file mode 100644 index 0000000..2a90f50 --- /dev/null +++ b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/utils/OnsUtils.java @@ -0,0 +1,9 @@ +package org.apache.rocketmq.connect.rocketmq.utils; + +public class OnsUtils { + + public static String parseEndpoint(String namesrvAddr) { + final String[] split = namesrvAddr.split("\\."); + return "ons." + split[1] + ".aliyuncs.com"; + } +} diff --git a/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnectorTest.java b/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnectorTest.java new file mode 100644 index 0000000..897bb09 --- /dev/null +++ b/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnectorTest.java @@ -0,0 +1,95 @@ +package org.apache.rocketmq.connect.rocketmq; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.sink.SinkTaskContext; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.data.RecordOffset; +import io.openmessaging.connector.api.data.RecordPartition; +import io.openmessaging.internal.DefaultKeyValue; +import org.apache.rocketmq.connect.rocketmq.common.RocketMQConstant; +import org.assertj.core.util.Lists; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class RocketMQSinkConnectorTest { + + @Test + public void testTaskConfigs() { + RocketMQSinkConnector rocketMQSinkConnector= new RocketMQSinkConnector(); + Assert.assertEquals(rocketMQSinkConnector.taskConfigs(1).size(), 1); + } + + @Test + public void testPut() { + RocketMQSinkTask rocketMQSinkTask = new RocketMQSinkTask(); + KeyValue keyValue = new DefaultKeyValue(); + keyValue.put(RocketMQConstant.ACCESS_KEY_ID, "xxxx"); + keyValue.put(RocketMQConstant.ACCESS_KEY_SECRET, "xxxx"); + keyValue.put(RocketMQConstant.NAMESRV_ADDR, "xxxx"); + keyValue.put(RocketMQConstant.TOPIC, "xxxx"); + // 有实例Id需要填写实例Id + keyValue.put(RocketMQConstant.INSTANCE_ID, "xxxx"); + rocketMQSinkTask.init(keyValue); + rocketMQSinkTask.start(new SinkTaskContext() { + @Override + public String getConnectorName() { + return null; + } + + @Override + public String getTaskName() { + return null; + } + + @Override + public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) { + + } + + @Override + public void resetOffset(Map<RecordPartition, RecordOffset> offsets) { + + } + + @Override + public void pause(List<RecordPartition> partitions) { + + } + + @Override + public void resume(List<RecordPartition> partitions) { + + } + + @Override + public Set<RecordPartition> assignment() { + return null; + } + }); + List<ConnectRecord> connectRecords = Lists.newArrayList(); + ConnectRecord connectRecord = new ConnectRecord(new RecordPartition(new HashMap<>()), new RecordOffset(new HashMap<>()), System.currentTimeMillis()); + connectRecord.setData("test message"); + connectRecords.add(connectRecord); + connectRecord.addExtension("key", "value"); + rocketMQSinkTask.put(connectRecords); + } + + @Test + public void testValidate() { + RocketMQSinkTask rocketMQSinkTask = new RocketMQSinkTask(); + KeyValue keyValue = new DefaultKeyValue(); + keyValue.put(RocketMQConstant.ACCESS_KEY_ID, "xxxx"); + keyValue.put(RocketMQConstant.ACCESS_KEY_SECRET, "xxxx"); + keyValue.put(RocketMQConstant.NAMESRV_ADDR, "xxxx"); + keyValue.put(RocketMQConstant.TOPIC, "xxxx"); + // 有实例Id需要填写实例Id + keyValue.put(RocketMQConstant.INSTANCE_ID, "xxxx"); + rocketMQSinkTask.validate(keyValue); + } + +} diff --git a/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnectorTest.java b/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnectorTest.java new file mode 100644 index 0000000..bbdac55 --- /dev/null +++ b/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnectorTest.java @@ -0,0 +1,64 @@ +package org.apache.rocketmq.connect.rocketmq; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.source.SourceTaskContext; +import io.openmessaging.connector.api.storage.OffsetStorageReader; +import io.openmessaging.internal.DefaultKeyValue; +import org.apache.rocketmq.connect.rocketmq.common.RocketMQConstant; +import org.junit.Assert; +import org.junit.Test; + +public class RocketMQSourceConnectorTest { + + @Test + public void testTaskConfigs() { + RocketMQSourceConnector rocketMQSourceConnector = new RocketMQSourceConnector(); + Assert.assertEquals(rocketMQSourceConnector.taskConfigs(1).size(), 1); + } + + @Test + public void testPut() throws InterruptedException { + RocketMQSourceTask rocketMQSourceTask = new RocketMQSourceTask(); + KeyValue keyValue = new DefaultKeyValue(); + keyValue.put(RocketMQConstant.ACCESS_KEY_ID, "xxxx"); + keyValue.put(RocketMQConstant.ACCESS_KEY_SECRET, "xxxx"); + keyValue.put(RocketMQConstant.NAMESRV_ADDR, "xxxx"); + keyValue.put(RocketMQConstant.TOPIC, "xxxx"); + // 有实例Id需要填写实例Id + keyValue.put(RocketMQConstant.INSTANCE_ID, "xxxx"); + keyValue.put(RocketMQConstant.CONSUMER_GROUP, "xxxx"); + rocketMQSourceTask.init(keyValue); + rocketMQSourceTask.start(new SourceTaskContext() { + @Override + public OffsetStorageReader offsetStorageReader() { + return null; + } + + @Override + public String getConnectorName() { + return null; + } + + @Override + public String getTaskName() { + return null; + } + }); + rocketMQSourceTask.poll(); + Thread.sleep(50000); + } + + @Test + public void testValidate() { + RocketMQSourceTask rocketMQSourceTask = new RocketMQSourceTask(); + KeyValue keyValue = new DefaultKeyValue(); + keyValue.put(RocketMQConstant.ACCESS_KEY_ID, "xxxx"); + keyValue.put(RocketMQConstant.ACCESS_KEY_SECRET, "xxxx"); + keyValue.put(RocketMQConstant.NAMESRV_ADDR, "xxxx"); + keyValue.put(RocketMQConstant.TOPIC, "xxxx"); + // 有实例Id需要填写实例Id + keyValue.put(RocketMQConstant.INSTANCE_ID, "xxxx"); + keyValue.put(RocketMQConstant.CONSUMER_GROUP, "xxxx"); + rocketMQSourceTask.validate(keyValue); + } +}