This is an automated email from the ASF dual-hosted git repository.

sunxiaojian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ddf386  rocketmq-connect-kafka-connector-adapter 0.0.1-SNAPSHOT (#247)
5ddf386 is described below

commit 5ddf3862f55c720114b51a410c911faf14c8dcb6
Author: 欧夺标 <ouduob...@gmail.com>
AuthorDate: Fri Aug 19 14:49:46 2022 +0800

    rocketmq-connect-kafka-connector-adapter 0.0.1-SNAPSHOT (#247)
    
    Co-authored-by: ouduobiao <duobiao....@alibaba-inc.com>
---
 .../README.md                                      |  91 ++++++++++
 .../pom.xml                                        | 192 ++++++++++++++++++++
 .../connect/kafka/config/ConfigDefine.java         |  28 +++
 .../kafka/connector/KafkaRocketmqConnector.java    | 128 +++++++++++++
 .../connector/KafkaRocketmqSinkConnector.java      |  43 +++++
 .../kafka/connector/KafkaRocketmqSinkTask.java     | 199 +++++++++++++++++++++
 .../connector/KafkaRocketmqSourceConnector.java    |  43 +++++
 .../kafka/connector/KafkaRocketmqSourceTask.java   | 185 +++++++++++++++++++
 .../connector/RocketmqKafkaConnectorContext.java   |  21 +++
 .../connector/RocketmqKafkaSinkTaskContext.java    | 128 +++++++++++++
 .../connector/RocketmqKafkaSourceTaskContext.java  |  64 +++++++
 .../rocketmq/connect/kafka/util/ConfigUtil.java    |  34 ++++
 .../connect/kafka/util/KafkaPluginsUtil.java       |  25 +++
 .../rocketmq/connect/kafka/util/RecordUtil.java    | 157 ++++++++++++++++
 14 files changed, 1338 insertions(+)

diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/README.md 
b/connectors/rocketmq-connect-kafka-connector-adapter/README.md
new file mode 100644
index 0000000..01b7a63
--- /dev/null
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/README.md
@@ -0,0 +1,91 @@
+**rocketmq-connect-kafka-connector-adapter**
+
+本项目的目标是让kafka connector运行在rocketmq-connect,使得数据在rocketmq导入导出。
+
+**参数说明**
+
+参数分为3类:rocketmq connect runtime参数、kafka-connector-adapter参数,以及具体kafka connector参数
+
+rocketmq connect runtime参数:
+- **connector-class**: kafka-connector-adapter的类名
+  
+  如果是SourceConnector,对应为org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSourceConnector。
+  
+  如果是SinkConnector,对应为org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSinkConnector。
+  
+- **connect-topicname**: 要导入导出数据的rocketmq topic
+- **tasks.num**: 启动的task数目 
+  
+kafka-connector-adapter参数:
+- **connector.class**: kafka connector的类名
+- **plugin.path**: kafka connector插件路径
+
+具体kafka connector参数:
+
+参考具体kafka connector的文档
+
+
+# 快速开始
+
+demo展示如何启动kafka-file-connector
+
+适配的kafka-file-connector的主要作用是从源文件中读取数据发送到RocketMQ集群,然后从Topic中读取消息,写入到目标文件。
+
+## 1.获取kafka-file-connector
+
+1. 下载kafka的二进制包:https://kafka.apache.org/downloads
+2. 解压后到libs目录找到kafka-file-connector的jar包:connect-file-{version}.jar
+3. 将jar拷贝到专门目录,这个目录作为kafka connector插件路径:plugin.path,比如:/tmp/kafka-plugins
+
+
+## 2.构建rocketmq-connect-kafka-connector-adapter
+
+```
+git clone https://github.com/apache/rocketmq-connect.git
+
+cd  connectors/rocketmq-connect-kafka-connector-adapter/
+
+mvn package
+
+```
+最后将/target/rocketmq-connect-kafka-connector-adapter-0.0.1-SNAPSHOT-jar-with-dependencies.jar拷贝到rocketmq插件目录下,并修改connect-standalone.conf的pluginPaths为对应的rocketmq插件目录
+,比如/tmp/rocketmq-plugins
+
+## 3.运行Worker
+
+```
+cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
+
+sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
+
+```
+
+## 4.启动source connector
+
+```
+touch /tmp/test-source-file.txt
+
+echo "Hello \r\nRocketMQ\r\n Connect" >> /tmp/test-source-file.txt
+
+curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSourceConnector -d '{"connector-class":"org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSourceConnector","connect-topicname":"fileTopic","connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","plugin.path":"/tmp/kafka-plugins","topic":"fileTopic","file":"/tmp/test-source-file.txt"}'
+```
+
+## 5.启动sink connector
+
+```
+curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSinkConnector -d '{"connector-class":"org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSinkConnector","connect-topicname":"fileTopic","connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector","plugin.path":"/tmp/kafka-plugins","file":"/tmp/test-sink-file.txt"}'
+
+cat /tmp/test-sink-file.txt
+```
+
+# kafka connect transform
+
+todo
+
+# 如何运行kafka-mongo-connector
+
+todo
+
+# 如何运行kafka-jdbc-connector
+
+todo
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/pom.xml 
b/connectors/rocketmq-connect-kafka-connector-adapter/pom.xml
new file mode 100644
index 0000000..5f62691
--- /dev/null
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/pom.xml
@@ -0,0 +1,192 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-connect-kafka-connector-adapter</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+        <!-- Compiler settings properties -->
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>versions-maven-plugin</artifactId>
+                <version>2.3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+                <version>2.7</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <configuration>
+                    
<outputDirectory>${project.build.directory}/lib</outputDirectory>
+                    <excludeTransitive>false</excludeTransitive>
+                    <stripVersion>true</stripVersion>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.6.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.19.1</version>
+                <configuration>
+                    <argLine>-Xms512m -Xmx1024m</argLine>
+                    <forkMode>always</forkMode>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-site-plugin</artifactId>
+                <version>3.6</version>
+                <configuration>
+                    <locales>en_US</locales>
+                    <outputEncoding>UTF-8</outputEncoding>
+                    <inputEncoding>UTF-8</inputEncoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.0.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <version>0.12</version>
+                <configuration>
+                    <excludes>
+                        <exclude>README.md</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>2.10.4</version>
+                <configuration>
+                    <charset>UTF-8</charset>
+                    <locale>en_US</locale>
+                    
<excludePackageNames>io.openmessaging.internal</excludePackageNames>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>aggregate</id>
+                        <goals>
+                            <goal>aggregate</goal>
+                        </goals>
+                        <phase>site</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>3.0.2</version>
+                <configuration>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <version>3.0.4</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.0.0</version>
+                <configuration>
+                    <archive>
+                        <!-- The main class here doesn't make much sense, since the connector is loaded dynamically -->
+                        <manifest>
+                            <mainClass>org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSinkConnector</mainClass>
+                        </manifest>
+                    </archive>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <version>2.17</version>
+                <executions>
+                    <execution>
+                        <id>verify</id>
+                        <phase>verify</phase>
+                        <configuration>
+                            
<configLocation>style/rmq_checkstyle.xml</configLocation>
+                            <encoding>UTF-8</encoding>
+                            <consoleOutput>true</consoleOutput>
+                            <failsOnError>true</failsOnError>
+                            
<includeTestSourceDirectory>false</includeTestSourceDirectory>
+                            <includeTestResources>false</includeTestResources>
+                        </configuration>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-runtime</artifactId>
+            <version>3.2.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <version>0.1.4</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.7</version>
+        </dependency>
+
+    </dependencies>
+</project>
diff --git 
a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigDefine.java
 
b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigDefine.java
new file mode 100644
index 0000000..9cbba08
--- /dev/null
+++ 
b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigDefine.java
@@ -0,0 +1,28 @@
+package org.apache.rocketmq.connect.kafka.config;
+
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Configuration keys used by the Kafka connector adapter.
+ *
+ * <p>All keys are constants; most of them simply alias the corresponding Kafka Connect
+ * configuration names so the adapter and the wrapped Kafka connector agree on them.
+ */
+public class ConfigDefine {
+    /** Key holding the adapter's own (RocketMQ-side) connector class name. */
+    public static final String ROCKETMQ_CONNECTOR_CLASS = "connector-class";
+    /** Key holding the class name of the wrapped Kafka connector. */
+    public static final String CONNECTOR_CLASS = ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+    /** Key holding the Kafka connector plugin path. */
+    public static final String PLUGIN_PATH = "plugin.path";
+
+    /** Key holding the class name of the Kafka task to instantiate. */
+    public static final String TASK_CLASS = TaskConfig.TASK_CLASS_CONFIG;
+
+    public static final String KEY_CONVERTER = WorkerConfig.KEY_CONVERTER_CLASS_CONFIG;
+    public static final String VALUE_CONVERTER = WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG;
+    public static final String HEADER_CONVERTER = WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG;
+
+    /**
+     * Keys that must be present in a connector configuration.
+     * The set is read-only; attempts to modify it throw {@link UnsupportedOperationException}.
+     */
+    public static final Set<String> REQUEST_CONFIG;
+
+    static {
+        // Plain HashSet wrapped read-only: avoids the anonymous subclass created by
+        // double-brace initialization and prevents accidental mutation of a shared constant.
+        Set<String> required = new HashSet<>();
+        required.add(CONNECTOR_CLASS);
+        required.add(PLUGIN_PATH);
+        REQUEST_CONFIG = java.util.Collections.unmodifiableSet(required);
+    }
+}
diff --git 
a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqConnector.java
 
b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqConnector.java
new file mode 100644
index 0000000..89bee2c
--- /dev/null
+++ 
b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqConnector.java
@@ -0,0 +1,128 @@
+package org.apache.rocketmq.connect.kafka.connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.connector.Connector;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.rocketmq.connect.kafka.config.ConfigDefine;
+import org.apache.rocketmq.connect.kafka.util.ConfigUtil;
+import org.apache.rocketmq.connect.kafka.util.KafkaPluginsUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.DriverManager;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Adapts a Kafka Connect connector to the openmessaging {@link Connector} API.
+ *
+ * <p>The Kafka connector is instantiated and initialized in {@link #validate(KeyValue)};
+ * {@link #start(KeyValue)}, {@link #taskConfigs(int)} and {@link #stop()} then delegate to it
+ * while the connector's plugin class loader is installed as the current loader.
+ */
+public class KafkaRocketmqConnector extends Connector {
+    private static final Logger log = LoggerFactory.getLogger(KafkaRocketmqConnector.class);
+
+    /** Converter settings copied from the connector config into every task config when present. */
+    private static final String[] CONVERTER_KEYS = {
+        ConfigDefine.KEY_CONVERTER, ConfigDefine.VALUE_CONVERTER, ConfigDefine.HEADER_CONVERTER
+    };
+
+    /** The openmessaging connector that owns this adapter; decides source vs. sink task class. */
+    private final Connector childConnector;
+
+    private org.apache.kafka.connect.connector.Connector kafkaConnector;
+    private Plugins kafkaPlugins;
+    private Map<String, String> kafkaConnectorConfigs;
+
+    public KafkaRocketmqConnector(Connector childConnector) {
+        this.childConnector = childConnector;
+    }
+
+    /**
+     * Asks the Kafka connector for its task configs and converts each one to a {@link KeyValue},
+     * adding the adapter-level settings every task needs (plugin path, connector class, task
+     * class, owning RocketMQ connector class and any configured converters).
+     *
+     * @param maxTasks upper bound passed through to the Kafka connector
+     * @return one {@link KeyValue} per task config returned by the Kafka connector
+     * @throws IllegalStateException if {@link #validate(KeyValue)} has not created the connector
+     */
+    @Override
+    public List<KeyValue> taskConfigs(int maxTasks) {
+        List<KeyValue> taskKeyValueConfigs = new ArrayList<>();
+        runWithConnectorLoader(() -> {
+            for (Map<String, String> taskConfig : this.kafkaConnector.taskConfigs(maxTasks)) {
+                KeyValue kv = ConfigUtil.mapConfigToKeyValue(taskConfig);
+                kv.put(ConfigDefine.PLUGIN_PATH, this.kafkaConnectorConfigs.get(ConfigDefine.PLUGIN_PATH));
+                kv.put(ConfigDefine.CONNECTOR_CLASS,
+                        this.kafkaConnectorConfigs.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
+                kv.put(ConfigDefine.TASK_CLASS, this.kafkaConnector.taskClass().getName());
+                kv.put(ConfigDefine.ROCKETMQ_CONNECTOR_CLASS, childConnector.getClass().getName());
+
+                for (String converterKey : CONVERTER_KEYS) {
+                    if (this.kafkaConnectorConfigs.containsKey(converterKey)) {
+                        kv.put(converterKey, this.kafkaConnectorConfigs.get(converterKey));
+                    }
+                }
+                taskKeyValueConfigs.add(kv);
+            }
+        });
+        return taskKeyValueConfigs;
+    }
+
+    /**
+     * @return the source task adapter when the owning connector is a {@link SourceConnector},
+     *         otherwise the sink task adapter
+     */
+    @Override
+    public Class<? extends Task> taskClass() {
+        return this.childConnector instanceof SourceConnector
+                ? KafkaRocketmqSourceTask.class : KafkaRocketmqSinkTask.class;
+    }
+
+    /**
+     * Starts the Kafka connector with the configuration captured by {@link #validate(KeyValue)};
+     * the {@code config} argument itself is not read here.
+     *
+     * @throws IllegalStateException if {@link #validate(KeyValue)} has not created the connector
+     */
+    @Override
+    public void start(KeyValue config) {
+        runWithConnectorLoader(() -> this.kafkaConnector.start(this.kafkaConnectorConfigs));
+    }
+
+    /** Stops the Kafka connector; does nothing if no connector was ever created. */
+    @Override
+    public void stop() {
+        if (this.kafkaConnector == null) {
+            return;
+        }
+        runWithConnectorLoader(() -> this.kafkaConnector.stop());
+    }
+
+    /**
+     * Checks the required adapter settings, then creates, validates and initializes the wrapped
+     * Kafka connector under its plugin class loader.
+     *
+     * @param config connector configuration supplied by the runtime
+     * @throws ConnectException if a required setting is missing or the Kafka connector reports
+     *                          configuration errors
+     */
+    @Override
+    public void validate(KeyValue config) {
+
+        for (String requestConfig : ConfigDefine.REQUEST_CONFIG) {
+            if (!config.containsKey(requestConfig)) {
+                throw new ConnectException("miss config:" + requestConfig);
+            }
+        }
+
+        this.kafkaConnectorConfigs = ConfigUtil.keyValueConfigToMap(config);
+        log.info("kafka connector config is {}", this.kafkaConnectorConfigs);
+        this.kafkaPlugins = KafkaPluginsUtil.getPlugins(
+                Collections.singletonMap(KafkaPluginsUtil.PLUGIN_PATH,
+                        this.kafkaConnectorConfigs.get(ConfigDefine.PLUGIN_PATH)));
+        String connectorClassName = this.kafkaConnectorConfigs.get(ConfigDefine.CONNECTOR_CLASS);
+        ClassLoader connectorLoader = this.kafkaPlugins.delegatingLoader().connectorLoader(connectorClassName);
+        ClassLoader savedLoader = Plugins.compareAndSwapLoaders(connectorLoader);
+        try {
+            this.kafkaConnector = this.kafkaPlugins.newConnector(connectorClassName);
+            // Kafka connectors report invalid settings through the returned Config rather than
+            // by throwing, so the result has to be inspected explicitly.
+            failOnConfigErrors(this.kafkaConnector.validate(this.kafkaConnectorConfigs));
+            this.kafkaConnector.initialize(
+                    new RocketmqKafkaConnectorContext(getConnectorContext())
+            );
+        } finally {
+            Plugins.compareAndSwapLoaders(savedLoader);
+        }
+
+    }
+
+    /** Throws a {@link ConnectException} listing every error message found in {@code validated}. */
+    private static void failOnConfigErrors(org.apache.kafka.common.config.Config validated) {
+        if (validated == null) {
+            return;
+        }
+        List<String> errors = new ArrayList<>();
+        for (org.apache.kafka.common.config.ConfigValue value : validated.configValues()) {
+            for (String message : value.errorMessages()) {
+                errors.add(value.name() + ": " + message);
+            }
+        }
+        if (!errors.isEmpty()) {
+            throw new ConnectException("invalid kafka connector config: " + errors);
+        }
+    }
+
+    /**
+     * Runs {@code runnable} with the Kafka connector's plugin class loader installed, restoring
+     * the previous loader afterwards even if the action throws.
+     */
+    private void runWithConnectorLoader(Runnable runnable) {
+        if (this.kafkaPlugins == null || this.kafkaConnector == null) {
+            throw new IllegalStateException(
+                    "kafka connector is not initialized, validate(config) must be called first");
+        }
+        ClassLoader current = this.kafkaPlugins.compareAndSwapLoaders(this.kafkaConnector);
+        try {
+            runnable.run();
+        } finally {
+            Plugins.compareAndSwapLoaders(current);
+        }
+    }
+}
diff --git 
a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSinkConnector.java
 
b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSinkConnector.java
new file mode 100644
index 0000000..9cec38e
--- /dev/null
+++ 
b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSinkConnector.java
@@ -0,0 +1,43 @@
+package org.apache.rocketmq.connect.kafka.connector;
+
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Sink-side entry point of the Kafka connector adapter.
+ *
+ * <p>Holds no logic of its own: every call is forwarded to a {@link KafkaRocketmqConnector}
+ * that was created with this instance as its child connector.
+ */
+public class KafkaRocketmqSinkConnector extends SinkConnector {
+
+    private static final Logger log = LoggerFactory.getLogger(KafkaRocketmqSinkConnector.class);
+
+    /** Adapter that all connector calls are forwarded to. */
+    private final KafkaRocketmqConnector adapter = new KafkaRocketmqConnector(this);
+
+    /** Forwards validation of {@code config} to the adapter. */
+    @Override
+    public void validate(KeyValue config) {
+        adapter.validate(config);
+    }
+
+    /** Forwards the start request to the adapter. */
+    @Override
+    public void start(KeyValue config) {
+        adapter.start(config);
+    }
+
+    /** Forwards the stop request to the adapter. */
+    @Override
+    public void stop() {
+        adapter.stop();
+    }
+
+    /** @return the task class chosen by the adapter */
+    @Override
+    public Class<? extends Task> taskClass() {
+        return adapter.taskClass();
+    }
+
+    /** @return the task configurations produced by the adapter for at most {@code maxTasks} tasks */
+    @Override
+    public List<KeyValue> taskConfigs(int maxTasks) {
+        return adapter.taskConfigs(maxTasks);
+    }
+
+}
diff --git 
a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSinkTask.java
 
b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSinkTask.java
new file mode 100644
index 0000000..c2b6cfa
--- /dev/null
+++ 
b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSinkTask.java
@@ -0,0 +1,199 @@
+package org.apache.rocketmq.connect.kafka.connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.errors.ConnectException;
+import io.openmessaging.connector.api.errors.RetriableException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+import org.apache.rocketmq.connect.kafka.config.ConfigDefine;
+import org.apache.rocketmq.connect.kafka.util.ConfigUtil;
+import org.apache.rocketmq.connect.kafka.util.KafkaPluginsUtil;
+import org.apache.rocketmq.connect.kafka.util.RecordUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+
+
+public class KafkaRocketmqSinkTask extends SinkTask {
+    private static final Logger log = 
LoggerFactory.getLogger(KafkaRocketmqSinkTask.class);
+
+    private org.apache.kafka.connect.sink.SinkTask kafkaSinkTask;
+    private ClassLoader classLoader;
+
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private HeaderConverter headerConverter;
+
+    @Override
+    public void put(List<ConnectRecord> sinkRecords) throws ConnectException {
+        Collection<SinkRecord> records = new ArrayList<>(sinkRecords.size());
+        for(ConnectRecord sinkRecord: sinkRecords){
+            String topic = 
(String)sinkRecord.getPosition().getPartition().getPartition().get(RecordUtil.TOPIC);
+            SchemaAndValue valueSchemaAndValue = 
valueConverter.toConnectData(topic, 
((String)sinkRecord.getData()).getBytes(StandardCharsets.UTF_8));
+            String key = sinkRecord.getExtension(RecordUtil.KAFKA_MSG_KEY);
+            SchemaAndValue keySchemaAndValue = null;
+            if(key != null) {
+                keySchemaAndValue = keyConverter.toConnectData(topic, 
key.getBytes(StandardCharsets.UTF_8));
+            }
+
+            SinkRecord record = new SinkRecord(
+                    
RecordUtil.getTopicAndBrokerName(sinkRecord.getPosition().getPartition()),
+                    
RecordUtil.getPartition(sinkRecord.getPosition().getPartition()),
+                    keySchemaAndValue==null?null:keySchemaAndValue.schema(),
+                    keySchemaAndValue==null?null:keySchemaAndValue.value(),
+                    valueSchemaAndValue.schema(), valueSchemaAndValue.value(),
+                    RecordUtil.getOffset(sinkRecord.getPosition().getOffset()),
+                    sinkRecord.getTimestamp(), TimestampType.NO_TIMESTAMP_TYPE,
+                    getHeaders(sinkRecord.getExtensions(),  topic)
+                    );
+            records.add(record);
+        }
+        try {
+            this.kafkaSinkTask.put(records);
+        } catch (org.apache.kafka.connect.errors.RetriableException e){
+            throw new RetriableException(e);
+        }
+    }
+
+    private ConnectHeaders getHeaders(KeyValue extensions, String topic){
+        ConnectHeaders headers = new ConnectHeaders();
+        for(String headerKey: extensions.keySet()){
+            if(RecordUtil.KAFKA_MSG_KEY.equals(headerKey)){
+                continue;
+            }
+            SchemaAndValue headerSchemaAndValue = headerConverter
+                    .toConnectHeader(topic, headerKey, 
extensions.getString(headerKey).getBytes());
+            headers.add(headerKey, headerSchemaAndValue);
+        }
+        return headers;
+    }
+
+
+
+    @Override
+    public void start(KeyValue config) {
+        Map<String, String> kafkaTaskProps = 
ConfigUtil.keyValueConfigToMap(config);
+        log.info("kafka connector task config is {}", kafkaTaskProps);
+        Plugins kafkaPlugins =  
KafkaPluginsUtil.getPlugins(Collections.singletonMap(KafkaPluginsUtil.PLUGIN_PATH,
 kafkaTaskProps.get(ConfigDefine.PLUGIN_PATH)));
+        String connectorClass = 
kafkaTaskProps.get(ConfigDefine.CONNECTOR_CLASS);
+        ClassLoader connectorLoader = 
kafkaPlugins.delegatingLoader().connectorLoader(connectorClass);
+        this.classLoader = Plugins.compareAndSwapLoaders(connectorLoader);
+        try {
+            TaskConfig taskConfig = new TaskConfig(kafkaTaskProps);
+            Class<? extends Task> taskClass = 
taskConfig.getClass(ConfigDefine.TASK_CLASS).asSubclass(Task.class);
+            this.kafkaSinkTask = 
(org.apache.kafka.connect.sink.SinkTask)kafkaPlugins.newTask(taskClass);
+            initConverter(kafkaPlugins, kafkaTaskProps);
+            this.kafkaSinkTask.initialize(new 
RocketmqKafkaSinkTaskContext(sinkTaskContext));
+            this.kafkaSinkTask.start(kafkaTaskProps);
+        } catch (Throwable e){
+            recoverClassLoader();
+            throw e;
+        }
+    }
+
+    @Override
+    public void stop() {
+        try {
+            this.kafkaSinkTask.stop();
+        } finally {
+            recoverClassLoader();
+        }
+    }
+
+
+    private void recoverClassLoader(){
+        if(this.classLoader != null){
+            Plugins.compareAndSwapLoaders(this.classLoader);
+            this.classLoader = null;
+        }
+    }
+
+    private void initConverter(Plugins plugins, Map<String, String> taskProps){
+
+        ConfigDef converterConfigDef = new ConfigDef()
+                .define(ConfigDefine.KEY_CONVERTER, ConfigDef.Type.CLASS, 
null, ConfigDef.Importance.LOW, "")
+                .define(ConfigDefine.VALUE_CONVERTER, ConfigDef.Type.CLASS, 
null, ConfigDef.Importance.LOW, "")
+                .define(ConfigDefine.HEADER_CONVERTER, ConfigDef.Type.CLASS, 
null, ConfigDef.Importance.LOW, "");
+
+        Map<String, String> connProps = new HashMap<>();
+        if(taskProps.containsKey(ConfigDefine.KEY_CONVERTER)){
+            connProps.put(ConfigDefine.KEY_CONVERTER, 
taskProps.get(ConfigDefine.KEY_CONVERTER));
+        }
+        if(taskProps.containsKey(ConfigDefine.VALUE_CONVERTER)){
+            connProps.put(ConfigDefine.VALUE_CONVERTER, 
taskProps.get(ConfigDefine.VALUE_CONVERTER));
+        }
+        if(taskProps.containsKey(ConfigDefine.HEADER_CONVERTER)){
+            connProps.put(ConfigDefine.HEADER_CONVERTER, 
taskProps.get(ConfigDefine.HEADER_CONVERTER));
+        }
+        SimpleConfig connConfig = new SimpleConfig(converterConfigDef, 
connProps);
+
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put(ConfigDefine.KEY_CONVERTER, 
"org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put(ConfigDefine.VALUE_CONVERTER, 
"org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put(ConfigDefine.HEADER_CONVERTER, 
"org.apache.kafka.connect.storage.SimpleHeaderConverter");
+        SimpleConfig workerConfig = new SimpleConfig(converterConfigDef, 
workerProps);
+
+        keyConverter = plugins.newConverter(connConfig, 
ConfigDefine.KEY_CONVERTER, Plugins.ClassLoaderUsage
+                .CURRENT_CLASSLOADER);
+        valueConverter = plugins.newConverter(connConfig, 
ConfigDefine.VALUE_CONVERTER, Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER);
+        headerConverter = plugins.newHeaderConverter(connConfig, 
ConfigDefine.HEADER_CONVERTER,
+                Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER);
+
+        if (keyConverter == null) {
+            keyConverter = plugins.newConverter(workerConfig, 
WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage.PLUGINS);
+            log.info("Set up the key converter {} for task {} using the worker 
config", keyConverter.getClass(), sinkTaskContext.getTaskName());
+        } else {
+            log.info("Set up the key converter {} for task {} using the 
connector config", keyConverter.getClass(), sinkTaskContext.getTaskName());
+        }
+        if (valueConverter == null) {
+            valueConverter = plugins.newConverter(workerConfig, 
WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage.PLUGINS);
+            log.info("Set up the value converter {} for task {} using the 
worker config", valueConverter.getClass(), sinkTaskContext.getTaskName());
+        } else {
+            log.info("Set up the value converter {} for task {} using the 
connector config", valueConverter.getClass(), sinkTaskContext.getTaskName());
+        }
+        if (headerConverter == null) {
+            headerConverter = plugins.newHeaderConverter(workerConfig, 
WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage
+                    .PLUGINS);
+            log.info("Set up the header converter {} for task {} using the 
worker config", headerConverter.getClass(), sinkTaskContext.getTaskName());
+        } else {
+            log.info("Set up the header converter {} for task {} using the 
connector config", headerConverter.getClass(), sinkTaskContext.getTaskName());
+        }
+    }
+
+    @Override
+    public void flush(Map<RecordPartition, RecordOffset> currentOffsets) 
throws ConnectException {
+
+        if(this.kafkaSinkTask == null){
+            log.warn("the task is not start, currentOffsets:{}", 
currentOffsets);
+            return;
+        }
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = new 
HashMap<>(currentOffsets.size());
+
+        for(Map.Entry<RecordPartition, RecordOffset> po: 
currentOffsets.entrySet()){
+            TopicPartition tp = 
RecordUtil.recordPartitionToTopicPartition(po.getKey());
+            OffsetAndMetadata offsetAndMetadata = new 
OffsetAndMetadata(RecordUtil.getOffset(po.getValue()));
+            offsets.put(tp, offsetAndMetadata);
+        }
+        this.kafkaSinkTask.flush(offsets);
+    }
+}
diff --git 
a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSourceConnector.java
 
b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSourceConnector.java
new file mode 100644
index 0000000..6f6e006
--- /dev/null
+++ 
b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSourceConnector.java
@@ -0,0 +1,43 @@
+package org.apache.rocketmq.connect.kafka.connector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+
+import java.util.List;
+
+
+/**
+ * RocketMQ Connect source connector whose every operation is forwarded to a
+ * {@link KafkaRocketmqConnector} constructed around this instance.
+ */
+public class KafkaRocketmqSourceConnector extends SourceConnector {
+
+    private static final Logger log = LoggerFactory.getLogger(KafkaRocketmqSourceConnector.class);
+
+    /** Receives all calls made on this connector. */
+    private final KafkaRocketmqConnector delegate = new KafkaRocketmqConnector(this);
+
+    /** Forwards validation of {@code config} to the delegate. */
+    @Override
+    public void validate(KeyValue config) {
+        delegate.validate(config);
+    }
+
+    /** Forwards start-up with {@code config} to the delegate. */
+    @Override
+    public void start(KeyValue config) {
+        delegate.start(config);
+    }
+
+    /** Forwards shutdown to the delegate. */
+    @Override
+    public void stop() {
+        delegate.stop();
+    }
+
+    /** Returns the task class reported by the delegate. */
+    @Override
+    public Class<? extends Task> taskClass() {
+        return delegate.taskClass();
+    }
+
+    /** Returns the task configurations the delegate produces for {@code maxTasks}. */
+    @Override
+    public List<KeyValue> taskConfigs(int maxTasks) {
+        return delegate.taskConfigs(maxTasks);
+    }
+
+}
diff --git 
a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSourceTask.java
 
b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSourceTask.java
new file mode 100644
index 0000000..3db8251
--- /dev/null
+++ 
b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSourceTask.java
@@ -0,0 +1,185 @@
+package org.apache.rocketmq.connect.kafka.connector;
+
+import io.openmessaging.connector.api.data.*;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+import org.apache.rocketmq.connect.kafka.util.KafkaPluginsUtil;
+import org.apache.rocketmq.connect.kafka.util.RecordUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.rocketmq.connect.kafka.config.ConfigDefine;
+import org.apache.rocketmq.connect.kafka.util.ConfigUtil;
+
+import java.util.*;
+
+/**
+ * RocketMQ Connect source task that runs a Kafka Connect source task and converts
+ * the records it produces into RocketMQ Connect {@link ConnectRecord}s.
+ */
+public class KafkaRocketmqSourceTask extends SourceTask {
+    private static final Logger log = LoggerFactory.getLogger(KafkaRocketmqSourceTask.class);
+
+    /** Wrapped Kafka task; stays {@code null} until {@link #start(KeyValue)} has instantiated it. */
+    private org.apache.kafka.connect.source.SourceTask kafkaSourceTask;
+
+    /** Loader returned by the swap in {@link #start(KeyValue)}; re-installed by {@link #recoverClassLoader()}. */
+    private ClassLoader classLoader;
+
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private HeaderConverter headerConverter;
+
+    /**
+     * Polls the wrapped Kafka task and converts each returned record.
+     *
+     * @return the converted records, or {@code null} when the Kafka task returned {@code null}
+     */
+    @Override
+    public List<ConnectRecord> poll() throws InterruptedException {
+        List<SourceRecord> sourceRecords = this.kafkaSourceTask.poll();
+        if (sourceRecords == null) {
+            return null;
+        }
+
+        List<ConnectRecord> connectRecords = new ArrayList<>(sourceRecords.size());
+        for (SourceRecord sourceRecord : sourceRecords) {
+            connectRecords.add(RecordUtil.toConnectRecord(sourceRecord,
+                    this.keyConverter, this.valueConverter, this.headerConverter));
+        }
+        return connectRecords;
+    }
+
+    /**
+     * Swaps in the connector's class loader, instantiates the configured Kafka task,
+     * sets up the converters and starts the Kafka task with the task properties.
+     *
+     * <p>If any step fails, the previous class loader is restored before the error is rethrown.
+     *
+     * @param config task configuration; must contain the plugin path, connector class and task class
+     */
+    @Override
+    public void start(KeyValue config) {
+        Map<String, String> kafkaTaskProps = ConfigUtil.keyValueConfigToMap(config);
+        log.info("kafka connector task config is {}", kafkaTaskProps);
+        Plugins kafkaPlugins = KafkaPluginsUtil.getPlugins(
+                Collections.singletonMap(KafkaPluginsUtil.PLUGIN_PATH, kafkaTaskProps.get(ConfigDefine.PLUGIN_PATH)));
+        String connectorClass = kafkaTaskProps.get(ConfigDefine.CONNECTOR_CLASS);
+        ClassLoader connectorLoader = kafkaPlugins.delegatingLoader().connectorLoader(connectorClass);
+        this.classLoader = Plugins.compareAndSwapLoaders(connectorLoader);
+        try {
+            TaskConfig taskConfig = new TaskConfig(kafkaTaskProps);
+            Class<? extends Task> taskClass = taskConfig.getClass(ConfigDefine.TASK_CLASS).asSubclass(Task.class);
+            this.kafkaSourceTask = (org.apache.kafka.connect.source.SourceTask) kafkaPlugins.newTask(taskClass);
+
+            initConverter(kafkaPlugins, kafkaTaskProps);
+
+            this.kafkaSourceTask.initialize(new RocketmqKafkaSourceTaskContext(sourceTaskContext));
+            this.kafkaSourceTask.start(kafkaTaskProps);
+        } catch (Throwable e) {
+            recoverClassLoader();
+            throw e;
+        }
+    }
+
+    /**
+     * Creates the key, value and header converters.
+     *
+     * <p>Converters named in the task properties are tried first; any that is not configured
+     * falls back to the built-in defaults (JsonConverter for key and value,
+     * SimpleHeaderConverter for headers).
+     */
+    private void initConverter(Plugins plugins, Map<String, String> taskProps) {
+
+        ConfigDef converterConfigDef = new ConfigDef()
+                .define(ConfigDefine.KEY_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "")
+                .define(ConfigDefine.VALUE_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "")
+                .define(ConfigDefine.HEADER_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "");
+
+        // Copy only the converter settings that the task configuration actually provides.
+        Map<String, String> connProps = new HashMap<>();
+        for (String converterKey : Arrays.asList(
+                ConfigDefine.KEY_CONVERTER, ConfigDefine.VALUE_CONVERTER, ConfigDefine.HEADER_CONVERTER)) {
+            if (taskProps.containsKey(converterKey)) {
+                connProps.put(converterKey, taskProps.get(converterKey));
+            }
+        }
+        SimpleConfig connConfig = new SimpleConfig(converterConfigDef, connProps);
+
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put(ConfigDefine.KEY_CONVERTER, "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put(ConfigDefine.VALUE_CONVERTER, "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put(ConfigDefine.HEADER_CONVERTER, "org.apache.kafka.connect.storage.SimpleHeaderConverter");
+        SimpleConfig workerConfig = new SimpleConfig(converterConfigDef, workerProps);
+
+        keyConverter = plugins.newConverter(connConfig, ConfigDefine.KEY_CONVERTER,
+                Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER);
+        valueConverter = plugins.newConverter(connConfig, ConfigDefine.VALUE_CONVERTER,
+                Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER);
+        headerConverter = plugins.newHeaderConverter(connConfig, ConfigDefine.HEADER_CONVERTER,
+                Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER);
+
+        // NOTE(review): the fallback lookups below read WorkerConfig.*_CLASS_CONFIG keys from a config
+        // populated under ConfigDefine.* keys; this assumes both sets of constants hold the same
+        // strings - confirm against ConfigDefine.
+        if (keyConverter == null) {
+            keyConverter = plugins.newConverter(workerConfig, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
+                    Plugins.ClassLoaderUsage.PLUGINS);
+            log.info("Set up the key converter {} for task {} using the worker config",
+                    keyConverter.getClass(), sourceTaskContext.getTaskName());
+        } else {
+            log.info("Set up the key converter {} for task {} using the connector config",
+                    keyConverter.getClass(), sourceTaskContext.getTaskName());
+        }
+        if (valueConverter == null) {
+            valueConverter = plugins.newConverter(workerConfig, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
+                    Plugins.ClassLoaderUsage.PLUGINS);
+            log.info("Set up the value converter {} for task {} using the worker config",
+                    valueConverter.getClass(), sourceTaskContext.getTaskName());
+        } else {
+            log.info("Set up the value converter {} for task {} using the connector config",
+                    valueConverter.getClass(), sourceTaskContext.getTaskName());
+        }
+        if (headerConverter == null) {
+            headerConverter = plugins.newHeaderConverter(workerConfig, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
+                    Plugins.ClassLoaderUsage.PLUGINS);
+            log.info("Set up the header converter {} for task {} using the worker config",
+                    headerConverter.getClass(), sourceTaskContext.getTaskName());
+        } else {
+            log.info("Set up the header converter {} for task {} using the connector config",
+                    headerConverter.getClass(), sourceTaskContext.getTaskName());
+        }
+    }
+
+    /**
+     * Stops the wrapped Kafka task, if one was created, and always restores the class loader
+     * that was active before {@link #start(KeyValue)}.
+     */
+    @Override
+    public void stop() {
+        try {
+            // Same guard as the commit methods: stop() may be reached without a started task.
+            if (this.kafkaSourceTask == null) {
+                log.warn("the task is not start");
+                return;
+            }
+            this.kafkaSourceTask.stop();
+        } finally {
+            recoverClassLoader();
+        }
+    }
+
+    /** Re-installs the class loader saved by {@link #start(KeyValue)}; does nothing if none is saved. */
+    private void recoverClassLoader() {
+        if (this.classLoader != null) {
+            Plugins.compareAndSwapLoaders(this.classLoader);
+            this.classLoader = null;
+        }
+    }
+
+    /**
+     * Reports a single delivered record to the wrapped Kafka task via {@code commitRecord}.
+     *
+     * <p>A Kafka {@link RecordMetadata} is built from the topic, queue id and queue offset
+     * entries of {@code metadata}; the timestamp is the current time.
+     *
+     * @param record   the record that was delivered
+     * @param metadata delivery metadata; the topic, queue id and queue offset entries are read
+     */
+    @Override
+    public void commit(ConnectRecord record, Map<String, String> metadata) {
+
+        if (this.kafkaSourceTask == null) {
+            log.warn("the task is not start, metadata:{}", metadata);
+            return;
+        }
+
+        try {
+            long baseOffset = Long.parseLong(metadata.get(RecordUtil.QUEUE_OFFSET));
+            TopicPartition topicPartition = new TopicPartition(
+                    metadata.get(RecordUtil.TOPIC), Integer.parseInt(metadata.get(RecordUtil.QUEUE_ID)));
+            RecordMetadata recordMetadata = new RecordMetadata(
+                    topicPartition, baseOffset, 0,
+                    System.currentTimeMillis(), 0, 0
+            );
+            this.kafkaSourceTask.commitRecord(
+                    RecordUtil.toSourceRecord(record, this.keyConverter, this.valueConverter, this.headerConverter),
+                    recordMetadata
+            );
+        } catch (InterruptedException e) {
+            // Preserve the interrupt for the caller instead of swallowing it.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /** Forwards a commit request to the wrapped Kafka task; logs a warning if no task exists yet. */
+    @Override
+    public void commit() {
+        if (this.kafkaSourceTask == null) {
+            log.warn("the task is not start");
+            return;
+        }
+
+        try {
+            this.kafkaSourceTask.commit();
+        } catch (InterruptedException e) {
+            // Preserve the interrupt for the caller instead of swallowing it.
+            Thread.currentThread().interrupt();
+        }
+    }
+}
diff --git 
a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaConnectorContext.java
 
b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaConnectorContext.java
new file mode 100644
index 0000000..ea20b81
--- /dev/null
+++ 
b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaConnectorContext.java
@@ -0,0 +1,21 @@
+package org.apache.rocketmq.connect.kafka.connector;
+
+import io.openmessaging.connector.api.component.connector.ConnectorContext;
+
+/**
+ * Kafka Connect connector context that forwards its calls to a RocketMQ Connect
+ * {@link ConnectorContext}.
+ */
+public class RocketmqKafkaConnectorContext implements org.apache.kafka.connect.connector.ConnectorContext {
+
+    /** RocketMQ Connect context that receives the forwarded calls. */
+    protected ConnectorContext rocketMqConnectorContext;
+
+    /**
+     * @param rocketMqConnectorContext RocketMQ Connect context to forward to
+     */
+    public RocketmqKafkaConnectorContext(ConnectorContext rocketMqConnectorContext) {
+        this.rocketMqConnectorContext = rocketMqConnectorContext;
+    }
+
+    /** Passes the error raised by the Kafka connector on to the RocketMQ context. */
+    @Override
+    public void raiseError(Exception e) {
+        rocketMqConnectorContext.raiseError(e);
+    }
+
+    /** Passes the reconfiguration request on to the RocketMQ context. */
+    @Override
+    public void requestTaskReconfiguration() {
+        rocketMqConnectorContext.requestTaskReconfiguration();
+    }
+}
diff --git 
a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaSinkTaskContext.java
 
b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaSinkTaskContext.java
new file mode 100644
index 0000000..848032b
--- /dev/null
+++ 
b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaSinkTaskContext.java
@@ -0,0 +1,128 @@
+package org.apache.rocketmq.connect.kafka.connector;
+
+
+import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
+import io.openmessaging.connector.api.data.*;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.rocketmq.connect.kafka.util.ConfigUtil;
+import org.apache.rocketmq.connect.kafka.util.RecordUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.stream.Collectors;
+
+
+/**
+ * Kafka Connect sink task context backed by a RocketMQ Connect {@link SinkTaskContext}.
+ *
+ * <p>Kafka topic partitions are mapped to RocketMQ record partitions through {@link RecordUtil}.
+ * {@link #timeout(long)} and {@link #requestCommit()} are not implemented and only log.
+ */
+public class RocketmqKafkaSinkTaskContext implements org.apache.kafka.connect.sink.SinkTaskContext {
+
+    private static final Logger log = LoggerFactory.getLogger(RocketmqKafkaSinkTaskContext.class);
+
+    /**
+     * Single worker shared by all instances for asynchronous errant-record reporting.
+     * The pool is never shut down, so its thread is a daemon and cannot keep the JVM alive.
+     */
+    private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(1, runnable -> {
+        Thread thread = new Thread(runnable, "rocketmq-kafka-errant-record-reporter");
+        thread.setDaemon(true);
+        return thread;
+    });
+
+    private final SinkTaskContext sinkTaskContext;
+
+    /**
+     * @param sinkTaskContext RocketMQ Connect context that backs this Kafka context
+     */
+    public RocketmqKafkaSinkTaskContext(SinkTaskContext sinkTaskContext) {
+        this.sinkTaskContext = sinkTaskContext;
+    }
+
+    /** Returns the RocketMQ task configuration as a string map. */
+    @Override
+    public Map<String, String> configs() {
+        return ConfigUtil.keyValueConfigToMap(sinkTaskContext.configs());
+    }
+
+    /**
+     * Resets the offsets of the given partitions on the RocketMQ context.
+     *
+     * @param offsets Kafka topic partitions and the offsets to reset them to
+     */
+    @Override
+    public void offset(Map<TopicPartition, Long> offsets) {
+        Map<RecordPartition, RecordOffset> rocketmqOffsets = new HashMap<>(offsets.size());
+        offsets.forEach((tp, offset) -> {
+            Map<String, String> offsetMap = new HashMap<>();
+            offsetMap.put(RecordUtil.QUEUE_OFFSET, offset + "");
+            rocketmqOffsets.put(RecordUtil.topicPartitionToRecordPartition(tp), new RecordOffset(offsetMap));
+        });
+        sinkTaskContext.resetOffset(rocketmqOffsets);
+    }
+
+    /** Resets the offset of a single partition; see {@link #offset(Map)}. */
+    @Override
+    public void offset(TopicPartition tp, long offset) {
+        this.offset(Collections.singletonMap(tp, offset));
+    }
+
+    /** Not implemented: the requested timeout is only logged. */
+    @Override
+    public void timeout(long timeoutMs) {
+        log.info("ignore timeout because not impl, timeoutMs:{}", timeoutMs);
+    }
+
+    /** Returns the RocketMQ assignment converted to Kafka topic partitions. */
+    @Override
+    public Set<TopicPartition> assignment() {
+        return sinkTaskContext.assignment()
+                .stream()
+                .map(RecordUtil::recordPartitionToTopicPartition)
+                .collect(Collectors.toSet());
+    }
+
+    /** Pauses the matching RocketMQ partitions. */
+    @Override
+    public void pause(TopicPartition... partitions) {
+        sinkTaskContext.pause(toRecordPartitions(partitions));
+    }
+
+    /** Resumes the matching RocketMQ partitions. */
+    @Override
+    public void resume(TopicPartition... partitions) {
+        sinkTaskContext.resume(toRecordPartitions(partitions));
+    }
+
+    /** Converts Kafka topic partitions to RocketMQ record partitions, preserving order. */
+    private List<RecordPartition> toRecordPartitions(TopicPartition... partitions) {
+        return Arrays.stream(partitions)
+                .map(RecordUtil::topicPartitionToRecordPartition)
+                .collect(Collectors.toList());
+    }
+
+    /** Not implemented: the request is only logged. */
+    @Override
+    public void requestCommit() {
+        log.info("ignore requestCommit because not impl");
+    }
+
+    /**
+     * Returns a reporter that converts the failed Kafka record to a RocketMQ record and hands it
+     * to the RocketMQ error reporter on the shared worker thread.
+     *
+     * <p>Failures inside the conversion or the report surface through the returned future.
+     */
+    @Override
+    public ErrantRecordReporter errantRecordReporter() {
+        return new ErrantRecordReporter() {
+            @Override
+            public Future<Void> report(SinkRecord record, Throwable error) {
+                return EXECUTOR_SERVICE.submit((Callable<Void>) () -> {
+                    // Built inline rather than via topicPartitionToRecordPartition:
+                    // kafkaPartition() is a nullable Integer and must not be unboxed.
+                    Map<String, String> partitionMap = RecordUtil.getPartitionMap(record.topic());
+                    partitionMap.put(RecordUtil.QUEUE_ID, record.kafkaPartition() + "");
+                    RecordPartition recordPartition = new RecordPartition(partitionMap);
+
+                    Map<String, String> offsetMap = new HashMap<>();
+                    offsetMap.put(RecordUtil.QUEUE_OFFSET, record.kafkaOffset() + "");
+                    RecordOffset recordOffset = new RecordOffset(offsetMap);
+
+                    ConnectRecord connectRecord = new ConnectRecord(
+                            recordPartition, recordOffset, record.timestamp(),
+                            SchemaBuilder.string().build(), record.value()
+                    );
+                    sinkTaskContext.errorRecordReporter().report(connectRecord, error);
+                    return null;
+                });
+            }
+        };
+    }
+}
diff --git 
a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaSourceTaskContext.java
 
b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaSourceTaskContext.java
new file mode 100644
index 0000000..c30294a
--- /dev/null
+++ 
b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaSourceTaskContext.java
@@ -0,0 +1,64 @@
+package org.apache.rocketmq.connect.kafka.connector;
+
+
+import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
+import io.openmessaging.connector.api.data.RecordPartition;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
+import org.apache.rocketmq.connect.kafka.util.ConfigUtil;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class RocketmqKafkaSourceTaskContext implements 
org.apache.kafka.connect.source.SourceTaskContext {
+
+    private SourceTaskContext sourceTaskContext;
+
+    public RocketmqKafkaSourceTaskContext(SourceTaskContext sourceTaskContext) 
{
+        this.sourceTaskContext = sourceTaskContext;
+    }
+
+    @Override
+    public Map<String, String> configs() {
+        return ConfigUtil.keyValueConfigToMap(sourceTaskContext.configs());
+    }
+
+    @Override
+    public OffsetStorageReader offsetStorageReader() {
+        return new OffsetStorageReader(){
+            @Override
+            public <T> Map<String, Object> offset(Map<String, T> partition) {
+                return 
offsets(Collections.singletonList(partition)).get(partition);
+            }
+
+            @Override
+            public <T> Map<Map<String, T>, Map<String, Object>> 
offsets(Collection<Map<String, T>> partitions) {
+
+                Collection<RecordPartition> rocketmqPartitions = 
partitions.stream()
+                        .map(RecordPartition::new)
+                        .collect(Collectors.toList());
+
+                Map<Map<String, T>, Map<String, Object>> results = new 
HashMap<>(partitions.size());
+                sourceTaskContext
+                        .offsetStorageReader()
+                        .readOffsets(rocketmqPartitions)
+                        .forEach((p,o) -> {
+                            results.put((Map<String, T>)p.getPartition(), 
mayConvertToLongOffset(o.getOffset()));
+                        });
+                return results;
+            }
+
+            // kafka的offset是long表示,被序列化再反序列化会是int
+            private Map<String, Object> mayConvertToLongOffset(Map<String, ?> 
offset){
+                Map<String, Object> result = new HashMap<>(offset.size());
+                for(Map.Entry<String, ?> kv: offset.entrySet()){
+                    Object v = kv.getValue();
+                    result.put(kv.getKey(), v instanceof Integer ?  ((Integer) 
v).longValue():v);
+                }
+                return result;
+            }
+        };
+    }
+}
diff --git 
a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/ConfigUtil.java
 
b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/ConfigUtil.java
new file mode 100644
index 0000000..2113fcb
--- /dev/null
+++ 
b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/ConfigUtil.java
@@ -0,0 +1,34 @@
+package org.apache.rocketmq.connect.kafka.util;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Conversions between openmessaging {@link KeyValue} configurations and plain string maps.
+ */
+public class ConfigUtil {
+
+    /**
+     * Copies every key of a {@link KeyValue} into a new map, reading values with {@code getString}.
+     *
+     * @param keyValueConfig configuration to copy; may be {@code null}
+     * @return a new map with the same entries, or {@code null} when the input is {@code null}
+     */
+    public static Map<String, String> keyValueConfigToMap(KeyValue keyValueConfig) {
+        if (keyValueConfig == null) {
+            return null;
+        }
+
+        Set<String> keys = keyValueConfig.keySet();
+        Map<String, String> result = new HashMap<>(keys.size());
+        for (String key : keys) {
+            result.put(key, keyValueConfig.getString(key));
+        }
+        return result;
+    }
+
+
+    /**
+     * Copies every entry of a string map into a new {@link DefaultKeyValue}.
+     *
+     * @param mapConfig configuration to copy; may be {@code null}
+     * @return a new key-value holder with the same entries, or {@code null} when the input is {@code null}
+     */
+    public static KeyValue mapConfigToKeyValue(Map<String, String> mapConfig) {
+        if (mapConfig == null) {
+            return null;
+        }
+
+        KeyValue result = new DefaultKeyValue();
+        for (Map.Entry<String, String> entry : mapConfig.entrySet()) {
+            result.put(entry.getKey(), entry.getValue());
+        }
+        return result;
+    }
+}
diff --git 
a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/KafkaPluginsUtil.java
 
b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/KafkaPluginsUtil.java
new file mode 100644
index 0000000..65a324d
--- /dev/null
+++ 
b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/KafkaPluginsUtil.java
@@ -0,0 +1,25 @@
+package org.apache.rocketmq.connect.kafka.util;
+
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Process-wide cache of Kafka Connect {@link Plugins} instances, keyed by plugin path.
+ */
+public class KafkaPluginsUtil {
+
+    public static final String PLUGIN_PATH = "plugin.path";
+
+    /** Guarded by synchronizing on the map itself. */
+    private static final Map<String, Plugins> CACHE = new HashMap<>();
+
+    /**
+     * Returns the {@link Plugins} cached for the {@code plugin.path} entry of {@code props},
+     * creating and caching it from {@code props} on first use.
+     *
+     * @param props properties passed to the {@link Plugins} constructor when a new instance is needed
+     * @return the cached or newly created instance
+     */
+    public static Plugins getPlugins(Map<String, String> props) {
+        String pluginPath = props.get(PLUGIN_PATH);
+        synchronized (CACHE) {
+            return CACHE.computeIfAbsent(pluginPath, ignoredPath -> new Plugins(props));
+        }
+    }
+}
diff --git 
a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/RecordUtil.java
 
b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/RecordUtil.java
new file mode 100644
index 0000000..815ae56
--- /dev/null
+++ 
b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/RecordUtil.java
@@ -0,0 +1,157 @@
+package org.apache.rocketmq.connect.kafka.util;
+
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Conversions between Kafka Connect records/partitions and their RocketMQ Connect counterparts.
+ *
+ * <p>A RocketMQ partition is exposed to Kafka as a topic named
+ * {@code <topic>@#@<brokerName>} plus the queue id as the Kafka partition number.
+ */
+public class RecordUtil {
+
+    public static final String BROKER_NAME = "brokerName";
+    public static final String QUEUE_ID = "queueId";
+    public static final String TOPIC = "topic";
+    public static final String QUEUE_OFFSET = "queueOffset";
+
+
+    /** Separator between topic and broker name in the synthetic Kafka topic name. */
+    private static final String TOPIC_SEP = "@#@";
+
+
+    public static final String KAFKA_MSG_KEY = "kafka_key";
+    public static final String KAFKA_CONNECT_RECORD_TOPIC_KEY = "kafka_connect_record_topic";
+    public static final String KAFKA_CONNECT_RECORD_PARTITION_KEY = "kafka_connect_record_partition";
+    public static final String KAFKA_CONNECT_RECORD_HEADER_KEY_PREFIX = "kafka_connect_record_header_";
+
+    /**
+     * Builds the synthetic Kafka topic name {@code <topic>@#@<brokerName>} for a RocketMQ partition.
+     */
+    public static String getTopicAndBrokerName(RecordPartition recordPartition) {
+        Map<String, ?> partition = recordPartition.getPartition();
+        return new StringBuilder()
+                .append(partition.get(TOPIC))
+                .append(TOPIC_SEP)
+                .append(partition.get(BROKER_NAME))
+                .toString();
+    }
+
+    /**
+     * Splits a synthetic topic name back into a new mutable map holding the topic and broker name.
+     *
+     * @throws ArrayIndexOutOfBoundsException if the name does not contain the {@code @#@} separator
+     *                                        followed by a broker name
+     */
+    public static Map<String, String> getPartitionMap(String topicAndBrokerName) {
+        String[] split = topicAndBrokerName.split(TOPIC_SEP);
+        Map<String, String> map = new HashMap<>();
+        map.put(TOPIC, split[0]);
+        map.put(BROKER_NAME, split[1]);
+
+        return map;
+    }
+
+    /** Parses the queue offset entry of a RocketMQ offset; the entry must be stored as a string. */
+    public static long getOffset(RecordOffset recordOffset) {
+        return Long.parseLong(
+                (String) recordOffset.getOffset().get(QUEUE_OFFSET)
+        );
+    }
+
+    /** Parses the queue id entry of a RocketMQ partition; the entry must be stored as a string. */
+    public static int getPartition(RecordPartition recordPartition) {
+        return Integer.parseInt(
+                (String) recordPartition.getPartition().get(QUEUE_ID)
+        );
+    }
+
+    /** Maps a RocketMQ partition to a Kafka topic partition (synthetic topic name + queue id). */
+    public static TopicPartition recordPartitionToTopicPartition(RecordPartition recordPartition) {
+        String topicAndBrokerName = getTopicAndBrokerName(recordPartition);
+        int partition = getPartition(recordPartition);
+        return new TopicPartition(topicAndBrokerName, partition);
+    }
+
+    /** Inverse of {@link #recordPartitionToTopicPartition(RecordPartition)}. */
+    public static RecordPartition topicPartitionToRecordPartition(TopicPartition topicPartition) {
+        Map<String, String> map = RecordUtil.getPartitionMap(topicPartition.topic());
+        map.put(RecordUtil.QUEUE_ID, topicPartition.partition() + "");
+        return new RecordPartition(map);
+    }
+
+
+    /**
+     * Converts a Kafka source record to a RocketMQ Connect record.
+     *
+     * <p>The value is serialized with {@code valueConverter} and stored as a UTF-8 string
+     * ({@code null} when the converter yields no bytes). The key, each header, the Kafka topic
+     * and the Kafka partition are stored as extensions so that
+     * {@link #toSourceRecord(ConnectRecord, Converter, Converter, HeaderConverter)} can restore them.
+     * A key or header whose converter yields no bytes is left out.
+     */
+    public static ConnectRecord toConnectRecord(SourceRecord sourceRecord, Converter keyConverter, Converter valueConverter,
+                                         HeaderConverter headerConverter) {
+        RecordPartition recordPartition = new RecordPartition(new HashMap<>(sourceRecord.sourcePartition()));
+        RecordOffset recordOffset = new RecordOffset(new HashMap<>(sourceRecord.sourceOffset()));
+        Long timestamp = sourceRecord.timestamp();
+
+        byte[] value = valueConverter.fromConnectData(
+                sourceRecord.topic(), sourceRecord.valueSchema(), sourceRecord.value()
+        );
+
+        ConnectRecord connectRecord = new ConnectRecord(
+                recordPartition, recordOffset, timestamp,
+                SchemaBuilder.string().build(), decodeUtf8(value)
+        );
+
+        if (sourceRecord.key() != null) {
+            byte[] key = keyConverter.fromConnectData(
+                    sourceRecord.topic(), sourceRecord.keySchema(), sourceRecord.key()
+            );
+            if (key != null) {
+                connectRecord.addExtension(RecordUtil.KAFKA_MSG_KEY, decodeUtf8(key));
+            }
+        }
+
+        for (Header header : sourceRecord.headers()) {
+            byte[] headerValue = headerConverter.fromConnectHeader(
+                    sourceRecord.topic(), header.key(), header.schema(), header.value()
+            );
+            if (headerValue != null) {
+                connectRecord.addExtension(
+                        RecordUtil.KAFKA_CONNECT_RECORD_HEADER_KEY_PREFIX + header.key(), decodeUtf8(headerValue));
+            }
+        }
+
+        if (sourceRecord.topic() != null) {
+            connectRecord.addExtension(RecordUtil.KAFKA_CONNECT_RECORD_TOPIC_KEY, sourceRecord.topic());
+        }
+        if (sourceRecord.kafkaPartition() != null) {
+            connectRecord.addExtension(
+                    RecordUtil.KAFKA_CONNECT_RECORD_PARTITION_KEY, sourceRecord.kafkaPartition().toString());
+        }
+        return connectRecord;
+    }
+
+
+    /**
+     * Rebuilds the Kafka source record from a RocketMQ Connect record produced by
+     * {@link #toConnectRecord(SourceRecord, Converter, Converter, HeaderConverter)}.
+     *
+     * <p>The key is decoded with {@code keyConverter}, the value with {@code valueConverter}
+     * and the headers with {@code headerConverter}, mirroring the serialization side.
+     * The record data is expected to be a string or {@code null}.
+     */
+    public static SourceRecord toSourceRecord(ConnectRecord connectRecord, Converter keyConverter, Converter valueConverter,
+                                                HeaderConverter headerConverter) {
+        Map<String, ?> sourcePartition = new HashMap<>(connectRecord.getPosition().getPartition().getPartition());
+        Map<String, ?> sourceOffset = new HashMap<>(connectRecord.getPosition().getOffset().getOffset());
+        String topic = connectRecord.getExtension(RecordUtil.KAFKA_CONNECT_RECORD_TOPIC_KEY);
+        String partitionStr = connectRecord.getExtension(RecordUtil.KAFKA_CONNECT_RECORD_PARTITION_KEY);
+        Integer partition = null;
+        if (partitionStr != null) {
+            partition = Integer.valueOf(partitionStr);
+        }
+        String keyStr = connectRecord.getExtension(RecordUtil.KAFKA_MSG_KEY);
+        Schema keySchema = null;
+        Object key = null;
+        if (keyStr != null) {
+            SchemaAndValue keySchemaAndValue = keyConverter.toConnectData(topic, keyStr.getBytes(StandardCharsets.UTF_8));
+            keySchema = keySchemaAndValue.schema();
+            key = keySchemaAndValue.value();
+        }
+
+        ConnectHeaders headers = new ConnectHeaders();
+        // Defensive: the extensions container may be absent when nothing was ever added.
+        if (connectRecord.getExtensions() != null) {
+            for (String extKey : connectRecord.getExtensions().keySet()) {
+                if (!extKey.startsWith(RecordUtil.KAFKA_CONNECT_RECORD_HEADER_KEY_PREFIX)) {
+                    continue;
+                }
+                String header = extKey.substring(RecordUtil.KAFKA_CONNECT_RECORD_HEADER_KEY_PREFIX.length());
+                SchemaAndValue headerSchemaAndValue = headerConverter
+                        .toConnectHeader(topic, header, connectRecord.getExtension(extKey).getBytes(StandardCharsets.UTF_8));
+
+                headers.add(header, headerSchemaAndValue);
+            }
+        }
+
+        // The value was serialized with valueConverter, so it must be decoded with it too.
+        Object data = connectRecord.getData();
+        byte[] valueBytes = data == null ? null : ((String) data).getBytes(StandardCharsets.UTF_8);
+        SchemaAndValue valueSchemaAndValue = valueConverter.toConnectData(topic, valueBytes);
+        return new SourceRecord(
+                sourcePartition, sourceOffset, topic, partition,
+                keySchema, key, valueSchemaAndValue.schema(), valueSchemaAndValue.value(),
+                connectRecord.getTimestamp(), headers
+        );
+    }
+
+    /** Decodes UTF-8 bytes, passing {@code null} through instead of throwing. */
+    private static String decodeUtf8(byte[] bytes) {
+        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
+    }
+
+}

Reply via email to