This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 8dcf2b37 [ISSUE #523 & OSPP2023] Feature: support archetype to create 
connectors (#525)
8dcf2b37 is described below

commit 8dcf2b37f247e2261512df8e00df7f875d4190d0
Author: Ao Qiao <qiao...@foxmail.com>
AuthorDate: Tue Oct 17 18:05:20 2023 +0800

    [ISSUE #523 & OSPP2023] Feature: support archetype to create connectors 
(#525)
    
    * initial commit
    
    * commit
    
    * rename
    
    * finish source connector
    
    * finish sink connector
    
    * add license
    
    * Update README.md
    
    * Update connect-standalone.conf
    
    * support archetype
    
    * Delete README.md
---
 .../README.md                                      |  36 ++++
 .../rocketmq-connect-connectors-archetype/pom.xml  |  37 ++++
 .../META-INF/maven/archetype-metadata.xml          |  66 ++++++++
 .../main/resources/archetype-resources/README.md   |  55 ++++++
 .../src/main/resources/archetype-resources/pom.xml | 188 +++++++++++++++++++++
 .../java/config/__dbNameToCamel__BaseConfig.java   | 131 ++++++++++++++
 .../java/config/__dbNameToCamel__Constants.java    |  50 ++++++
 .../java/config/__dbNameToCamel__SinkConfig.java   |  34 ++++
 .../java/config/__dbNameToCamel__SourceConfig.java |  45 +++++
 .../java/helper/__dbNameToCamel__HelperClient.java |  72 ++++++++
 .../main/java/helper/__dbNameToCamel__Record.java  |  25 +++
 .../java/sink/__dbNameToCamel__SinkConnector.java  |  60 +++++++
 .../main/java/sink/__dbNameToCamel__SinkTask.java  |  69 ++++++++
 .../source/__dbNameToCamel__SourceConnector.java   |  60 +++++++
 .../java/source/__dbNameToCamel__SourceTask.java   | 167 ++++++++++++++++++
 .../java/sink/__dbNameToCamel__SinkTaskTest.java   |  86 ++++++++++
 .../source/__dbNameToCamel__SourceTaskTest.java    |  28 +++
 .../resources/projects/basic/archetype.properties  |  11 ++
 .../src/test/resources/projects/basic/goal.txt     |   0
 19 files changed, 1220 insertions(+)

diff --git a/connectors/rocketmq-connect-connectors-archetype/README.md 
b/connectors/rocketmq-connect-connectors-archetype/README.md
new file mode 100644
index 00000000..e9fee6f0
--- /dev/null
+++ b/connectors/rocketmq-connect-connectors-archetype/README.md
@@ -0,0 +1,36 @@
+## How to Use Connnector-Archetype
+
+1. 进入脚手架文件夹
+   
+   ```shell
+   cd rocketmq-connect-connectors-archetype/
+   ```
+
+2. 将脚手架安装到本地
+   
+   ```shell
+   mvn -e clean install
+   ```
+
+3. 创建connector模版工程
+   
+   ```shell
+   cd connectors/
+   mvn archetype:generate \
+    -DarchetypeGroupId=org.apache.rocketmq \
+    -DarchetypeArtifactId=rocketmq-connect-connectors-archetype \
+    -DarchetypeVersion=1.0-SNAPSHOT \
+    -DdatabaseName=<databasename>
+   ```
+   
+   例:创建Clickhouse-Connector
+   
+   ```shell
+   mvn archetype:generate \
+    -DarchetypeGroupId=org.apache.rocketmq \
+    -DarchetypeArtifactId=rocketmq-connect-connectors-archetype \
+    -DarchetypeVersion=1.0-SNAPSHOT \
+    -DdatabaseName=clickhouse
+   ```
+
+4. 
如上指令将创建一个connector的框架,开发者主要关心`helper/xxxHelperClient`以及`xxxxSourceTask`,`xxxSinkTask`的实现即可,剩余配置可以按需修改。
diff --git a/connectors/rocketmq-connect-connectors-archetype/pom.xml 
b/connectors/rocketmq-connect-connectors-archetype/pom.xml
new file mode 100644
index 00000000..6081f655
--- /dev/null
+++ b/connectors/rocketmq-connect-connectors-archetype/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>org.apache.rocketmq</groupId>
+  <artifactId>rocketmq-connect-connectors-archetype</artifactId>
+  <version>1.0-SNAPSHOT</version>
+  <packaging>maven-archetype</packaging>
+
+  <name>rocketmq-connect-connectors-archetype</name>
+
+  <build>
+    <extensions>
+      <extension>
+        <groupId>org.apache.maven.archetype</groupId>
+        <artifactId>archetype-packaging</artifactId>
+        <version>3.2.1</version>
+      </extension>
+    </extensions>
+
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <artifactId>maven-archetype-plugin</artifactId>
+          <version>3.2.1</version>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+  </build>
+
+  <licenses>
+    <license>
+      <name>The Apache Software License, Version 2.0</name>
+      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+    </license>
+  </licenses>
+</project>
diff --git 
a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/META-INF/maven/archetype-metadata.xml
 
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/META-INF/maven/archetype-metadata.xml
new file mode 100644
index 00000000..ba974e82
--- /dev/null
+++ 
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/META-INF/maven/archetype-metadata.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<archetype-descriptor 
xsi:schemaLocation="https://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.1.0
 http://maven.apache.org/xsd/archetype-descriptor-1.1.0.xsd"; 
name="rocketmq-connect-connectors-archetype"
+    
xmlns="https://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.1.0";
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";>
+
+  <requiredProperties>
+    <!--必填属性-->
+    <requiredProperty key="groupId">
+      <defaultValue>org.apache.rocketmq</defaultValue>
+    </requiredProperty>
+    <requiredProperty key="artifactId">
+      <defaultValue>rocketmq-connect-${dbNameToLowerCase}</defaultValue>
+    </requiredProperty>
+    <requiredProperty key="version">
+      <defaultValue>1.0.0-SNAPSHOT</defaultValue>
+    </requiredProperty>
+    <requiredProperty key="package">
+      
<defaultValue>org.apache.rocketmq.connect.${dbNameToLowerCase}</defaultValue>
+    </requiredProperty>
+
+    <!--   The database to connect   -->
+    <requiredProperty key="databaseName"/>
+
+    <requiredProperty key="dbNameToUpperCase" >
+      <defaultValue>${databaseName.toUpperCase()}</defaultValue>
+    </requiredProperty>
+
+    <requiredProperty key="dbNameToCamel" >
+      
<defaultValue>${databaseName.toLowerCase().substring(0,1).toUpperCase()}${databaseName.toLowerCase().substring(1)}</defaultValue>
+    </requiredProperty>
+
+    <requiredProperty key="dbNameToLowerCase" >
+      <defaultValue>${databaseName.toLowerCase()}</defaultValue>
+    </requiredProperty>
+  </requiredProperties>
+
+  <fileSets>
+    <fileSet filtered="true" encoding="UTF-8">
+      <directory/>
+      <includes>
+        <include>.reviewboardrc</include>
+        <include>README.md</include>
+      </includes>
+    </fileSet>
+    <fileSet encoding="UTF-8">
+      <directory/>
+      <includes>
+        <include>.gitignore</include>
+        <include>TODO.md</include>
+      </includes>
+    </fileSet>
+    <fileSet encoding="UTF-8">
+      <directory/>
+      <includes>
+        <include>.gitignore</include>
+      </includes>
+    </fileSet>
+    <fileSet filtered="true" packaged="true" encoding="UTF-8">
+      <directory>src/main/java</directory>
+    </fileSet>
+    <fileSet filtered="true" packaged="true" encoding="UTF-8">
+      <directory>src/test/java</directory>
+    </fileSet>
+  </fileSets>
+
+</archetype-descriptor>
diff --git 
a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/README.md
 
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/README.md
new file mode 100644
index 00000000..2cd7f28b
--- /dev/null
+++ 
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/README.md
@@ -0,0 +1,55 @@
+## FIXME: fix document
+
+##### ${dbNameToCamel}SourceConnector fully-qualified name
+
+org.apache.rocketmq.connect.${dbNameToLowerCase}.source.${dbNameToCamel}SourceConnector
+
+**${dbNameToLowerCase}-source-connector** start
+
+```
+POST  
http://${runtime-ip}:${runtime-port}/connectors/${dbNameToLowerCase}SourceConnector
+{
+    
"connector.class":"org.apache.rocketmq.connect.${dbNameToLowerCase}.source.${dbNameToCamel}SourceConnector",
+    "${dbNameToLowerCase}host":"localhost",
+    "${dbNameToLowerCase}port":8123,
+    "database":"default",
+    "username":"default",
+    "password":"123456",
+    "table":"tableName",
+    "topic":"test${dbNameToCamel}Topic",
+    
"value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
+    
"key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
+}
+```
+
+##### ${dbNameToCamel}SinkConnector fully-qualified name
+
+org.apache.rocketmq.connect.${dbNameToLowerCase}.sink.${dbNameToCamel}SinkConnector
+
+**${dbNameToLowerCase}-sink-connector** start
+
+```
+POST  
http://${runtime-ip}:${runtime-port}/connectors/${dbNameToLowerCase}SinkConnector
+{
+    
"connector.class":"org.apache.rocketmq.connect.${dbNameToLowerCase}.sink.${dbNameToCamel}SinkConnector",
+    "${dbNameToLowerCase}host":"localhost",
+    "${dbNameToLowerCase}port":8123,
+    "database":"clickhouse",
+    "username":"default",
+    "password":"123456",
+    "connect.topicnames":"test${dbNameToCamel}Topic",
+    
"value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
+    
"key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
+}
+```
+
+##### parameter configuration
+
+| parameter                | effect                                            
| required          | default |
+|--------------------------|---------------------------------------------------|-------------------|---------|
+| ${dbNameToLowerCase}host | The Host of the ${dbNameToCamel} server           
| yes               | null    |
+| ${dbNameToLowerCase}port | The Port of the ${dbNameToCamel} server           
| yes               | null    |
+| database                 | The database to read or write                     
| yes               | null    |
+| table                    | The source table to read                          
| yes (source only) | null    |
+| topic                    | RocketMQ topic for source connector to write into 
| yes (source only) | null    |
+| connect.topicnames       | RocketMQ topic for sink connector to read from    
| yes (sink only)   | null    |
diff --git 
a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/pom.xml
 
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/pom.xml
new file mode 100644
index 00000000..c948d03b
--- /dev/null
+++ 
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/pom.xml
@@ -0,0 +1,188 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more 
contributor
+       license agreements. See the NOTICE file distributed with this work for 
additional
+       information regarding copyright ownership. The ASF licenses this file to
+       You under the Apache License, Version 2.0 (the "License"); you may not 
use
+       this file except in compliance with the License. You may obtain a copy 
of
+       the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
required
+       by applicable law or agreed to in writing, software distributed under 
the
+       License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS
+       OF ANY KIND, either express or implied. See the License for the specific
+       language governing permissions and limitations under the License. 
--><project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>${groupId}</groupId>
+    <artifactId>${artifactId}</artifactId>
+    <version>${version}</version>
+
+    <name>connect-${dbNameToLowerCase}</name>
+
+    <licenses>
+        <license>
+            <name>The Apache Software License, Version 2.0</name>
+            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+        </license>
+    </licenses>
+
+    <issueManagement>
+        <system>jira</system>
+        <url>https://issues.apache.org/jira/browse/RocketMQ</url>
+    </issueManagement>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>versions-maven-plugin</artifactId>
+                <version>2.3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+                <version>2.7</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.6.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.19.1</version>
+                <configuration>
+                    <argLine>-Xms512m -Xmx1024m</argLine>
+                    <forkMode>always</forkMode>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-site-plugin</artifactId>
+                <version>3.6</version>
+                <configuration>
+                    <locales>en_US</locales>
+                    <outputEncoding>UTF-8</outputEncoding>
+                    <inputEncoding>UTF-8</inputEncoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.0.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>2.10.4</version>
+                <configuration>
+                    <charset>UTF-8</charset>
+                    <locale>en_US</locale>
+                    
<excludePackageNames>io.openmessaging.internal</excludePackageNames>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>aggregate</id>
+                        <goals>
+                            <goal>aggregate</goal>
+                        </goals>
+                        <phase>site</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>3.0.2</version>
+                <configuration>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <version>3.0.4</version>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <version>0.12</version>
+                <configuration>
+                    <excludes>
+                        <exclude>README.md</exclude>
+                        <exclude>README-CN.md</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.0.0</version>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <version>0.1.4</version>
+            <scope>compile</scope>
+        </dependency>
+
+        <!-- FIXME: Write dependencies here-->
+
+        <dependency>
+            <groupId>org.lz4</groupId>
+            <artifactId>lz4-java</artifactId>
+            <version>1.8.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.83</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>RELEASE</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.7</version>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git 
a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__BaseConfig.java
 
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__BaseConfig.java
new file mode 100644
index 00000000..c13c241c
--- /dev/null
+++ 
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__BaseConfig.java
@@ -0,0 +1,131 @@
+#set( $symbol_pound = '#' )
+#set( $symbol_dollar = '$' )
+#set( $symbol_escape = '\' )
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package}.config;
+
+import io.openmessaging.KeyValue;
+import java.lang.reflect.Method;
+
+public class ${dbNameToCamel}BaseConfig {
+
+    private String ${databaseName}Host;
+
+    private Integer ${databaseName}Port;
+
+    private String database;
+
+    private String userName;
+
+    private String passWord;
+
+    private String topic;
+
+    public String get${dbNameToCamel}Host() {
+        return ${databaseName}Host;
+    }
+
+    public void set${dbNameToCamel}Host(String ${databaseName}Host) {
+        this.${databaseName}Host = ${databaseName}Host;
+    }
+
+    public Integer get${dbNameToCamel}Port() {
+        return ${databaseName}Port;
+    }
+
+    public void set${dbNameToCamel}Port(Integer ${databaseName}Port) {
+        this.${databaseName}Port = ${databaseName}Port;
+    }
+
+    public String getUserName() {
+        return userName;
+    }
+
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    public String getPassWord() {
+        return passWord;
+    }
+
+    public void setPassWord(String passWord) {
+        this.passWord = passWord;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public void setDatabase(String database) {
+        this.database = database;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public void load(KeyValue props) {
+        properties2Object(props, this);
+    }
+
+    private void properties2Object(final KeyValue p, final Object object) {
+
+        Method[] methods = object.getClass().getMethods();
+        for (Method method : methods) {
+            String mn = method.getName();
+            if (mn.startsWith("set")) {
+                try {
+                    String tmp = mn.substring(3);
+                    String key = tmp.toLowerCase();
+
+                    String property = p.getString(key);
+                    if (property != null) {
+                        Class<?>[] pt = method.getParameterTypes();
+                        if (pt != null && pt.length > 0) {
+                            String cn = pt[0].getSimpleName();
+                            Object arg;
+                            if (cn.equals("int") || cn.equals("Integer")) {
+                                arg = Integer.parseInt(property);
+                            } else if (cn.equals("long") || cn.equals("Long")) 
{
+                                arg = Long.parseLong(property);
+                            } else if (cn.equals("double") || 
cn.equals("Double")) {
+                                arg = Double.parseDouble(property);
+                            } else if (cn.equals("boolean") || 
cn.equals("Boolean")) {
+                                arg = Boolean.parseBoolean(property);
+                            } else if (cn.equals("float") || 
cn.equals("Float")) {
+                                arg = Float.parseFloat(property);
+                            } else if (cn.equals("String")) {
+                                arg = property;
+                            } else {
+                                continue;
+                            }
+                            method.invoke(object, arg);
+                        }
+                    }
+                } catch (Throwable ignored) {
+                }
+            }
+        }
+    }
+}
diff --git 
a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__Constants.java
 
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__Constants.java
new file mode 100644
index 00000000..0973299d
--- /dev/null
+++ 
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__Constants.java
@@ -0,0 +1,50 @@
+#set( $symbol_pound = '#' )
+#set( $symbol_dollar = '$' )
+#set( $symbol_escape = '\' )
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package}.config;
+
+public class ${dbNameToCamel}Constants {
+    public static final String ${dbNameToUpperCase}_HOST = 
"${dbNameToLowerCase}host";
+
+    public static final String ${dbNameToUpperCase}_PORT = 
"${dbNameToLowerCase}port";
+
+    public static final String ${dbNameToUpperCase}_DATABASE = "database";
+
+    public static final String ${dbNameToUpperCase}_USERNAME = "username";
+
+    public static final String ${dbNameToUpperCase}_PASSWORD = "password";
+
+    public static final String ${dbNameToUpperCase}_TABLE = "table";
+
+    public static final String TOPIC = "topic";
+
+    public static final String ${dbNameToUpperCase}_OFFSET = "OFFSET";
+
+    public static final String ${dbNameToUpperCase}_PARTITION = 
"${dbNameToUpperCase}_PARTITION";
+
+    public static final Integer defaultTimeoutSeconds = 30;
+
+    public static final int MILLI_IN_A_SEC = 1000;
+
+    public static final Integer retryCountDefault = 3;
+
+    public static final int BATCH_SIZE = 2000;
+
+}
diff --git 
a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__SinkConfig.java
 
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__SinkConfig.java
new file mode 100644
index 00000000..e0a2d632
--- /dev/null
+++ 
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__SinkConfig.java
@@ -0,0 +1,34 @@
+#set( $symbol_pound = '#' )
+#set( $symbol_dollar = '$' )
+#set( $symbol_escape = '\' )
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package}.config;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class ${dbNameToCamel}SinkConfig extends ${dbNameToCamel}BaseConfig {
+    public static final Set<String> SINK_REQUEST_CONFIG = new 
HashSet<String>() {
+        {
+            add(${dbNameToCamel}Constants.${dbNameToUpperCase}_HOST);
+            add(${dbNameToCamel}Constants.${dbNameToUpperCase}_PORT);
+            // FIXME: add config you need
+        }
+    };
+}
diff --git 
a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__SourceConfig.java
 
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__SourceConfig.java
new file mode 100644
index 00000000..4f02861e
--- /dev/null
+++ 
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__SourceConfig.java
@@ -0,0 +1,45 @@
+#set( $symbol_pound = '#' )
+#set( $symbol_dollar = '$' )
+#set( $symbol_escape = '\' )
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package}.config;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class ${dbNameToCamel}SourceConfig extends ${dbNameToCamel}BaseConfig {
+
+    public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
+        {
+            add(${dbNameToCamel}Constants.${dbNameToUpperCase}_HOST);
+            add(${dbNameToCamel}Constants.${dbNameToUpperCase}_PORT);
+            add(${dbNameToCamel}Constants.${dbNameToUpperCase}_TABLE);
+            add(${dbNameToCamel}Constants.TOPIC);
+        }
+    };
+    private String table;
+
+    public String getTable() {
+        return table;
+    }
+
+    public void setTable(String table) {
+        this.table = table;
+    }
+}
diff --git 
a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/helper/__dbNameToCamel__HelperClient.java
 
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/helper/__dbNameToCamel__HelperClient.java
new file mode 100644
index 00000000..38623c36
--- /dev/null
+++ 
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/helper/__dbNameToCamel__HelperClient.java
@@ -0,0 +1,72 @@
+#set( $symbol_pound = '#' )
+#set( $symbol_dollar = '$' )
+#set( $symbol_escape = '\' )
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package}.helper;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import ${package}.config.${dbNameToCamel}Constants;
+import ${package}.config.${dbNameToCamel}BaseConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ${dbNameToCamel}HelperClient {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(${dbNameToCamel}HelperClient.class);
+
+    private ${dbNameToCamel}BaseConfig config;
+    private int timeout = ${dbNameToCamel}Constants.defaultTimeoutSeconds * 
${dbNameToCamel}Constants.MILLI_IN_A_SEC;
+    private int retry = ${dbNameToCamel}Constants.retryCountDefault;
+
+    public ${dbNameToCamel}HelperClient(${dbNameToCamel}BaseConfig config) {
+        this.config = config;
+        initConnection();
+    }
+
+    public void initConnection() {
+        // FIXME: Write your code here
+        throw new RuntimeException("Method not implemented");
+    }
+
+    public boolean ping() {
+        // FIXME: Write your code here
+        throw new RuntimeException("Method not implemented");
+    }
+
+    public List<?> query(long offset, int batchSize) {
+        // FIXME: Write your code here
+        throw new RuntimeException("Method not implemented");
+    }
+
+    public void batchInsert(List<?>) {
+        // FIXME: Write your code here
+        throw new RuntimeException("Method not implemented");
+    }
+
+    public boolean stop() {
+        // FIXME: Write your code here
+        throw new RuntimeException("Method not implemented");
+    }
+}
diff --git 
a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/helper/__dbNameToCamel__Record.java
 
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/helper/__dbNameToCamel__Record.java
new file mode 100644
index 00000000..fa5c3106
--- /dev/null
+++ 
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/helper/__dbNameToCamel__Record.java
@@ -0,0 +1,25 @@
+#set( $symbol_pound = '#' )
+#set( $symbol_dollar = '$' )
+#set( $symbol_escape = '\' )
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package}.helper;
+
+public class ${dbNameToCamel}Record {
+    // FIXME: Write your code here
+}
diff --git 
a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/sink/__dbNameToCamel__SinkConnector.java
 
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/sink/__dbNameToCamel__SinkConnector.java
new file mode 100644
index 00000000..a9b23a89
--- /dev/null
+++ 
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/sink/__dbNameToCamel__SinkConnector.java
@@ -0,0 +1,60 @@
+#set( $symbol_pound = '#' )
+#set( $symbol_dollar = '$' )
+#set( $symbol_escape = '\' )
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package}.sink;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import java.util.ArrayList;
+import java.util.List;
+import ${package}.config.${dbNameToCamel}SinkConfig;
+
+public class ${dbNameToCamel}SinkConnector extends SinkConnector {
+
+    private KeyValue keyValue;
+
+    @Override public List<KeyValue> taskConfigs(int maxTasks) {
+        List<KeyValue> configs = new ArrayList<>();
+        for (int i = 0; i < maxTasks; i++) {
+            configs.add(this.keyValue);
+        }
+        return configs;
+    }
+
+    @Override public Class<? extends Task> taskClass() {
+        return ${dbNameToCamel}SinkTask.class;
+    }
+
+    @Override public void start(KeyValue value) {
+
+        for (String requestKey : 
${dbNameToCamel}SinkConfig.SINK_REQUEST_CONFIG) {
+            if (!value.containsKey(requestKey)) {
+                throw new RuntimeException("Request config key: " + 
requestKey);
+            }
+        }
+
+        this.keyValue = value;
+    }
+
+    @Override public void stop() {
+        this.keyValue = null;
+    }
+}
diff --git 
a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/sink/__dbNameToCamel__SinkTask.java
 
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/sink/__dbNameToCamel__SinkTask.java
new file mode 100644
index 00000000..a4ee54a2
--- /dev/null
+++ 
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/sink/__dbNameToCamel__SinkTask.java
@@ -0,0 +1,69 @@
+#set( $symbol_pound = '#' )
+#set( $symbol_dollar = '$' )
+#set( $symbol_escape = '\' )
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package}.sink;
+
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.connector.api.errors.ConnectException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import ${package}.helper.${dbNameToCamel}HelperClient;
+import ${package}.config.${dbNameToCamel}SinkConfig;
+
+public class ${dbNameToCamel}SinkTask extends SinkTask {
+
+    public ${dbNameToCamel}SinkConfig config;
+
+    private ${dbNameToCamel}HelperClient helperClient;
+
+    @Override public void put(List<ConnectRecord> sinkRecords) throws 
ConnectException {
+        if (sinkRecords == null || sinkRecords.size() < 1) {
+            return;
+        }
+        for (ConnectRecord record : sinkRecords) {
+            String table = record.getSchema().getName();
+            final List<Field> fields = record.getSchema().getFields();
+            final Struct structData = (Struct) record.getData();
+
+            // FIXME: Write your code here
+            throw new RuntimeException("Method not implemented");
+        }
+    }
+
+    @Override public void start(KeyValue keyValue) {
+        this.config = new ${dbNameToCamel}SinkConfig();
+        this.config.load(keyValue);
+        this.helperClient = new ${dbNameToCamel}HelperClient(this.config);
+        if (!helperClient.ping()) {
+            throw new RuntimeException("Cannot connect to ${dbNameToLowerCase} 
server!");
+        }
+    }
+
+    @Override public void stop() {
+        this.helperClient.stop();
+    }
+}
diff --git 
a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/source/__dbNameToCamel__SourceConnector.java
 
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/source/__dbNameToCamel__SourceConnector.java
new file mode 100644
index 00000000..d1a5daf5
--- /dev/null
+++ 
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/source/__dbNameToCamel__SourceConnector.java
@@ -0,0 +1,60 @@
+#set( $symbol_pound = '#' )
+#set( $symbol_dollar = '$' )
+#set( $symbol_escape = '\' )
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package}.source;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import java.util.ArrayList;
+import java.util.List;
+import ${package}.config.${dbNameToCamel}SourceConfig;
+
+public class ${dbNameToCamel}SourceConnector extends SourceConnector {
+
+    private KeyValue keyValue;
+
+    @Override public List<KeyValue> taskConfigs(int maxTasks) {
+        List<KeyValue> configs = new ArrayList<>();
+        for (int i = 0; i < maxTasks; i++) {
+            configs.add(this.keyValue);
+        }
+        return configs;
+    }
+
+    @Override public Class<? extends Task> taskClass() {
+        return ${dbNameToCamel}SourceTask.class;
+    }
+
+    @Override public void start(KeyValue config) {
+
+        for (String requestKey : ${dbNameToCamel}SourceConfig.REQUEST_CONFIG) {
+            if (!config.containsKey(requestKey)) {
+                throw new RuntimeException("Request config key: " + 
requestKey);
+            }
+        }
+        this.keyValue = config;
+
+    }
+
+    @Override public void stop() {
+        this.keyValue = null;
+    }
+}
diff --git 
a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/source/__dbNameToCamel__SourceTask.java
 
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/source/__dbNameToCamel__SourceTask.java
new file mode 100644
index 00000000..3a0e3cb4
--- /dev/null
+++ 
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/source/__dbNameToCamel__SourceTask.java
@@ -0,0 +1,167 @@
+#set( $symbol_pound = '#' )
+#set( $symbol_dollar = '$' )
+#set( $symbol_escape = '\' )
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package}.source;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.internal.DefaultKeyValue;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import ${package}.helper.${dbNameToCamel}HelperClient;
+import ${package}.helper.${dbNameToCamel}Record;
+import ${package}.config.${dbNameToCamel}Constants;
+import ${package}.config.${dbNameToCamel}SourceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ${dbNameToCamel}SourceTask extends SourceTask {
+
+    private static final Logger log = 
LoggerFactory.getLogger(${dbNameToCamel}SourceTask.class);
+
+    private ${dbNameToCamel}SourceConfig config;
+
+    private ${dbNameToCamel}HelperClient helperClient;
+
+    @Override public List<ConnectRecord> poll() {
+        List<ConnectRecord> res = new ArrayList<>();
+        long offset = readRecordOffset();
+        List<${dbNameToCamel}Record> recordList = helperClient.query(offset, 
${dbNameToCamel}Constants.BATCH_SIZE);
+        res = recordList.stream().map(record -> 
${dbNameToLowerCase}Record2ConnectRecord(record, 
offset)).collect(Collectors.toList());
+        // FIXME: Write your code here
+        throw new RuntimeException("Method not implemented");
+
+        return res;
+    }
+
+    private long readRecordOffset() {
+        final RecordOffset positionInfo = 
this.sourceTaskContext.offsetStorageReader().readOffset(buildRecordPartition(config.getTable()));
+        if (positionInfo == null) {
+            return 0;
+        }
+        Object offset = positionInfo.getOffset().get(config.getTable() + "_" + 
${dbNameToCamel}Constants.${dbNameToUpperCase}_OFFSET);
+        return offset == null ? 0 : Long.parseLong(offset.toString());
+    }
+
+    private ConnectRecord 
${dbNameToLowerCase}Record2ConnectRecord(${dbNameToCamel}Record 
${dbNameToLowerCase}Record, long offset)
+        throws NoSuchFieldException, IllegalAccessException {
+        Schema schema = SchemaBuilder.struct().name(config.getTable()).build();
+        final List<Field> fields = buildFields(${dbNameToLowerCase}Record);
+        schema.setFields(fields);
+        final ConnectRecord connectRecord = new 
ConnectRecord(buildRecordPartition(config.getTable()),
+            buildRecordOffset(offset),
+            System.currentTimeMillis(),
+            schema,
+            this.buildPayLoad(fields, schema, ${dbNameToLowerCase}Record));
+        
connectRecord.setExtensions(this.buildExtensions(${dbNameToLowerCase}Record));
+        return connectRecord;
+    }
+
+    private List<Field> buildFields(
+        ${dbNameToCamel}Record ${dbNameToLowerCase}Record) throws 
NoSuchFieldException, IllegalAccessException {
+        List<Field> fields = new ArrayList<>();
+
+        // FIXME: Write your code here
+
+        return fields;
+    }
+
+    private RecordPartition buildRecordPartition(String partitionValue) {
+        Map<String, String> partitionMap = new HashMap<>();
+        
partitionMap.put(${dbNameToCamel}Constants.${dbNameToUpperCase}_PARTITION, 
partitionValue);
+        return new RecordPartition(partitionMap);
+    }
+
+    private Struct buildPayLoad(List<Field> fields, Schema schema, 
${dbNameToCamel}Record ${dbNameToLowerCase}Record) {
+        Struct payLoad = new Struct(schema);
+        for (int i = 0; i < fields.size(); i++) {
+            // FIXME: Write your code here
+        }
+        return payLoad;
+    }
+
+    private KeyValue buildExtensions(${dbNameToUpperCase}Record 
${dbNameToLowerCase}Record) {
+        KeyValue keyValue = new DefaultKeyValue();
+        String topicName = config.getTopic();
+        if (topicName == null || topicName.equals("")) {
+            String connectorName = this.sourceTaskContext.getConnectorName();
+            topicName = config.getTable() + "_" + connectorName;
+        }
+        keyValue.put(${dbNameToUpperCase}Constants.TOPIC, topicName);
+        return keyValue;
+    }
+
+    private RecordOffset buildRecordOffset(long offset) {
+        Map<String, Long> offsetMap = new HashMap<>();
+        offsetMap.put(config.getTable() + "_" + 
${dbNameToCamel}Constants.${dbNameToUpperCase}_OFFSET, offset);
+        return new RecordOffset(offsetMap);
+    }
+
+    private static Schema getSchema(Class clazz) {
+        if (clazz.equals(Byte.class)) {
+            return SchemaBuilder.int8().build();
+        } else if (clazz.equals(Short.class) || 
clazz.equals(UnsignedByte.class)) {
+            return SchemaBuilder.int16().build();
+        } else if (clazz.equals(Integer.class) || 
clazz.equals(UnsignedShort.class)) {
+            return SchemaBuilder.int32().build();
+        } else if (clazz.equals(Long.class) || 
clazz.equals(UnsignedInteger.class)) {
+            return SchemaBuilder.int64().build();
+        } else if (clazz.equals(Float.class)) {
+            return SchemaBuilder.float32().build();
+        } else if (clazz.equals(Double.class)) {
+            return SchemaBuilder.float64().build();
+        } else if (clazz.equals(String.class)) {
+            return SchemaBuilder.string().build();
+        } else if (clazz.equals(Date.class) || 
clazz.equals(LocalDateTime.class) || clazz.equals(LocalDate.class)) {
+            return SchemaBuilder.time().build();
+        } else if (clazz.equals(Timestamp.class)) {
+            return SchemaBuilder.timestamp().build();
+        } else if (clazz.equals(Boolean.class)) {
+            return SchemaBuilder.bool().build();
+        }
+        return SchemaBuilder.string().build();
+    }
+
+    @Override public void start(KeyValue keyValue) {
+        this.config = new ${dbNameToCamel}SourceConfig();
+        this.config.load(keyValue);
+        this.helperClient = new ${dbNameToCamel}HelperClient(this.config);
+        if (!helperClient.ping()) {
+            throw new RuntimeException("Cannot connect to ${dbNameToLowerCase} 
server!");
+        }
+    }
+
+    @Override public void stop() {
+        this.helperClient = null;
+    }
+}
diff --git 
a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/test/java/sink/__dbNameToCamel__SinkTaskTest.java
 
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/test/java/sink/__dbNameToCamel__SinkTaskTest.java
new file mode 100644
index 00000000..4f6d7f59
--- /dev/null
+++ 
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/test/java/sink/__dbNameToCamel__SinkTaskTest.java
@@ -0,0 +1,86 @@
+#set( $symbol_pound = '#' )
+#set( $symbol_dollar = '$' )
+#set( $symbol_escape = '\' )
+package ${package}.sink;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.internal.DefaultKeyValue;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+class ${dbNameToCamel}SinkTaskTest {
+//
+//    private static final String host = "127.0.0.1";
+//    private static final String port = "8123";
+//    private static final String db = "default";
+//    private static final String username = "default";
+//    private static final String password = "123456";
+//
+//
+//
+//    public static void main(String[] args) {
+//        List<ConnectRecord> records = new ArrayList<>();
+//        // build schema
+//        Schema schema = SchemaBuilder.struct()
+//                .name("tableName")
+//                .field("c1",SchemaBuilder.string().build())
+//                .field("c2", SchemaBuilder.string().build())
+//                .build();
+//        // build record
+//        String param0 = "1001";
+//        Struct struct= new Struct(schema);
+//        struct.put("c1",param0);
+//        struct.put("c2",String.format("test-data-%s", param0));
+//
+//        Schema schema2 = SchemaBuilder.struct()
+//            .name("t1")
+//            .field("c1",SchemaBuilder.string().build())
+//            .field("c2", SchemaBuilder.string().build())
+//            .build();
+//        // build record
+//        Struct struct2= new Struct(schema2);
+//        struct.put("c1",param0);
+//        struct.put("c2",String.format("test-data-%s", param0));
+//
+//        for (int i = 0; i < 4; i++) {
+//            ConnectRecord record = new ConnectRecord(
+//                // offset partition
+//                // offset partition"
+//                new RecordPartition(new ConcurrentHashMap<>()),
+//                new RecordOffset(new HashMap<>()),
+//                System.currentTimeMillis(),
+//                schema,
+//                struct
+//            );
+//            records.add(record);
+//
+//            ConnectRecord record2 = new ConnectRecord(
+//                // offset partition
+//                // offset partition"
+//                new RecordPartition(new ConcurrentHashMap<>()),
+//                new RecordOffset(new HashMap<>()),
+//                System.currentTimeMillis(),
+//                schema2,
+//                struct
+//            );
+//            records.add(record2);
+//
+//        }
+//
+//        ${dbNameToCamel}SinkTask task = new ${dbNameToCamel}SinkTask();
+//        KeyValue config = new DefaultKeyValue();
+//        task.start(config);
+//        task.put(records);
+//
+//    }
+
+}
\ No newline at end of file
diff --git 
a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/test/java/source/__dbNameToCamel__SourceTaskTest.java
 
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/test/java/source/__dbNameToCamel__SourceTaskTest.java
new file mode 100644
index 00000000..66ec034b
--- /dev/null
+++ 
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/test/java/source/__dbNameToCamel__SourceTaskTest.java
@@ -0,0 +1,28 @@
+#set( $symbol_pound = '#' )
+#set( $symbol_dollar = '$' )
+#set( $symbol_escape = '\' )
+package ${package}.source;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.internal.DefaultKeyValue;
+import java.util.List;
+import junit.framework.TestCase;
+import ${package}.config.${dbNameToCamel}Constants;
+
+import static java.lang.Thread.sleep;
+
+public class ${dbNameToCamel}SourceTaskTest {
+
+//    private static final String host = "127.0.0.1";
+//    private static final String port = "8123";
+//    private static final String db = "default";
+//    private static final String username = "default";
+//    private static final String password = "123456";
+//
+//    public void testPoll() {
+//    }
+//
+//    public void testStart() throws InterruptedException {
+//    }
+}
\ No newline at end of file
diff --git 
a/connectors/rocketmq-connect-connectors-archetype/src/test/resources/projects/basic/archetype.properties
 
b/connectors/rocketmq-connect-connectors-archetype/src/test/resources/projects/basic/archetype.properties
new file mode 100644
index 00000000..0d5342b4
--- /dev/null
+++ 
b/connectors/rocketmq-connect-connectors-archetype/src/test/resources/projects/basic/archetype.properties
@@ -0,0 +1,11 @@
+#Mon Aug 14 22:31:14 CST 2023
+package=it.pkg
+version=0.1-SNAPSHOT
+groupId=archetype.it
+artifactId=basic
+databaseName=test
+dbNameToUpperCase=TEST
+dbNameToCamel=Test
+dbNameToLowerCase=test
+archetype.filteredExtensions=java,sql,yml,xml,properties,factories,ftl,md
+
diff --git 
a/connectors/rocketmq-connect-connectors-archetype/src/test/resources/projects/basic/goal.txt
 
b/connectors/rocketmq-connect-connectors-archetype/src/test/resources/projects/basic/goal.txt
new file mode 100644
index 00000000..e69de29b

Reply via email to