This is an automated email from the ASF dual-hosted git repository. sunxiaojian pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push: new 5ddf386 rocketmq-connect-kafka-connector-adapter 0.0.1-SNAPSHOT (#247) 5ddf386 is described below commit 5ddf3862f55c720114b51a410c911faf14c8dcb6 Author: 欧夺标 <ouduob...@gmail.com> AuthorDate: Fri Aug 19 14:49:46 2022 +0800 rocketmq-connect-kafka-connector-adapter 0.0.1-SNAPSHOT (#247) Co-authored-by: ouduobiao <duobiao....@alibaba-inc.com> --- .../README.md | 91 ++++++++++ .../pom.xml | 192 ++++++++++++++++++++ .../connect/kafka/config/ConfigDefine.java | 28 +++ .../kafka/connector/KafkaRocketmqConnector.java | 128 +++++++++++++ .../connector/KafkaRocketmqSinkConnector.java | 43 +++++ .../kafka/connector/KafkaRocketmqSinkTask.java | 199 +++++++++++++++++++++ .../connector/KafkaRocketmqSourceConnector.java | 43 +++++ .../kafka/connector/KafkaRocketmqSourceTask.java | 185 +++++++++++++++++++ .../connector/RocketmqKafkaConnectorContext.java | 21 +++ .../connector/RocketmqKafkaSinkTaskContext.java | 128 +++++++++++++ .../connector/RocketmqKafkaSourceTaskContext.java | 64 +++++++ .../rocketmq/connect/kafka/util/ConfigUtil.java | 34 ++++ .../connect/kafka/util/KafkaPluginsUtil.java | 25 +++ .../rocketmq/connect/kafka/util/RecordUtil.java | 157 ++++++++++++++++ 14 files changed, 1338 insertions(+) diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/README.md b/connectors/rocketmq-connect-kafka-connector-adapter/README.md new file mode 100644 index 0000000..01b7a63 --- /dev/null +++ b/connectors/rocketmq-connect-kafka-connector-adapter/README.md @@ -0,0 +1,91 @@ +**rocketmq-connect-kafka-connector-adapter** + +本项目的目标是让kafka connector运行在rocketmq-connect,使得数据在rocketmq导入导出。 + +**参数说明** + +参数分为3类:rocketmq connect runtime参数、 kafka-connector-adapter参数,以及 具体kafka connector参数 + +rocketmq connect runtime参数: +- **connector-class**: kafka-connector-adapter的类名 + + 如果是SourceConnector,对应为org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSourceConnector。 + + 如果是SinkConnector,对应为org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSinkConnector。 + +- **connect-topicname**: 要导入导出数据的rocketmq topic +- **tasks.num**: 启动的task数目 + +kafka-connector-adapter参数: +- **connector.class**: kafka connector的类名 +- **plugin.path**: kafka connector插件路径 + +具体kafka connector参数: + +参考具体kafka connector的文档 + + +# 快速开始 + +demo展示如何启动kafka-file-connector + +适配的kafka-file-connector的主要作用是从源文件中读取数据发送到RocketMQ集群 然后从Topic中读取消息,写入到目标文件 + +## 1.获取kafka-file-connector + +1. 下载kafka的二进制包:https://kafka.apache.org/downloads +2. 解压后到libs目录找到kafka-file-connector的jar包:connect-file-{version}.jar +3. 将jar拷贝到专门目录,这个目录作为kafka connector插件路径:plugin.path,比如:/tmp/kafka-plugins + + +## 2.构建rocketmq-connect-kafka-connector-adapter + +``` +git clone https://github.com/apache/rocketmq-connect.git + +cd connectors/rocketmq-connect-kafka-connector-adapter/ + +mvn package + +``` +最后将/target/rocketmq-connect-kafka-connector-adapter-0.0.1-SNAPSHOT-jar-with-dependencies.jar拷贝到rocketmq插件目录下,并修改connect-standalone.conf的pluginPaths为对应的rocketmq插件目录 +,比如/tmp/rocketmq-plugins + +## 3.运行Worker + +``` +cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT + +sh bin/connect-standalone.sh -c conf/connect-standalone.conf & + +``` + +## 4.启动source connector + +``` +touch /tmp/test-source-file.txt + +echo "Hello \r\nRocketMQ\r\n Connect" >> /tmp/test-source-file.txt + +curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSourceConnector -d '{"connector-class":"org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSourceConnector","connect-topicname":"fileTopic","connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","plugin.path":"/tmp/kafka-plugins","topic":"fileTopic","file":"/tmp/test-source-file.txt"}' +``` + +## 5.启动sink connector + +``` +curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSinkConnector -d '{"connector-class":"org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSinkConnector","connect-topicname":"fileTopic","connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector","plugin.path":"/tmp/kafka-plugins","file":"/tmp/test-sink-file.txt"}' + +cat /tmp/test-sink-file.txt +``` + +# kafka connect transform + +todo + +# 如何运行kafka-mongo-connector + +todo + +# 如何运行kafka-jdbc-connector + +todo \ No newline at end of file diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/pom.xml b/connectors/rocketmq-connect-kafka-connector-adapter/pom.xml new file mode 100644 index 0000000..5f62691 --- /dev/null +++ b/connectors/rocketmq-connect-kafka-connector-adapter/pom.xml @@ -0,0 +1,192 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-connect-kafka-connector-adapter</artifactId> + <version>0.0.1-SNAPSHOT</version> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + + <!-- Compiler settings properties --> + <maven.compiler.source>1.8</maven.compiler.source> + <maven.compiler.target>1.8</maven.compiler.target> + </properties> + + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>versions-maven-plugin</artifactId> + <version>2.3</version> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>clirr-maven-plugin</artifactId> + <version>2.7</version> + </plugin> + <plugin> + <artifactId>maven-dependency-plugin</artifactId> + <configuration> + <outputDirectory>${project.build.directory}/lib</outputDirectory> + <excludeTransitive>false</excludeTransitive> + <stripVersion>true</stripVersion> + </configuration> + </plugin> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.6.1</version> + <configuration> + <source>${maven.compiler.source}</source> + <target>${maven.compiler.target}</target> + <compilerVersion>${maven.compiler.source}</compilerVersion> + <showDeprecation>true</showDeprecation> + <showWarnings>true</showWarnings> + </configuration> + </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.19.1</version> + <configuration> + <argLine>-Xms512m -Xmx1024m</argLine> + <forkMode>always</forkMode> + <includes> + <include>**/*Test.java</include> + </includes> + </configuration> + </plugin> + <plugin> + <artifactId>maven-site-plugin</artifactId> + <version>3.6</version> + <configuration> + <locales>en_US</locales> + <outputEncoding>UTF-8</outputEncoding> + <inputEncoding>UTF-8</inputEncoding> + </configuration> + </plugin> + <plugin> + <artifactId>maven-source-plugin</artifactId> + <version>3.0.1</version> + <executions> + <execution> + <id>attach-sources</id> + <goals> + <goal>jar</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <version>0.12</version> + <configuration> + <excludes> + <exclude>README.md</exclude> + </excludes> + </configuration> + </plugin> + <plugin> + <artifactId>maven-javadoc-plugin</artifactId> + <version>2.10.4</version> + <configuration> + <charset>UTF-8</charset> + <locale>en_US</locale> + <excludePackageNames>io.openmessaging.internal</excludePackageNames> + </configuration> + <executions> + <execution> + <id>aggregate</id> + <goals> + <goal>aggregate</goal> + </goals> + <phase>site</phase> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <version>3.0.2</version> + <configuration> + <encoding>${project.build.sourceEncoding}</encoding> + </configuration> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>findbugs-maven-plugin</artifactId> + <version>3.0.4</version> + </plugin> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <version>3.0.0</version> + <configuration> + <archive> + <!-- The Main Class Here doesn't make a lot sense since it was dynamically loaded--> + <manifest> + <mainClass>org.apache.rocketmq.connect.hudi.connector.HudiSinkConnector</mainClass> + </manifest> + </archive> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + <executions> + <execution> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-checkstyle-plugin</artifactId> + <version>2.17</version> + <executions> + <execution> + <id>verify</id> + <phase>verify</phase> + <configuration> + <configLocation>style/rmq_checkstyle.xml</configLocation> + <encoding>UTF-8</encoding> + <consoleOutput>true</consoleOutput> + <failsOnError>true</failsOnError> + <includeTestSourceDirectory>false</includeTestSourceDirectory> + <includeTestResources>false</includeTestResources> + </configuration> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>connect-runtime</artifactId> + <version>3.2.0</version> + </dependency> + + <dependency> + <groupId>io.openmessaging</groupId> + <artifactId>openmessaging-connector</artifactId> + <version>0.1.4</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>1.7.7</version> + </dependency> + + </dependencies> +</project> diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigDefine.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigDefine.java new file mode 100644 index 0000000..9cbba08 --- /dev/null +++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigDefine.java @@ -0,0 +1,28 @@ +package org.apache.rocketmq.connect.kafka.config; + +import org.apache.kafka.connect.runtime.ConnectorConfig; +import org.apache.kafka.connect.runtime.TaskConfig; +import org.apache.kafka.connect.runtime.WorkerConfig; + +import java.util.HashSet; +import java.util.Set; + +public class ConfigDefine { + public static String ROCKETMQ_CONNECTOR_CLASS = "connector-class"; + public static String CONNECTOR_CLASS = ConnectorConfig.CONNECTOR_CLASS_CONFIG; + public static String PLUGIN_PATH = "plugin.path"; + + public static final String TASK_CLASS = TaskConfig.TASK_CLASS_CONFIG; + + public static final String KEY_CONVERTER = WorkerConfig.KEY_CONVERTER_CLASS_CONFIG; + public static final String VALUE_CONVERTER = WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG; + public static final String HEADER_CONVERTER = WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG; + + + public static final Set<String> REQUEST_CONFIG = new HashSet<String>(){ + { + add(CONNECTOR_CLASS); + add(PLUGIN_PATH); + } + }; +} diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqConnector.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqConnector.java new file mode 100644 index 0000000..89bee2c --- /dev/null +++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqConnector.java @@ -0,0 +1,128 @@ +package org.apache.rocketmq.connect.kafka.connector; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.connector.Connector; +import io.openmessaging.connector.api.component.task.Task; +import io.openmessaging.connector.api.component.task.source.SourceConnector; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.runtime.ConnectorConfig; +import org.apache.kafka.connect.runtime.isolation.Plugins; +import org.apache.rocketmq.connect.kafka.config.ConfigDefine; +import org.apache.rocketmq.connect.kafka.util.ConfigUtil; +import org.apache.rocketmq.connect.kafka.util.KafkaPluginsUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.DriverManager; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class KafkaRocketmqConnector extends Connector { + private static final Logger log = LoggerFactory.getLogger(KafkaRocketmqConnector.class); + + private Connector childConnector; + + private org.apache.kafka.connect.connector.Connector kafkaConnector; + private Plugins kafkaPlugins; + private Map<String, String> kafkaConnectorConfigs; + + public KafkaRocketmqConnector(Connector childConnector) { + this.childConnector = childConnector; + } + + @Override + public List<KeyValue> taskConfigs(int maxTasks) { + List<KeyValue> taskKeyValueConfigs = new ArrayList<>(); + runWithWithConnectorLoader(() ->{ + List<Map<String, String>> taskConfigs = this.kafkaConnector.taskConfigs(maxTasks); + taskKeyValueConfigs.addAll( + taskConfigs + .stream() + .map(ConfigUtil::mapConfigToKeyValue) + .collect(Collectors.toList()) + ); + + taskKeyValueConfigs.forEach(kv -> { + kv.put(ConfigDefine.PLUGIN_PATH, this.kafkaConnectorConfigs.get(ConfigDefine.PLUGIN_PATH)); + kv.put(ConfigDefine.CONNECTOR_CLASS, this.kafkaConnectorConfigs.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)); + kv.put(ConfigDefine.TASK_CLASS, this.kafkaConnector.taskClass().getName()); + + kv.put(ConfigDefine.ROCKETMQ_CONNECTOR_CLASS, childConnector.getClass().getName()); + + if( this.kafkaConnectorConfigs.containsKey(ConfigDefine.KEY_CONVERTER)){ + kv.put(ConfigDefine.KEY_CONVERTER, this.kafkaConnectorConfigs.get(ConfigDefine.KEY_CONVERTER)); + } + + if( this.kafkaConnectorConfigs.containsKey(ConfigDefine.VALUE_CONVERTER)){ + kv.put(ConfigDefine.VALUE_CONVERTER, this.kafkaConnectorConfigs.get(ConfigDefine.VALUE_CONVERTER)); + } + + if( this.kafkaConnectorConfigs.containsKey(ConfigDefine.HEADER_CONVERTER)){ + kv.put(ConfigDefine.HEADER_CONVERTER, this.kafkaConnectorConfigs.get(ConfigDefine.HEADER_CONVERTER)); + } + }); + + }); + return taskKeyValueConfigs; + } + + @Override + public Class<? extends Task> taskClass() { + return this.childConnector instanceof SourceConnector + ? KafkaRocketmqSourceTask.class : KafkaRocketmqSinkTask.class; + } + + @Override + public void start(KeyValue config) { + runWithWithConnectorLoader(() ->{ + this.kafkaConnector.start(this.kafkaConnectorConfigs); + }); + } + + @Override + public void stop() { + runWithWithConnectorLoader(() ->{ + this.kafkaConnector.stop(); + }); + } + + + @Override + public void validate(KeyValue config) { + + for(String requestConfig: ConfigDefine.REQUEST_CONFIG){ + if(!config.containsKey(requestConfig)){ + throw new ConnectException("miss config:"+requestConfig); + } + } + + this.kafkaConnectorConfigs = ConfigUtil.keyValueConfigToMap(config); + log.info("kafka connector config is {}", this.kafkaConnectorConfigs); + this.kafkaPlugins = KafkaPluginsUtil.getPlugins(Collections.singletonMap(KafkaPluginsUtil.PLUGIN_PATH, this.kafkaConnectorConfigs.get(ConfigDefine.PLUGIN_PATH))); + String connectorClassName = this.kafkaConnectorConfigs.get(ConfigDefine.CONNECTOR_CLASS); + ClassLoader connectorLoader = this.kafkaPlugins.delegatingLoader().connectorLoader(connectorClassName); + ClassLoader savedLoader = Plugins.compareAndSwapLoaders(connectorLoader); + try { + this.kafkaConnector = this.kafkaPlugins.newConnector(connectorClassName); + this.kafkaConnector.validate(this.kafkaConnectorConfigs); + this.kafkaConnector.initialize( + new RocketmqKafkaConnectorContext(getConnectorContext()) + ); + } finally { + Plugins.compareAndSwapLoaders(savedLoader); + } + + } + + private void runWithWithConnectorLoader(Runnable runnable){ + ClassLoader current = this.kafkaPlugins.compareAndSwapLoaders(this.kafkaConnector); + try { + runnable.run(); + } finally { + Plugins.compareAndSwapLoaders(current); + } + } +} diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSinkConnector.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSinkConnector.java new file mode 100644 index 0000000..9cec38e --- /dev/null +++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSinkConnector.java @@ -0,0 +1,43 @@ +package org.apache.rocketmq.connect.kafka.connector; + + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.Task; +import io.openmessaging.connector.api.component.task.sink.SinkConnector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class KafkaRocketmqSinkConnector extends SinkConnector { + + private static final Logger log = LoggerFactory.getLogger(KafkaRocketmqSinkConnector.class); + + private KafkaRocketmqConnector kafkaRocketmqConnector = new KafkaRocketmqConnector(this); + + @Override + public List<KeyValue> taskConfigs(int maxTasks) { + return kafkaRocketmqConnector.taskConfigs(maxTasks); + } + + @Override + public Class<? extends Task> taskClass() { + return kafkaRocketmqConnector.taskClass(); + } + + @Override + public void start(KeyValue config) { + kafkaRocketmqConnector.start(config); + } + + @Override + public void stop() { + kafkaRocketmqConnector.stop(); + } + + @Override + public void validate(KeyValue config) { + kafkaRocketmqConnector.validate(config); + } + +} diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSinkTask.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSinkTask.java new file mode 100644 index 0000000..c2b6cfa --- /dev/null +++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSinkTask.java @@ -0,0 +1,199 @@ +package org.apache.rocketmq.connect.kafka.connector; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.sink.SinkTask; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.data.RecordOffset; +import io.openmessaging.connector.api.data.RecordPartition; +import io.openmessaging.connector.api.errors.ConnectException; +import io.openmessaging.connector.api.errors.RetriableException; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.header.ConnectHeaders; +import org.apache.kafka.connect.runtime.TaskConfig; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.runtime.isolation.Plugins; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.HeaderConverter; + +import org.apache.kafka.connect.transforms.util.SimpleConfig; +import org.apache.rocketmq.connect.kafka.config.ConfigDefine; +import org.apache.rocketmq.connect.kafka.util.ConfigUtil; +import org.apache.rocketmq.connect.kafka.util.KafkaPluginsUtil; +import org.apache.rocketmq.connect.kafka.util.RecordUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.util.*; + + +public class KafkaRocketmqSinkTask extends SinkTask { + private static final Logger log = LoggerFactory.getLogger(KafkaRocketmqSinkTask.class); + + private org.apache.kafka.connect.sink.SinkTask kafkaSinkTask; + private ClassLoader classLoader; + + private Converter keyConverter; + private Converter valueConverter; + private HeaderConverter headerConverter; + + @Override + public void put(List<ConnectRecord> sinkRecords) throws ConnectException { + Collection<SinkRecord> records = new ArrayList<>(sinkRecords.size()); + for(ConnectRecord sinkRecord: sinkRecords){ + String topic = (String)sinkRecord.getPosition().getPartition().getPartition().get(RecordUtil.TOPIC); + SchemaAndValue valueSchemaAndValue = valueConverter.toConnectData(topic, ((String)sinkRecord.getData()).getBytes(StandardCharsets.UTF_8)); + String key = sinkRecord.getExtension(RecordUtil.KAFKA_MSG_KEY); + SchemaAndValue keySchemaAndValue = null; + if(key != null) { + keySchemaAndValue = keyConverter.toConnectData(topic, key.getBytes(StandardCharsets.UTF_8)); + } + + SinkRecord record = new SinkRecord( + RecordUtil.getTopicAndBrokerName(sinkRecord.getPosition().getPartition()), + RecordUtil.getPartition(sinkRecord.getPosition().getPartition()), + keySchemaAndValue==null?null:keySchemaAndValue.schema(), + keySchemaAndValue==null?null:keySchemaAndValue.value(), + valueSchemaAndValue.schema(), valueSchemaAndValue.value(), + RecordUtil.getOffset(sinkRecord.getPosition().getOffset()), + sinkRecord.getTimestamp(), TimestampType.NO_TIMESTAMP_TYPE, + getHeaders(sinkRecord.getExtensions(), topic) + ); + records.add(record); + } + try { + this.kafkaSinkTask.put(records); + } catch (org.apache.kafka.connect.errors.RetriableException e){ + throw new RetriableException(e); + } + } + + private ConnectHeaders getHeaders(KeyValue extensions, String topic){ + ConnectHeaders headers = new ConnectHeaders(); + for(String headerKey: extensions.keySet()){ + if(RecordUtil.KAFKA_MSG_KEY.equals(headerKey)){ + continue; + } + SchemaAndValue headerSchemaAndValue = headerConverter + .toConnectHeader(topic, headerKey, extensions.getString(headerKey).getBytes()); + headers.add(headerKey, headerSchemaAndValue); + } + return headers; + } + + + + @Override + public void start(KeyValue config) { + Map<String, String> kafkaTaskProps = ConfigUtil.keyValueConfigToMap(config); + log.info("kafka connector task config is {}", kafkaTaskProps); + Plugins kafkaPlugins = KafkaPluginsUtil.getPlugins(Collections.singletonMap(KafkaPluginsUtil.PLUGIN_PATH, kafkaTaskProps.get(ConfigDefine.PLUGIN_PATH))); + String connectorClass = kafkaTaskProps.get(ConfigDefine.CONNECTOR_CLASS); + ClassLoader connectorLoader = kafkaPlugins.delegatingLoader().connectorLoader(connectorClass); + this.classLoader = Plugins.compareAndSwapLoaders(connectorLoader); + try { + TaskConfig taskConfig = new TaskConfig(kafkaTaskProps); + Class<? extends Task> taskClass = taskConfig.getClass(ConfigDefine.TASK_CLASS).asSubclass(Task.class); + this.kafkaSinkTask = (org.apache.kafka.connect.sink.SinkTask)kafkaPlugins.newTask(taskClass); + initConverter(kafkaPlugins, kafkaTaskProps); + this.kafkaSinkTask.initialize(new RocketmqKafkaSinkTaskContext(sinkTaskContext)); + this.kafkaSinkTask.start(kafkaTaskProps); + } catch (Throwable e){ + recoverClassLoader(); + throw e; + } + } + + @Override + public void stop() { + try { + this.kafkaSinkTask.stop(); + } finally { + recoverClassLoader(); + } + } + + + private void recoverClassLoader(){ + if(this.classLoader != null){ + Plugins.compareAndSwapLoaders(this.classLoader); + this.classLoader = null; + } + } + + private void initConverter(Plugins plugins, Map<String, String> taskProps){ + + ConfigDef converterConfigDef = new ConfigDef() + .define(ConfigDefine.KEY_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "") + .define(ConfigDefine.VALUE_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "") + .define(ConfigDefine.HEADER_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, ""); + + Map<String, String> connProps = new HashMap<>(); + if(taskProps.containsKey(ConfigDefine.KEY_CONVERTER)){ + connProps.put(ConfigDefine.KEY_CONVERTER, taskProps.get(ConfigDefine.KEY_CONVERTER)); + } + if(taskProps.containsKey(ConfigDefine.VALUE_CONVERTER)){ + connProps.put(ConfigDefine.VALUE_CONVERTER, taskProps.get(ConfigDefine.VALUE_CONVERTER)); + } + if(taskProps.containsKey(ConfigDefine.HEADER_CONVERTER)){ + connProps.put(ConfigDefine.HEADER_CONVERTER, taskProps.get(ConfigDefine.HEADER_CONVERTER)); + } + SimpleConfig connConfig = new SimpleConfig(converterConfigDef, connProps); + + Map<String, String> workerProps = new HashMap<>(); + workerProps.put(ConfigDefine.KEY_CONVERTER, "org.apache.kafka.connect.json.JsonConverter"); + workerProps.put(ConfigDefine.VALUE_CONVERTER, "org.apache.kafka.connect.json.JsonConverter"); + workerProps.put(ConfigDefine.HEADER_CONVERTER, "org.apache.kafka.connect.storage.SimpleHeaderConverter"); + SimpleConfig workerConfig = new SimpleConfig(converterConfigDef, workerProps); + + keyConverter = plugins.newConverter(connConfig, ConfigDefine.KEY_CONVERTER, Plugins.ClassLoaderUsage + .CURRENT_CLASSLOADER); + valueConverter = plugins.newConverter(connConfig, ConfigDefine.VALUE_CONVERTER, Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER); + headerConverter = plugins.newHeaderConverter(connConfig, ConfigDefine.HEADER_CONVERTER, + Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER); + + if (keyConverter == null) { + keyConverter = plugins.newConverter(workerConfig, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage.PLUGINS); + log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), sinkTaskContext.getTaskName()); + } else { + log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), sinkTaskContext.getTaskName()); + } + if (valueConverter == null) { + valueConverter = plugins.newConverter(workerConfig, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage.PLUGINS); + log.info("Set up the value converter {} for task {} using the worker config", valueConverter.getClass(), sinkTaskContext.getTaskName()); + } else { + log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), sinkTaskContext.getTaskName()); + } + if (headerConverter == null) { + headerConverter = plugins.newHeaderConverter(workerConfig, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage + .PLUGINS); + log.info("Set up the header converter {} for task {} using the worker config", headerConverter.getClass(), sinkTaskContext.getTaskName()); + } else { + log.info("Set up the header converter {} for task {} using the connector config", headerConverter.getClass(), sinkTaskContext.getTaskName()); + } + } + + @Override + public void flush(Map<RecordPartition, RecordOffset> currentOffsets) throws ConnectException { + + if(this.kafkaSinkTask == null){ + log.warn("the task is not start, currentOffsets:{}", currentOffsets); + return; + } + + Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(currentOffsets.size()); + + for(Map.Entry<RecordPartition, RecordOffset> po: currentOffsets.entrySet()){ + TopicPartition tp = RecordUtil.recordPartitionToTopicPartition(po.getKey()); + OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(RecordUtil.getOffset(po.getValue())); + offsets.put(tp, offsetAndMetadata); + } + this.kafkaSinkTask.flush(offsets); + } +} diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSourceConnector.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSourceConnector.java new file mode 100644 index 0000000..6f6e006 --- /dev/null +++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSourceConnector.java @@ -0,0 +1,43 @@ +package org.apache.rocketmq.connect.kafka.connector; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.Task; +import io.openmessaging.connector.api.component.task.source.SourceConnector; + +import java.util.List; + + +public class KafkaRocketmqSourceConnector extends SourceConnector { + + private static final Logger log = LoggerFactory.getLogger(KafkaRocketmqSourceConnector.class); + + private KafkaRocketmqConnector kafkaRocketmqConnector = new KafkaRocketmqConnector(this); + + @Override + public List<KeyValue> taskConfigs(int maxTasks) { + return kafkaRocketmqConnector.taskConfigs(maxTasks); + } + + @Override + public Class<? extends Task> taskClass() { + return kafkaRocketmqConnector.taskClass(); + } + + @Override + public void start(KeyValue config) { + kafkaRocketmqConnector.start(config); + } + + @Override + public void stop() { + kafkaRocketmqConnector.stop(); + } + + @Override + public void validate(KeyValue config) { + kafkaRocketmqConnector.validate(config); + } + +} diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSourceTask.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSourceTask.java new file mode 100644 index 0000000..3db8251 --- /dev/null +++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSourceTask.java @@ -0,0 +1,185 @@ +package org.apache.rocketmq.connect.kafka.connector; + +import io.openmessaging.connector.api.data.*; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.runtime.TaskConfig; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.HeaderConverter; +import org.apache.kafka.connect.transforms.util.SimpleConfig; +import org.apache.rocketmq.connect.kafka.util.KafkaPluginsUtil; +import org.apache.rocketmq.connect.kafka.util.RecordUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.source.SourceTask; +import org.apache.kafka.connect.runtime.isolation.Plugins; +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.rocketmq.connect.kafka.config.ConfigDefine; +import org.apache.rocketmq.connect.kafka.util.ConfigUtil; + +import java.util.*; + +public class KafkaRocketmqSourceTask extends SourceTask { + private static final Logger log = LoggerFactory.getLogger(KafkaRocketmqSourceTask.class); + + private org.apache.kafka.connect.source.SourceTask kafkaSourceTask; + + private ClassLoader classLoader; + + private Converter keyConverter; + private Converter valueConverter; + private HeaderConverter headerConverter; + + + @Override + public List<ConnectRecord> poll() throws InterruptedException { + + List<SourceRecord> sourceRecords = this.kafkaSourceTask.poll(); + + if(sourceRecords == null){ + return null; + } + + List<ConnectRecord> connectRecords = new ArrayList<>(sourceRecords.size()); + for(SourceRecord sourceRecord: sourceRecords){ + connectRecords.add(RecordUtil.toConnectRecord(sourceRecord, + this.keyConverter, this.valueConverter, this.headerConverter)); + } + return connectRecords; + } + + @Override + public void start(KeyValue config) { + Map<String, String> kafkaTaskProps = ConfigUtil.keyValueConfigToMap(config); + log.info("kafka connector task config is {}", kafkaTaskProps); + Plugins kafkaPlugins = KafkaPluginsUtil.getPlugins(Collections.singletonMap(KafkaPluginsUtil.PLUGIN_PATH, kafkaTaskProps.get(ConfigDefine.PLUGIN_PATH))); + String connectorClass = kafkaTaskProps.get(ConfigDefine.CONNECTOR_CLASS); + ClassLoader connectorLoader = kafkaPlugins.delegatingLoader().connectorLoader(connectorClass); + this.classLoader = Plugins.compareAndSwapLoaders(connectorLoader); + try { + + TaskConfig taskConfig = new TaskConfig(kafkaTaskProps); + Class<? extends Task> taskClass = taskConfig.getClass(ConfigDefine.TASK_CLASS).asSubclass(Task.class); + this.kafkaSourceTask = (org.apache.kafka.connect.source.SourceTask)kafkaPlugins.newTask(taskClass); + + initConverter(kafkaPlugins, kafkaTaskProps); + + this.kafkaSourceTask.initialize(new RocketmqKafkaSourceTaskContext(sourceTaskContext)); + this.kafkaSourceTask.start(kafkaTaskProps); + } catch (Throwable e){ + recoverClassLoader(); + throw e; + } + } + + private void initConverter(Plugins plugins, Map<String, String> taskProps){ + + ConfigDef converterConfigDef = new ConfigDef() + .define(ConfigDefine.KEY_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "") + .define(ConfigDefine.VALUE_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "") + .define(ConfigDefine.HEADER_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, ""); + + Map<String, String> connProps = new HashMap<>(); + if(taskProps.containsKey(ConfigDefine.KEY_CONVERTER)){ + connProps.put(ConfigDefine.KEY_CONVERTER, taskProps.get(ConfigDefine.KEY_CONVERTER)); + } + if(taskProps.containsKey(ConfigDefine.VALUE_CONVERTER)){ + connProps.put(ConfigDefine.VALUE_CONVERTER, taskProps.get(ConfigDefine.VALUE_CONVERTER)); + } + if(taskProps.containsKey(ConfigDefine.HEADER_CONVERTER)){ + connProps.put(ConfigDefine.HEADER_CONVERTER, taskProps.get(ConfigDefine.HEADER_CONVERTER)); + } + SimpleConfig connConfig = new SimpleConfig(converterConfigDef, connProps); + + Map<String, String> workerProps = new HashMap<>(); + workerProps.put(ConfigDefine.KEY_CONVERTER, "org.apache.kafka.connect.json.JsonConverter"); + workerProps.put(ConfigDefine.VALUE_CONVERTER, "org.apache.kafka.connect.json.JsonConverter"); + workerProps.put(ConfigDefine.HEADER_CONVERTER, "org.apache.kafka.connect.storage.SimpleHeaderConverter"); + SimpleConfig workerConfig = new SimpleConfig(converterConfigDef, workerProps); + + keyConverter = plugins.newConverter(connConfig, ConfigDefine.KEY_CONVERTER, Plugins.ClassLoaderUsage + .CURRENT_CLASSLOADER); + valueConverter = plugins.newConverter(connConfig, ConfigDefine.VALUE_CONVERTER, Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER); + headerConverter = plugins.newHeaderConverter(connConfig, ConfigDefine.HEADER_CONVERTER, + Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER); + + if (keyConverter == null) { + keyConverter = plugins.newConverter(workerConfig, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage.PLUGINS); + log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), sourceTaskContext.getTaskName()); + } else { + log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), sourceTaskContext.getTaskName()); + } + if (valueConverter == null) { + valueConverter = plugins.newConverter(workerConfig, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage.PLUGINS); + log.info("Set up the value converter {} for task {} using the worker config", valueConverter.getClass(), sourceTaskContext.getTaskName()); + } else { + log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), sourceTaskContext.getTaskName()); + } + if (headerConverter == null) { + headerConverter = plugins.newHeaderConverter(workerConfig, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage + .PLUGINS); + log.info("Set up the header converter {} for task {} using the worker config", headerConverter.getClass(), sourceTaskContext.getTaskName()); + } else { + log.info("Set up the header converter {} for task {} using the connector config", headerConverter.getClass(), sourceTaskContext.getTaskName()); + } + } + + @Override + public void stop() { + try { + this.kafkaSourceTask.stop(); + } finally { + recoverClassLoader(); + } + } + + private void recoverClassLoader(){ + if(this.classLoader != null){ + Plugins.compareAndSwapLoaders(this.classLoader); + this.classLoader = null; + } + } + + + @Override + public void commit(ConnectRecord record, Map<String, String> metadata) { + + if(this.kafkaSourceTask == null){ + log.warn("the task is not start, metadata:{}", metadata); + return; + } + + try { + long baseOffset = Long.valueOf(metadata.get(RecordUtil.QUEUE_OFFSET)); + TopicPartition topicPartition = new TopicPartition(metadata.get(RecordUtil.TOPIC), Integer.valueOf(metadata.get(RecordUtil.QUEUE_ID))); + RecordMetadata recordMetadata = new RecordMetadata( + topicPartition, baseOffset, 0, + System.currentTimeMillis(), 0,0 + ); + this.kafkaSourceTask.commitRecord( + RecordUtil.toSourceRecord(record, this.keyConverter, this.valueConverter, this.headerConverter), + recordMetadata + ); + } catch (InterruptedException e){ + Thread.currentThread().interrupt(); + } + } + + @Override + public void commit() { + if(this.kafkaSourceTask == null){ + log.warn("the task is not start"); + return; + } + + try { + this.kafkaSourceTask.commit(); + } catch (InterruptedException e){ + Thread.currentThread().interrupt(); + } + } +} diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaConnectorContext.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaConnectorContext.java new file mode 100644 index 0000000..ea20b81 --- /dev/null +++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaConnectorContext.java @@ -0,0 +1,21 @@ +package org.apache.rocketmq.connect.kafka.connector; + +import io.openmessaging.connector.api.component.connector.ConnectorContext; + +public class RocketmqKafkaConnectorContext implements org.apache.kafka.connect.connector.ConnectorContext{ + protected ConnectorContext rocketMqConnectorContext; + + public RocketmqKafkaConnectorContext(ConnectorContext rocketMqConnectorContext) { + this.rocketMqConnectorContext = rocketMqConnectorContext; + } + + @Override + public void requestTaskReconfiguration() { + this.rocketMqConnectorContext.requestTaskReconfiguration(); + } + + @Override + public void raiseError(Exception e) { + this.rocketMqConnectorContext.raiseError(e); + } +} diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaSinkTaskContext.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaSinkTaskContext.java new file mode 100644 index 0000000..848032b --- /dev/null +++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaSinkTaskContext.java @@ -0,0 +1,128 @@ +package org.apache.rocketmq.connect.kafka.connector; + + +import io.openmessaging.connector.api.component.task.sink.SinkTaskContext; +import io.openmessaging.connector.api.data.*; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.sink.ErrantRecordReporter; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.rocketmq.connect.kafka.util.ConfigUtil; +import org.apache.rocketmq.connect.kafka.util.RecordUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.*; +import java.util.stream.Collectors; + + +public class RocketmqKafkaSinkTaskContext implements org.apache.kafka.connect.sink.SinkTaskContext { + + private static final Logger log = LoggerFactory.getLogger(RocketmqKafkaSinkTaskContext.class); + + private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(1); + + private SinkTaskContext sinkTaskContext; + + public RocketmqKafkaSinkTaskContext(SinkTaskContext sinkTaskContext) { + this.sinkTaskContext = sinkTaskContext; + } + + @Override + public Map<String, String> configs() { + return ConfigUtil.keyValueConfigToMap(sinkTaskContext.configs()); + } + + @Override + public void offset(Map<TopicPartition, Long> offsets) { + + Map<RecordPartition, RecordOffset> offsets2 = new HashMap<>(offsets.size()); + offsets.forEach((tp,offset) -> { + Map<String, String> map = RecordUtil.getPartitionMap(tp.topic()); + map.put(RecordUtil.QUEUE_ID, tp.partition() + ""); + RecordPartition recordPartition = new RecordPartition(map); + + Map<String, String> offsetMap = new HashMap<>(); + offsetMap.put(RecordUtil.QUEUE_OFFSET, offset + ""); + RecordOffset recordOffset = new RecordOffset(offsetMap); + + offsets2.put(recordPartition, recordOffset); + }); + sinkTaskContext.resetOffset(offsets2); + } + + @Override + public void offset(TopicPartition tp, long offset) { + this.offset(Collections.singletonMap(tp, offset)); + } + + @Override + public void timeout(long timeoutMs) { + log.info("ignore timeout because not impl, timeoutMs:{}", timeoutMs); + } + + @Override + public Set<TopicPartition> assignment() { + return sinkTaskContext.assignment() + .stream() + .map(RecordUtil::recordPartitionToTopicPartition) + .collect(Collectors.toSet()); + } + + @Override + public void pause(TopicPartition... partitions) { + sinkTaskContext.pause( + toRecordPartitions(partitions) + ); + } + + @Override + public void resume(TopicPartition... partitions) { + sinkTaskContext.resume( + toRecordPartitions(partitions) + ); + } + + private List<RecordPartition> toRecordPartitions(TopicPartition... partitions){ + return Arrays.stream(partitions) + .map(RecordUtil::topicPartitionToRecordPartition) + .collect(Collectors.toList()); + } + + @Override + public void requestCommit() { + log.info("ignore requestCommit because not impl"); + } + + + @Override + public ErrantRecordReporter errantRecordReporter() { + return new ErrantRecordReporter() { + @Override + public Future<Void> report(SinkRecord record, Throwable error) { + + return EXECUTOR_SERVICE.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + + Map<String, String> partitionMap = RecordUtil.getPartitionMap(record.topic()); + partitionMap.put(RecordUtil.QUEUE_ID, record.kafkaPartition() + ""); + RecordPartition recordPartition = new RecordPartition(partitionMap); + + Map<String, String> offsetMap = new HashMap<>(); + offsetMap.put(RecordUtil.QUEUE_OFFSET, record.kafkaOffset() + ""); + RecordOffset recordOffset = new RecordOffset(offsetMap); + + ConnectRecord connectRecord = new ConnectRecord( + recordPartition, recordOffset, record.timestamp(), + SchemaBuilder.string().build(), record.value() + ); + sinkTaskContext.errorRecordReporter().report(connectRecord, error); + return null; + } + }); + + } + }; + } +} diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaSourceTaskContext.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaSourceTaskContext.java new file mode 100644 index 0000000..c30294a --- /dev/null +++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaSourceTaskContext.java @@ -0,0 +1,64 @@ +package org.apache.rocketmq.connect.kafka.connector; + + +import io.openmessaging.connector.api.component.task.source.SourceTaskContext; +import io.openmessaging.connector.api.data.RecordPartition; +import org.apache.kafka.connect.storage.OffsetStorageReader; +import org.apache.rocketmq.connect.kafka.util.ConfigUtil; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + +public class RocketmqKafkaSourceTaskContext implements org.apache.kafka.connect.source.SourceTaskContext { + + private SourceTaskContext sourceTaskContext; + + public RocketmqKafkaSourceTaskContext(SourceTaskContext sourceTaskContext) { + this.sourceTaskContext = sourceTaskContext; + } + + @Override + public Map<String, String> configs() { + return ConfigUtil.keyValueConfigToMap(sourceTaskContext.configs()); + } + + @Override + public OffsetStorageReader offsetStorageReader() { + return new OffsetStorageReader(){ + @Override + public <T> Map<String, Object> offset(Map<String, T> partition) { + return offsets(Collections.singletonList(partition)).get(partition); + } + + @Override + public <T> Map<Map<String, T>, Map<String, Object>> offsets(Collection<Map<String, T>> partitions) { + + Collection<RecordPartition> rocketmqPartitions = partitions.stream() + .map(RecordPartition::new) + .collect(Collectors.toList()); + + Map<Map<String, T>, Map<String, Object>> results = new HashMap<>(partitions.size()); + sourceTaskContext + .offsetStorageReader() + .readOffsets(rocketmqPartitions) + .forEach((p,o) -> { + results.put((Map<String, T>)p.getPartition(), mayConvertToLongOffset(o.getOffset())); + }); + return results; + } + + // kafka的offset是long表示,被序列化再反序列化会是int + private Map<String, Object> mayConvertToLongOffset(Map<String, ?> offset){ + Map<String, Object> result = new HashMap<>(offset.size()); + for(Map.Entry<String, ?> kv: offset.entrySet()){ + Object v = kv.getValue(); + result.put(kv.getKey(), v instanceof Integer ? ((Integer) v).longValue():v); + } + return result; + } + }; + } +} diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/ConfigUtil.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/ConfigUtil.java new file mode 100644 index 0000000..2113fcb --- /dev/null +++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/ConfigUtil.java @@ -0,0 +1,34 @@ +package org.apache.rocketmq.connect.kafka.util; + +import io.openmessaging.KeyValue; +import io.openmessaging.internal.DefaultKeyValue; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class ConfigUtil { + + public static Map<String, String> keyValueConfigToMap(KeyValue keyValueConfig){ + if(keyValueConfig == null){ + return null; + } + + Set<String> configKeySet = keyValueConfig.keySet(); + Map<String, String> mapConfig = new HashMap<>(configKeySet.size()); + configKeySet.forEach(key -> mapConfig.put(key, keyValueConfig.getString(key))); + return mapConfig; + } + + + public static KeyValue mapConfigToKeyValue(Map<String, String> mapConfig){ + if(mapConfig == null){ + return null; + } + + KeyValue keyValue = new DefaultKeyValue(); + mapConfig.forEach((k, v)-> keyValue.put(k, v)); + + return keyValue; + } +} diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/KafkaPluginsUtil.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/KafkaPluginsUtil.java new file mode 100644 index 0000000..65a324d --- /dev/null +++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/KafkaPluginsUtil.java @@ -0,0 +1,25 @@ +package org.apache.rocketmq.connect.kafka.util; + +import org.apache.kafka.connect.runtime.isolation.Plugins; + +import java.util.HashMap; +import java.util.Map; + +public class KafkaPluginsUtil { + + + public static final String PLUGIN_PATH = "plugin.path"; + private static final Map<String, Plugins> CACHE = new HashMap<>(); + + public static Plugins getPlugins(Map<String, String> props){ + String path = props.get(PLUGIN_PATH); + synchronized (CACHE){ + Plugins plugins = CACHE.get(path); + if(plugins == null){ + plugins = new Plugins(props); + CACHE.put(path, plugins); + } + return plugins; + } + } +} diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/RecordUtil.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/RecordUtil.java new file mode 100644 index 0000000..815ae56 --- /dev/null +++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/RecordUtil.java @@ -0,0 +1,157 @@ +package org.apache.rocketmq.connect.kafka.util; + +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.data.RecordOffset; +import io.openmessaging.connector.api.data.RecordPartition; +import io.openmessaging.connector.api.data.SchemaBuilder; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.header.ConnectHeaders; +import org.apache.kafka.connect.header.Header; +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.HeaderConverter; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +public class RecordUtil { + + public static final String BROKER_NAME = "brokerName"; + public static final String QUEUE_ID = "queueId"; + public static final String TOPIC = "topic"; + public static final String QUEUE_OFFSET = "queueOffset"; + + + private static final String TOPIC_SEP = "@#@"; + + + public static final String KAFKA_MSG_KEY = "kafka_key"; + public static final String KAFKA_CONNECT_RECORD_TOPIC_KEY = "kafka_connect_record_topic"; + public static final String KAFKA_CONNECT_RECORD_PARTITION_KEY = "kafka_connect_record_partition"; + public static final String KAFKA_CONNECT_RECORD_HEADER_KEY_PREFIX = "kafka_connect_record_header_"; + + public static String getTopicAndBrokerName(RecordPartition recordPartition) { + return new StringBuilder() + .append(recordPartition.getPartition().get(TOPIC)) + .append(TOPIC_SEP) + .append(recordPartition.getPartition().get(BROKER_NAME)) + .toString(); + } + + public static Map<String, String> getPartitionMap(String topicAndBrokerName) { + String[] split = topicAndBrokerName.split(TOPIC_SEP); + Map<String, String> map = new HashMap<>(); + map.put(TOPIC, split[0]); + map.put(BROKER_NAME, split[1]); + + return map; + } + + public static long getOffset(RecordOffset recordOffset){ + return Long.valueOf( + (String) recordOffset.getOffset().get(QUEUE_OFFSET) + ); + } + + public static int getPartition(RecordPartition recordPartition){ + return Integer.valueOf( + (String) recordPartition.getPartition().get(QUEUE_ID) + ); + } + + public static TopicPartition recordPartitionToTopicPartition(RecordPartition recordPartition){ + String topicAndBrokerName = getTopicAndBrokerName(recordPartition); + int partition = getPartition(recordPartition); + return new TopicPartition(topicAndBrokerName, partition); + } + + public static RecordPartition topicPartitionToRecordPartition(TopicPartition topicPartition){ + Map<String, String> map = RecordUtil.getPartitionMap(topicPartition.topic()); + map.put(RecordUtil.QUEUE_ID, topicPartition.partition() + ""); + return new RecordPartition(map); + } + + + public static ConnectRecord toConnectRecord(SourceRecord sourceRecord, Converter keyConverter, Converter valueConverter, + HeaderConverter headerConverter){ + RecordPartition recordPartition = new RecordPartition(new HashMap<>(sourceRecord.sourcePartition())); + RecordOffset recordOffset = new RecordOffset(new HashMap<>(sourceRecord.sourceOffset())); + Long timestamp = sourceRecord.timestamp(); + + byte[] value = valueConverter.fromConnectData( + sourceRecord.topic(), sourceRecord.valueSchema(), sourceRecord.value() + ); + + ConnectRecord connectRecord = new ConnectRecord( + recordPartition, recordOffset, timestamp, + SchemaBuilder.string().build(), new String(value, StandardCharsets.UTF_8) + ); + + if(sourceRecord.key() != null) { + byte[] key = keyConverter.fromConnectData + (sourceRecord.topic(), sourceRecord.keySchema(), sourceRecord.key() + ); + connectRecord.addExtension(RecordUtil.KAFKA_MSG_KEY, new String(key, StandardCharsets.UTF_8)); + } + + for(Header header: sourceRecord.headers()){ + byte[] headerValue = headerConverter.fromConnectHeader( + sourceRecord.topic(), header.key(), header.schema(), header.value() + ); + connectRecord.addExtension(RecordUtil.KAFKA_CONNECT_RECORD_HEADER_KEY_PREFIX+header.key(), new String(headerValue, StandardCharsets.UTF_8)); + } + + if(sourceRecord.topic() != null){ + connectRecord.addExtension(RecordUtil.KAFKA_CONNECT_RECORD_TOPIC_KEY, sourceRecord.topic()); + } + if(sourceRecord.kafkaPartition() != null){ + connectRecord.addExtension(RecordUtil.KAFKA_CONNECT_RECORD_PARTITION_KEY, sourceRecord.kafkaPartition().toString()); + } + return connectRecord; + } + + + public static SourceRecord toSourceRecord(ConnectRecord connectRecord, Converter keyConverter, Converter valueConverter, + HeaderConverter headerConverter){ + Map<String, ?> sourcePartition = new HashMap<>(connectRecord.getPosition().getPartition().getPartition()); + Map<String, ?> sourceOffset = new HashMap<>(connectRecord.getPosition().getOffset().getOffset()); + String topic = connectRecord.getExtension(RecordUtil.KAFKA_CONNECT_RECORD_TOPIC_KEY); + String partitionStr = connectRecord.getExtension(RecordUtil.KAFKA_CONNECT_RECORD_PARTITION_KEY); + Integer partition = null; + if(partitionStr != null){ + partition = Integer.valueOf(partitionStr); + } + String keyStr = connectRecord.getExtension(RecordUtil.KAFKA_MSG_KEY); + Schema keySchema = null; + Object key = null; + if(keyStr != null){ + SchemaAndValue keySchemaAndValue = keyConverter.toConnectData(topic, keyStr.getBytes(StandardCharsets.UTF_8)); + keySchema = keySchemaAndValue.schema(); + key = keySchemaAndValue.value(); + } + + ConnectHeaders headers = new ConnectHeaders(); + for(String extKey: connectRecord.getExtensions().keySet()){ + if(!extKey.startsWith(RecordUtil.KAFKA_CONNECT_RECORD_HEADER_KEY_PREFIX)){ + continue; + } + String header = extKey.substring(RecordUtil.KAFKA_CONNECT_RECORD_HEADER_KEY_PREFIX.length()); + SchemaAndValue headerSchemaAndValue = headerConverter + .toConnectHeader(topic, header, connectRecord.getExtension(extKey).getBytes(StandardCharsets.UTF_8)); + + headers.add(header, headerSchemaAndValue); + } + + SchemaAndValue valueSchemaAndValue = keyConverter.toConnectData(topic, ((String)connectRecord.getData()).getBytes(StandardCharsets.UTF_8)); + SourceRecord sourceRecord = new SourceRecord( + sourcePartition, sourceOffset, topic, partition, + keySchema, key, valueSchemaAndValue.schema(), valueSchemaAndValue.value(), + connectRecord.getTimestamp(),headers + ); + return sourceRecord; + } + +}