This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new d122bbd  [ISSUE #26] Add RocketMQ Source Connect and RocketMQ Sink 
Connect (#27)
d122bbd is described below

commit d122bbdea910267967a5f103ad5255dbb7e7120e
Author: zhaohai <33314633+zhaohai1299002...@users.noreply.github.com>
AuthorDate: Fri Apr 15 13:57:13 2022 +0800

    [ISSUE #26] Add RocketMQ Source Connect and RocketMQ Sink Connect (#27)
    
    * add RocketMQ Source Connect and RocketMQ Sink Connect
    
    * update pom
    
    Co-authored-by: zh378814 <wb-zh378...@alibaba-inc.com>
---
 .../aliyun/rocketmq-connect-rocketmq/README.md     |  71 ++++++++
 .../aliyun/rocketmq-connect-rocketmq/pom.xml       | 194 +++++++++++++++++++++
 .../connect/rocketmq/RocketMQSinkConnector.java    |  77 ++++++++
 .../connect/rocketmq/RocketMQSinkTask.java         | 136 +++++++++++++++
 .../connect/rocketmq/RocketMQSourceConnector.java  |  81 +++++++++
 .../connect/rocketmq/RocketMQSourceTask.java       | 181 +++++++++++++++++++
 .../connect/rocketmq/common/RocketMQConstant.java  |  17 ++
 .../rocketmq/connect/rocketmq/utils/OnsUtils.java  |   9 +
 .../rocketmq/RocketMQSinkConnectorTest.java        |  95 ++++++++++
 .../rocketmq/RocketMQSourceConnectorTest.java      |  64 +++++++
 10 files changed, 925 insertions(+)

diff --git a/connectors/aliyun/rocketmq-connect-rocketmq/README.md 
b/connectors/aliyun/rocketmq-connect-rocketmq/README.md
new file mode 100644
index 0000000..cb41006
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-rocketmq/README.md
@@ -0,0 +1,71 @@
+# rocketmq-connect-rocketmq
+
+## rocketmq-connect-rocketmq 打包
+```
+mvn clean install -Dmaven.test.skip=true
+```
+
+## rocketmq-connect-rocketmq 启动
+
+* **rocketmq-source-connector** 启动
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-rocketmq-source-connector-name}
+?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.rocketmq.RocketMQSourceConnector",“accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",namesrvAddr”:"${namesrvAddr}","topic":"${topic}","instanceId":"${instanceId}","consumerGroup":"${consumerGroup}"}
+```
+
+例子
+
+```
+http://localhost:8081/connectors/rocketmqConnectorSource?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster";,
+"connector-class":"org.apache.rocketmq.connect.rocketmq.RocketMQSourceConnector","accessKeyId":"xxxx","accessKeySecret":"xxxx","namesrvAddr":"http://127.0.0.1:9876","topic":"topic";,
+"instanceId":"xxxx", "consumerGroup":"xxxx"}
+```
+
+* **rocketmq-sink-connector** 启动
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-rocketmq-sink-connector-name}
+?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.rocketmq.RocketMQSinkConnector",“accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",namesrvAddr”:"${namesrvAddr}","topic":"${topic}","instanceId":"${instanceId}"}
+```
+
+例子 
+```
+http://localhost:8081/connectors/rocketmqConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster";,
+"connector-class":"org.apache.rocketmq.connect.rocketmq.RocketMQSinkConnector","accessKeyId":"xxxx","accessKeySecret":"xxxx","namesrvAddr":"http://127.0.0.1:9876","topic":"topic";,
+"instanceId":"xxxx"}
+```
+
+>**注:** `rocketmq-rocketmq-connect` 
的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中
+
+## rocketmq-connect-rocketmq 停止
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-rocketmq-connector-name}/stop
+```
+
+## rocketmq-connect-rocketmq 参数说明
+* **rocketmq-source-connector 参数说明**
+
+|         KEY            |  TYPE   | Must be filled | Description| Example
+|------------------------|---------|----------------|------------|---|
+| accessKeyId           | String  | YES            | AccessKey 
ID阿里云身份验证,在阿里云服务器管理控制台创建 | xxxx    |
+| accessKeySecret       | String  | YES            | AccessKey 
Secret阿里云身份验证,在阿里云服务器管理控制台创建 | xxxx    |
+| namesrvAddr           | String  | YES            | 
设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看 | xxxx    |
+| topic                 | String  | YES            | 消息主题          | xxxx    |
+| instanceId            | String  | NO             | 阿里云MQ控制台的实例Id | xxxx    |
+| consumerGroup            | String  | YES            | 消息订阅者 | xxxx    |
+
+```  
+注:1. source/sink配置文件说明是以rocketmq-connect-rocketmq为demo,不同source/sink 
connector配置有差异,请以具体sourc/sink connector为准
+```  
+* **rocketmq-sink-connector 参数说明**
+
+| KEY                   |  TYPE   | Must be filled | Description | Example 
+|-----------------------|---------|----------------|-----------|---------|
+| accessKeyId           | String  | YES            | AccessKey 
ID阿里云身份验证,在阿里云服务器管理控制台创建 | xxxx    |
+| accessKeySecret       | String  | YES            | AccessKey 
Secret阿里云身份验证,在阿里云服务器管理控制台创建 | xxxx    |
+| namesrvAddr           | String  | YES            | 
设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看 | xxxx    |
+| topic                 | String  | YES            | 消息主题          | xxxx    |
+| instanceId            | String  | NO             | 阿里云MQ控制台的实例Id | xxxx    |
+
diff --git a/connectors/aliyun/rocketmq-connect-rocketmq/pom.xml 
b/connectors/aliyun/rocketmq-connect-rocketmq/pom.xml
new file mode 100644
index 0000000..f086c47
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-rocketmq/pom.xml
@@ -0,0 +1,194 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-connect-rocketmq</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <name>connect-rocketmq</name>
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <slf4j.version>1.7.7</slf4j.version>
+        <logback.version>1.0.13</logback.version>
+        
<openmessaging-connector.version>0.1.2-SNAPSHOT</openmessaging-connector.version>
+        <junit.version>4.12</junit.version>
+        <assertj.version>3.22.0</assertj.version>
+        <mockito.version>4.0.0</mockito.version>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <ons-client.version>1.8.8.3.Final</ons-client.version>
+        <ons20190214.version>1.0.0</ons20190214.version>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>versions-maven-plugin</artifactId>
+                <version>2.3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+                <version>2.7</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <configuration>
+                    
<outputDirectory>${project.build.directory}/lib</outputDirectory>
+                    <excludeTransitive>false</excludeTransitive>
+                    <stripVersion>true</stripVersion>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.6.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.19.1</version>
+                <configuration>
+                    <argLine>-Xms512m -Xmx1024m</argLine>
+                    <forkMode>always</forkMode>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-site-plugin</artifactId>
+                <version>3.6</version>
+                <configuration>
+                    <locales>en_US</locales>
+                    <outputEncoding>UTF-8</outputEncoding>
+                    <inputEncoding>UTF-8</inputEncoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.0.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>2.10.4</version>
+                <configuration>
+                    <charset>UTF-8</charset>
+                    <locale>en_US</locale>
+                    
<excludePackageNames>io.openmessaging.internal</excludePackageNames>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>aggregate</id>
+                        <goals>
+                            <goal>aggregate</goal>
+                        </goals>
+                        <phase>site</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>3.0.2</version>
+                <configuration>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <version>3.0.4</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.0.0</version>
+                <configuration>
+                    <archive>
+                        <manifest>
+                            
<mainClass>org.apache.rocketmq.connect.rocketmq.RocketMQSourceConnector</mainClass>
+                        </manifest>
+                    </archive>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.aliyun</groupId>
+            <artifactId>ons20190214</artifactId>
+            <version>${ons20190214.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun.openservices</groupId>
+            <artifactId>ons-client</artifactId>
+            <version>${ons-client.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>${logback.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+            <version>${logback.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>${assertj.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <version>${openmessaging-connector.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git 
a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnector.java
 
b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnector.java
new file mode 100644
index 0000000..790c6dd
--- /dev/null
+++ 
b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnector.java
@@ -0,0 +1,77 @@
+package org.apache.rocketmq.connect.rocketmq;
+
+
+import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.rocketmq.common.RocketMQConstant;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class RocketMQSinkConnector extends SinkConnector {
+
+    private String accessKeyId;
+
+    private String accessKeySecret;
+
+    private String namesrvAddr;
+
+    private String topic;
+
+    private String instanceId;
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public List<KeyValue> taskConfigs(int maxTasks) {
+        List<KeyValue> keyValues = new ArrayList<>();
+        KeyValue keyValue = new DefaultKeyValue();
+        keyValue.put(RocketMQConstant.ACCESS_KEY_ID, accessKeyId);
+        keyValue.put(RocketMQConstant.ACCESS_KEY_SECRET, accessKeySecret);
+        keyValue.put(RocketMQConstant.TOPIC, topic);
+        keyValue.put(RocketMQConstant.INSTANCE_ID, instanceId);
+        keyValue.put(RocketMQConstant.NAMESRV_ADDR, namesrvAddr);
+        keyValues.add(keyValue);
+        return keyValues;
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return RocketMQSinkTask.class;
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+        if 
(StringUtils.isBlank(config.getString(RocketMQConstant.ACCESS_KEY_ID))
+                || 
StringUtils.isBlank(config.getString(RocketMQConstant.ACCESS_KEY_SECRET))
+                || 
StringUtils.isBlank(config.getString(RocketMQConstant.NAMESRV_ADDR))
+                || 
StringUtils.isBlank(config.getString(RocketMQConstant.TOPIC))) {
+            throw new RuntimeException("rocketmq required parameter is null 
!");
+        }
+    }
+
+    @Override
+    public void init(KeyValue config) {
+        accessKeyId = config.getString(RocketMQConstant.ACCESS_KEY_ID);
+        accessKeySecret = config.getString(RocketMQConstant.ACCESS_KEY_SECRET);
+        namesrvAddr = config.getString(RocketMQConstant.NAMESRV_ADDR);
+        topic = config.getString(RocketMQConstant.TOPIC);
+        instanceId = config.getString(RocketMQConstant.INSTANCE_ID);
+    }
+
+    @Override
+    public void stop() {
+
+    }
+}
diff --git 
a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkTask.java
 
b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkTask.java
new file mode 100644
index 0000000..6c8eb36
--- /dev/null
+++ 
b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkTask.java
@@ -0,0 +1,136 @@
+package org.apache.rocketmq.connect.rocketmq;
+
+import com.aliyun.ons20190214.Client;
+import com.aliyun.ons20190214.models.OnsTopicListRequest;
+import com.aliyun.ons20190214.models.OnsTopicListResponse;
+import com.aliyun.openservices.ons.api.Message;
+import com.aliyun.openservices.ons.api.ONSFactory;
+import com.aliyun.openservices.ons.api.Producer;
+import com.aliyun.openservices.ons.api.PropertyKeyConst;
+import com.aliyun.openservices.shade.com.alibaba.fastjson.JSON;
+import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils;
+import com.aliyun.teaopenapi.models.Config;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.rocketmq.connect.rocketmq.common.RocketMQConstant;
+import org.apache.rocketmq.connect.rocketmq.utils.OnsUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Properties;
+
+public class RocketMQSinkTask extends SinkTask {
+    private static final Logger log = 
LoggerFactory.getLogger(RocketMQSinkTask.class);
+
+    private String accessKeyId;
+
+    private String accessKeySecret;
+
+    private String namesrvAddr;
+
+    private String topic;
+
+    private Producer producer;
+
+    private String instanceId;
+
+    @Override
+    public void put(List<ConnectRecord> sinkRecords) throws ConnectException {
+        try {
+            sinkRecords.forEach(connectRecord -> {
+                Message message = new Message();
+                
message.setBody(JSON.toJSONString(connectRecord.getData()).getBytes(StandardCharsets.UTF_8));
+                // TODO message.setKey();
+                // TODO message.setTag();
+                final KeyValue extensions = connectRecord.getExtensions();
+                if (extensions != null) {
+                    extensions.keySet().forEach(key -> 
message.putUserProperties(key, extensions.getString(key)));
+                }
+                message.setTopic(topic);
+                producer.send(message);
+            });
+        } catch (Exception e) {
+            log.error("RocketMQSinkTask | put | error => ", e);
+            throw new ConnectException(e);
+        }
+    }
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+        if 
(StringUtils.isBlank(config.getString(RocketMQConstant.ACCESS_KEY_ID))
+            || 
StringUtils.isBlank(config.getString(RocketMQConstant.ACCESS_KEY_SECRET))
+            || 
StringUtils.isBlank(config.getString(RocketMQConstant.NAMESRV_ADDR))
+            || StringUtils.isBlank(config.getString(RocketMQConstant.TOPIC))) {
+            throw new RuntimeException("rocketmq required parameter is null 
!");
+        }
+        // 检查topic是否存在
+        try {
+            Config onsConfig = new Config()
+                    
.setAccessKeyId(config.getString(RocketMQConstant.ACCESS_KEY_ID))
+                    
.setAccessKeySecret(config.getString(RocketMQConstant.ACCESS_KEY_SECRET));
+            onsConfig.endpoint = 
OnsUtils.parseEndpoint(config.getString(RocketMQConstant.NAMESRV_ADDR));
+            final Client client = new Client(onsConfig);
+            OnsTopicListRequest onsTopicListRequest = new OnsTopicListRequest()
+                    .setTopic(config.getString(RocketMQConstant.TOPIC))
+                    
.setInstanceId(config.getString(RocketMQConstant.INSTANCE_ID));
+            final OnsTopicListResponse onsTopicListResponse = 
client.onsTopicList(onsTopicListRequest);
+            if 
(onsTopicListResponse.getBody().getData().getPublishInfoDo().isEmpty()) {
+                throw new RuntimeException("rocketmq required parameter topic 
does not exist !");
+            }
+        } catch (Exception e) {
+            log.error("RocketMQSinkTask | validate | error => ", e);
+            throw new RuntimeException(e.getMessage());
+        }
+    }
+
+    @Override
+    public void init(KeyValue config) {
+        accessKeyId = config.getString(RocketMQConstant.ACCESS_KEY_ID);
+        accessKeySecret = config.getString(RocketMQConstant.ACCESS_KEY_SECRET);
+        namesrvAddr = config.getString(RocketMQConstant.NAMESRV_ADDR);
+        topic = config.getString(RocketMQConstant.TOPIC);
+        instanceId = config.getString(RocketMQConstant.INSTANCE_ID);
+    }
+
+    @Override
+    public void start(SinkTaskContext sinkTaskContext) {
+        try {
+            super.start(sinkTaskContext);
+            if (producer != null) {
+                producer.shutdown();
+            }
+            Properties properties = new Properties();
+            properties.put(PropertyKeyConst.AccessKey, accessKeyId);
+            properties.put(PropertyKeyConst.SecretKey, accessKeySecret);
+            if (StringUtils.isNotBlank(instanceId)) {
+                properties.put(PropertyKeyConst.INSTANCE_ID,  instanceId);
+            }
+            properties.put(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr);
+            producer = ONSFactory.createProducer(properties);
+            producer.start();
+        } catch (Exception e) {
+            log.error("RocketMQSinkTask | start | error =>", e);
+            throw new RuntimeException(e.getMessage());
+        }
+    }
+
+    @Override
+    public void stop() {
+        producer.shutdown();
+    }
+}
diff --git 
a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnector.java
 
b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnector.java
new file mode 100644
index 0000000..778aa2b
--- /dev/null
+++ 
b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnector.java
@@ -0,0 +1,81 @@
+package org.apache.rocketmq.connect.rocketmq;
+
+import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.rocketmq.common.RocketMQConstant;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class RocketMQSourceConnector extends SourceConnector {
+
+    private String accessKeyId;
+
+    private String accessKeySecret;
+
+    private String namesrvAddr;
+
+    private String topic;
+
+    private String instanceId;
+
+    private String consumerGroup;
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public List<KeyValue> taskConfigs(int maxTasks) {
+        List<KeyValue> keyValues = new ArrayList<>();
+        KeyValue keyValue = new DefaultKeyValue();
+        keyValue.put(RocketMQConstant.ACCESS_KEY_ID, accessKeyId);
+        keyValue.put(RocketMQConstant.ACCESS_KEY_SECRET,accessKeySecret);
+        keyValue.put(RocketMQConstant.INSTANCE_ID, instanceId);
+        keyValue.put(RocketMQConstant.NAMESRV_ADDR, namesrvAddr);
+        keyValue.put(RocketMQConstant.TOPIC, topic);
+        keyValue.put(RocketMQConstant.CONSUMER_GROUP, consumerGroup);
+        keyValues.add(keyValue);
+        return keyValues;
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return RocketMQSourceTask.class;
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+        if 
(StringUtils.isBlank(config.getString(RocketMQConstant.ACCESS_KEY_ID))
+                || 
StringUtils.isBlank(config.getString(RocketMQConstant.ACCESS_KEY_SECRET))
+                || 
StringUtils.isBlank(config.getString(RocketMQConstant.NAMESRV_ADDR))
+                || 
StringUtils.isBlank(config.getString(RocketMQConstant.TOPIC))
+                || 
StringUtils.isBlank(config.getString(RocketMQConstant.CONSUMER_GROUP))) {
+            throw new RuntimeException("rocketmq required parameter is null 
!");
+        }
+    }
+
+    @Override
+    public void init(KeyValue config) {
+        accessKeyId = config.getString(RocketMQConstant.ACCESS_KEY_ID);
+        accessKeySecret = config.getString(RocketMQConstant.ACCESS_KEY_SECRET);
+        namesrvAddr = config.getString(RocketMQConstant.NAMESRV_ADDR);
+        topic = config.getString(RocketMQConstant.TOPIC);
+        instanceId = config.getString(RocketMQConstant.INSTANCE_ID);
+        consumerGroup = config.getString(RocketMQConstant.CONSUMER_GROUP);
+    }
+
+    @Override
+    public void stop() {
+
+    }
+}
diff --git 
a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceTask.java
 
b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceTask.java
new file mode 100644
index 0000000..9de3ace
--- /dev/null
+++ 
b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceTask.java
@@ -0,0 +1,181 @@
+package org.apache.rocketmq.connect.rocketmq;
+
+import com.aliyun.ons20190214.Client;
+import com.aliyun.ons20190214.models.OnsGroupListRequest;
+import com.aliyun.ons20190214.models.OnsGroupListResponse;
+import com.aliyun.ons20190214.models.OnsTopicListRequest;
+import com.aliyun.ons20190214.models.OnsTopicListResponse;
+import com.aliyun.openservices.ons.api.Action;
+import com.aliyun.openservices.ons.api.Consumer;
+import com.aliyun.openservices.ons.api.ONSFactory;
+import com.aliyun.openservices.ons.api.PropertyKeyConst;
+import com.aliyun.openservices.shade.com.google.common.collect.Maps;
+import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils;
+import com.aliyun.teaopenapi.models.Config;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import org.apache.rocketmq.connect.rocketmq.common.RocketMQConstant;
+import org.apache.rocketmq.connect.rocketmq.utils.OnsUtils;
+import org.assertj.core.util.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+public class RocketMQSourceTask extends SourceTask {
+    private static final Logger log = 
LoggerFactory.getLogger(RocketMQSourceTask.class);
+
+    private String accessKeyId;
+
+    private String accessKeySecret;
+
+    private String namesrvAddr;
+
+    private String topic;
+
+    private String instanceId;
+
+    private String consumerGroup;
+
+    private Consumer consumer;
+
+    private final BlockingQueue<ConnectRecord> blockingQueue = new 
LinkedBlockingDeque<>(1000);
+    private static final int BATCH_POLL_SIZE = 10;
+    private static final int DEFAULT_CONSUMER_TIMEOUT_SECONDS = 20;
+
+    @Override
+    public List<ConnectRecord> poll() throws InterruptedException {
+        if (consumer == null) {
+            initConsumer();
+        }
+        List<ConnectRecord> connectRecords = Lists.newArrayList();
+        blockingQueue.drainTo(connectRecords, BATCH_POLL_SIZE);
+        return connectRecords;
+    }
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+        if 
(StringUtils.isBlank(config.getString(RocketMQConstant.ACCESS_KEY_ID))
+                || 
StringUtils.isBlank(config.getString(RocketMQConstant.ACCESS_KEY_SECRET))
+                || 
StringUtils.isBlank(config.getString(RocketMQConstant.NAMESRV_ADDR))
+                || 
StringUtils.isBlank(config.getString(RocketMQConstant.TOPIC))
+                || 
StringUtils.isBlank(config.getString(RocketMQConstant.CONSUMER_GROUP))) {
+            throw new RuntimeException("rocketmq required parameter is null 
!");
+        }
+        // 检查topic和consumer group是否存在
+        try {
+            Config onsConfig = new Config()
+                    
.setAccessKeyId(config.getString(RocketMQConstant.ACCESS_KEY_ID))
+                    
.setAccessKeySecret(config.getString(RocketMQConstant.ACCESS_KEY_SECRET));
+            onsConfig.endpoint = 
OnsUtils.parseEndpoint(config.getString(RocketMQConstant.NAMESRV_ADDR));
+            final Client client = new Client(onsConfig);
+            OnsTopicListRequest onsTopicListRequest = new OnsTopicListRequest()
+                    .setTopic(config.getString(RocketMQConstant.TOPIC))
+                    
.setInstanceId(config.getString(RocketMQConstant.INSTANCE_ID));
+            final OnsTopicListResponse onsTopicListResponse = 
client.onsTopicList(onsTopicListRequest);
+            if 
(onsTopicListResponse.getBody().getData().getPublishInfoDo().isEmpty()) {
+                throw new RuntimeException("rocketmq required parameter topic 
does not exist !");
+            }
+            OnsGroupListRequest onsGroupListRequest = new OnsGroupListRequest()
+                    
.setInstanceId(config.getString(RocketMQConstant.INSTANCE_ID))
+                    
.setGroupId(config.getString(RocketMQConstant.CONSUMER_GROUP));
+            final OnsGroupListResponse onsGroupListResponse = 
client.onsGroupList(onsGroupListRequest);
+            if 
(onsGroupListResponse.getBody().getData().getSubscribeInfoDo().isEmpty()) {
+                throw new RuntimeException("rocketmq required parameter 
consumerGroup does not exist !");
+            }
+        } catch (Exception e) {
+            log.error("RocketMQSinkTask | validate | error => ", e);
+            throw new RuntimeException(e.getMessage());
+        }
+    }
+
+    @Override
+    public void init(KeyValue config) {
+        accessKeyId = config.getString(RocketMQConstant.ACCESS_KEY_ID);
+        accessKeySecret = config.getString(RocketMQConstant.ACCESS_KEY_SECRET);
+        namesrvAddr = config.getString(RocketMQConstant.NAMESRV_ADDR);
+        topic = config.getString(RocketMQConstant.TOPIC);
+        instanceId = config.getString(RocketMQConstant.INSTANCE_ID);
+        consumerGroup = config.getString(RocketMQConstant.CONSUMER_GROUP);
+    }
+
+    @Override
+    public void start(SourceTaskContext sourceTaskContext) {
+        try {
+            super.start(sourceTaskContext);
+            initConsumer();
+            consumer.start();
+        } catch (Exception e) {
+            log.error("RocketMQSourceTask | start | error => ", e);
+            throw e;
+        }
+    }
+
+    private void initConsumer() {
+        try {
+            Properties properties = new Properties();
+            properties.put(PropertyKeyConst.GROUP_ID, consumerGroup);
+            properties.put(PropertyKeyConst.AccessKey, accessKeyId);
+            properties.put(PropertyKeyConst.SecretKey, accessKeySecret);
+            properties.put(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr);
+            if (StringUtils.isNotBlank(instanceId)) {
+                properties.put(PropertyKeyConst.INSTANCE_ID, instanceId);
+            }
+            consumer = ONSFactory.createConsumer(properties);
+            // TODO TAG先忽略
+            consumer.subscribe(topic, "*", (message, consumeContext) -> {
+                try {
+                    Map<String, String> sourceRecordPartition = 
Maps.newHashMap();
+                    sourceRecordPartition.put("topic", message.getTopic());
+                    sourceRecordPartition.put("brokerName", 
message.getBornHost());
+                    Map<String, String> sourceRecordOffset = Maps.newHashMap();
+                    sourceRecordOffset.put("queueOffset", 
Long.toString(message.getOffset()));
+                    RecordPartition recordPartition = new 
RecordPartition(sourceRecordPartition);
+                    RecordOffset recordOffset = new 
RecordOffset(sourceRecordOffset);
+                    ConnectRecord connectRecord = new 
ConnectRecord(recordPartition, recordOffset, message.getBornTimestamp());
+                    connectRecord.setData(new String(message.getBody(), 
StandardCharsets.UTF_8));
+                    final Properties userProperties = 
message.getUserProperties();
+                    final Set<String> keys = 
userProperties.stringPropertyNames();
+                    keys.forEach(key -> connectRecord.addExtension(key, 
userProperties.get(key).toString()));
+                    final boolean offer = blockingQueue.offer(connectRecord, 
DEFAULT_CONSUMER_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+                    if (!offer) {
+                        return Action.ReconsumeLater;
+                    }
+                    return Action.CommitMessage;
+                } catch (Exception e) {
+                    log.error("RocketMQSourceTask | initConsumer | error => ", 
e);
+                    return Action.ReconsumeLater;
+                }
+            });
+        } catch (Exception e) {
+            log.error("RocketMQSourceTask | initConsumer | error => ", e);
+            throw e;
+        }
+    }
+
+    @Override
+    public void stop() {
+        consumer.shutdown();
+    }
+}
diff --git 
a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/common/RocketMQConstant.java
 
b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/common/RocketMQConstant.java
new file mode 100644
index 0000000..cb1f352
--- /dev/null
+++ 
b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/common/RocketMQConstant.java
@@ -0,0 +1,17 @@
+package org.apache.rocketmq.connect.rocketmq.common;
+
+public class RocketMQConstant {
+
+    public static final String ACCESS_KEY_ID = "accessKeyId";
+
+    public static final String ACCESS_KEY_SECRET = "accessKeySecret";
+
+    public static final String NAMESRV_ADDR = "namesrvAddr";
+
+    public static final String TOPIC = "topic";
+
+    public static final String INSTANCE_ID = "instanceId";
+
+    public static final String CONSUMER_GROUP = "consumerGroup";
+
+}
diff --git 
a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/utils/OnsUtils.java
 
b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/utils/OnsUtils.java
new file mode 100644
index 0000000..2a90f50
--- /dev/null
+++ 
b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/utils/OnsUtils.java
@@ -0,0 +1,9 @@
+package org.apache.rocketmq.connect.rocketmq.utils;
+
+public class OnsUtils {
+
+    public static String parseEndpoint(String namesrvAddr) {
+        final String[] split = namesrvAddr.split("\\.");
+        return "ons." + split[1] + ".aliyuncs.com";
+    }
+}
diff --git 
a/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnectorTest.java
 
b/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnectorTest.java
new file mode 100644
index 0000000..897bb09
--- /dev/null
+++ 
b/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnectorTest.java
@@ -0,0 +1,95 @@
+package org.apache.rocketmq.connect.rocketmq;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.rocketmq.common.RocketMQConstant;
+import org.assertj.core.util.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class RocketMQSinkConnectorTest {
+
+    @Test
+    public void testTaskConfigs() {
+        RocketMQSinkConnector rocketMQSinkConnector= new 
RocketMQSinkConnector();
+        Assert.assertEquals(rocketMQSinkConnector.taskConfigs(1).size(), 1);
+    }
+
+    @Test
+    public void testPut() {
+        RocketMQSinkTask rocketMQSinkTask = new RocketMQSinkTask();
+        KeyValue keyValue = new DefaultKeyValue();
+        keyValue.put(RocketMQConstant.ACCESS_KEY_ID, "xxxx");
+        keyValue.put(RocketMQConstant.ACCESS_KEY_SECRET, "xxxx");
+        keyValue.put(RocketMQConstant.NAMESRV_ADDR, "xxxx");
+        keyValue.put(RocketMQConstant.TOPIC, "xxxx");
+        // 有实例Id需要填写实例Id
+        keyValue.put(RocketMQConstant.INSTANCE_ID, "xxxx");
+        rocketMQSinkTask.init(keyValue);
+        rocketMQSinkTask.start(new SinkTaskContext() {
+            @Override
+            public String getConnectorName() {
+                return null;
+            }
+
+            @Override
+            public String getTaskName() {
+                return null;
+            }
+
+            @Override
+            public void resetOffset(RecordPartition recordPartition, 
RecordOffset recordOffset) {
+
+            }
+
+            @Override
+            public void resetOffset(Map<RecordPartition, RecordOffset> 
offsets) {
+
+            }
+
+            @Override
+            public void pause(List<RecordPartition> partitions) {
+
+            }
+
+            @Override
+            public void resume(List<RecordPartition> partitions) {
+
+            }
+
+            @Override
+            public Set<RecordPartition> assignment() {
+                return null;
+            }
+        });
+        List<ConnectRecord> connectRecords = Lists.newArrayList();
+        ConnectRecord connectRecord = new ConnectRecord(new 
RecordPartition(new HashMap<>()), new RecordOffset(new HashMap<>()), 
System.currentTimeMillis());
+        connectRecord.setData("test message");
+        connectRecords.add(connectRecord);
+        connectRecord.addExtension("key", "value");
+        rocketMQSinkTask.put(connectRecords);
+    }
+
+    @Test
+    public void testValidate() {
+        RocketMQSinkTask rocketMQSinkTask = new RocketMQSinkTask();
+        KeyValue keyValue = new DefaultKeyValue();
+        keyValue.put(RocketMQConstant.ACCESS_KEY_ID, "xxxx");
+        keyValue.put(RocketMQConstant.ACCESS_KEY_SECRET, "xxxx");
+        keyValue.put(RocketMQConstant.NAMESRV_ADDR, "xxxx");
+        keyValue.put(RocketMQConstant.TOPIC, "xxxx");
+        // 有实例Id需要填写实例Id
+        keyValue.put(RocketMQConstant.INSTANCE_ID, "xxxx");
+        rocketMQSinkTask.validate(keyValue);
+    }
+
+}
diff --git 
a/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnectorTest.java
 
b/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnectorTest.java
new file mode 100644
index 0000000..bbdac55
--- /dev/null
+++ 
b/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnectorTest.java
@@ -0,0 +1,64 @@
+package org.apache.rocketmq.connect.rocketmq;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
+import io.openmessaging.connector.api.storage.OffsetStorageReader;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.rocketmq.common.RocketMQConstant;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RocketMQSourceConnectorTest {
+
+    @Test
+    public void testTaskConfigs() {
+        RocketMQSourceConnector rocketMQSourceConnector = new 
RocketMQSourceConnector();
+        Assert.assertEquals(rocketMQSourceConnector.taskConfigs(1).size(), 1);
+    }
+
+    @Test
+    public void testPut() throws InterruptedException {
+        RocketMQSourceTask rocketMQSourceTask  = new RocketMQSourceTask();
+        KeyValue keyValue = new DefaultKeyValue();
+        keyValue.put(RocketMQConstant.ACCESS_KEY_ID, "xxxx");
+        keyValue.put(RocketMQConstant.ACCESS_KEY_SECRET, "xxxx");
+        keyValue.put(RocketMQConstant.NAMESRV_ADDR, "xxxx");
+        keyValue.put(RocketMQConstant.TOPIC, "xxxx");
+        // 有实例Id需要填写实例Id
+        keyValue.put(RocketMQConstant.INSTANCE_ID, "xxxx");
+        keyValue.put(RocketMQConstant.CONSUMER_GROUP, "xxxx");
+        rocketMQSourceTask.init(keyValue);
+        rocketMQSourceTask.start(new SourceTaskContext() {
+            @Override
+            public OffsetStorageReader offsetStorageReader() {
+                return null;
+            }
+
+            @Override
+            public String getConnectorName() {
+                return null;
+            }
+
+            @Override
+            public String getTaskName() {
+                return null;
+            }
+        });
+        rocketMQSourceTask.poll();
+        Thread.sleep(50000);
+    }
+
+    @Test
+    public void testValidate() {
+        RocketMQSourceTask rocketMQSourceTask = new RocketMQSourceTask();
+        KeyValue keyValue = new DefaultKeyValue();
+        keyValue.put(RocketMQConstant.ACCESS_KEY_ID, "xxxx");
+        keyValue.put(RocketMQConstant.ACCESS_KEY_SECRET, "xxxx");
+        keyValue.put(RocketMQConstant.NAMESRV_ADDR, "xxxx");
+        keyValue.put(RocketMQConstant.TOPIC, "xxxx");
+        // 有实例Id需要填写实例Id
+        keyValue.put(RocketMQConstant.INSTANCE_ID, "xxxx");
+        keyValue.put(RocketMQConstant.CONSUMER_GROUP, "xxxx");
+        rocketMQSourceTask.validate(keyValue);
+    }
+}

Reply via email to