This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b9a8abc [ISSUE #366] Support for sftp connector(sinlk & source) (#382)
5b9a8abc is described below

commit 5b9a8abcf5106475099205fcac7edec8460485b8
Author: fireround <[email protected]>
AuthorDate: Wed Nov 30 14:48:33 2022 +0800

    [ISSUE #366] Support for sftp connector(sinlk & source) (#382)
    
    * Support for sftp connector(sinlk & source)
    
    * update codestyle and copyright
    
    * substitute jsch with sshj
    
    * correct doc
    
    * correct doc
---
 connectors/rocketmq-connect-sftp/README.md         | 112 ++++++++++++
 ...ketMQ Connect SFTP \345\210\206\344\272\253.md" |  50 ++++++
 ...ketMQ Connect SFTP \345\256\236\346\210\230.md" | 145 +++++++++++++++
 ...75\277\347\224\250\345\234\272\346\231\257.png" | Bin 0 -> 91912 bytes
 connectors/rocketmq-connect-sftp/pom.xml           | 197 +++++++++++++++++++++
 .../rocketmq/connect/http/sink/SftpClient.java     |  74 ++++++++
 .../rocketmq/connect/http/sink/SftpConstant.java   |  42 +++++
 .../connect/http/sink/SftpSinkConnector.java       |  62 +++++++
 .../rocketmq/connect/http/sink/SftpSinkTask.java   | 100 +++++++++++
 .../connect/http/sink/SftpSourceConnector.java     |  58 ++++++
 .../rocketmq/connect/http/sink/SftpSourceTask.java | 155 ++++++++++++++++
 11 files changed, 995 insertions(+)

diff --git a/connectors/rocketmq-connect-sftp/README.md 
b/connectors/rocketmq-connect-sftp/README.md
new file mode 100644
index 00000000..0cb61dee
--- /dev/null
+++ b/connectors/rocketmq-connect-sftp/README.md
@@ -0,0 +1,112 @@
+# rocketmq-connect-sftp
+
+Plugin for Rocketmq Connect. Tansfer file based on SFTP.
+
+# How to use
+
+* start rocketmq nameserver
+
+```shell
+cd ${ROCKETMQ_HOME}
+nohup ./bin/mqnamesrv &
+```
+
+* start rocketmq broker
+
+```shell
+nohup ./bin/mqbroker -n localhost:9876 &
+```
+
+* build plugin
+
+```shell
+cd connectors/rocketmq-connect-sftp
+mvn clean install -Dmaven.test.skip=true
+```
+
+* create config file path/to/connect-standalone.conf same as 
distribution/conf/connect-standalone.conf
+* modify 
pluginPaths=path/to/rocketmq-connect-sftp-0.0.1-SNAPSHOT-jar-with-dependencies
+* start org.apache.rocketmq.connect.runtime.StandaloneConnectStartup
+
+```shell
+cd rocketmq-connect-runtime
+mvn clean install -Dmaven.test.skip=true
+```
+
+* start source connector
+
+```http request
+POST /connectors/SftpSourceConnector HTTP/1.1
+Host: localhost:8082
+Content-Type: application/json
+
+{
+  "connector.class": 
"org.apache.rocketmq.connect.http.sink.SftpSourceConnector",
+  "host": "127.0.0.1",
+  "port": 22,
+  "username": "wencheng",
+  "password": "",
+  "filePath": "/Users/wencheng/Documents/source.txt",
+  "connect.topicname": "sftpTopic",
+  
"value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
+       "fieldSeparator": "\\|",
+  "fieldSchema": "username|idCardNo|orderNo|orderAmount|trxDate|trxTime|profit"
+}
+```
+
+`warning:` make sure exist a file named "source.txt" on the sftp server.
+
+* start sink connector
+
+```http request
+POST /connectors/SftpSinkConnector HTTP/1.1
+Host: localhost:8082
+Content-Type: application/json
+
+{
+  "connector.class": "org.apache.rocketmq.connect.http.sink.SftpSinkConnector",
+  "host": "127.0.0.1",
+  "port": 22,
+  "username": "wencheng",
+  "password": "",
+  "filePath": "/Users/wencheng/Documents/sink.txt",
+  "connect.topicnames": "sftpTopic"
+}
+```
+
+## What we expected to see
+
+The file named sink.txt will be created, and the content of the "source.txt" 
will appears in this file.
+
+## Appendix: Connector Configuration
+
+### sftp-source-connector configuration
+
+| KEY               | TYPE   | REQUIRED | DESCRIPTION                          
                  | EXAMPLE             |
+| ----------------- | ------ | -------- | 
------------------------------------------------------ | ------------------- |
+| host              | String | Y        | SFTP host                            
                  | localhost           |
+| port              | int    | Y        | SFTP port                            
                  | 22                  |
+| username          | String | Y        | SFTP username                        
                  | wencheng            |
+| password          | String | Y        | SFTP password                        
                  |                     |
+| filePath          | String | Y        | The name of the file which will be 
transferred         | /path/to/source.txt |
+| fieldSchema       | String | Y        | the data schema of each line         
                  |                     |
+| fieldSeparator    | String | Y        | Symbol that separates each field     
                  |                     |
+| connect.topicname | String | Y        | The Message Queue topic which the 
data will be send to |                     |
+
+### sftp-sink-connector configuration
+
+| KEY                | TYPE   | REQUIRED | DESCRIPTION                         
                       | EXAMPLE           |
+| ------------------ | ------ | -------- | 
---------------------------------------------------------- | ----------------- |
+| host               | String | Y        | SFTP host                           
                       | localhost         |
+| port               | int    | Y        | SFTP port                           
                       | 22                |
+| username           | String | Y        | SFTP username                       
                       | wencheng          |
+| password           | String | Y        | SFTP password                       
                       |                   |
+| filePath           | String | Y        | The name of the file which will be 
transferred             | /path/to/sink.txt |
+| connect.topicnames | String | Y        | The Message Queue topic which the 
data will be pulled from |                   |
+| fieldSchema        | String | Y        | the data schema of each line        
                       |                   |
+| fieldSeparator     | String | Y        | Symbol that separates each field    
                       |                   |
+
+
+
+
+
diff --git "a/connectors/rocketmq-connect-sftp/doc/RocketMQ Connect SFTP 
\345\210\206\344\272\253.md" "b/connectors/rocketmq-connect-sftp/doc/RocketMQ 
Connect SFTP \345\210\206\344\272\253.md"
new file mode 100644
index 00000000..a9f753e7
--- /dev/null
+++ "b/connectors/rocketmq-connect-sftp/doc/RocketMQ Connect SFTP 
\345\210\206\344\272\253.md"  
@@ -0,0 +1,50 @@
+<!-- TOC -->
+
+- [connect **介绍**](#connect-介绍)
+- [SFTP 介绍](#sftp-介绍)
+- [connect-sftp 使用场景](#connect-sftp-使用场景)
+  - [使用 sftp-source-connector 同步对账文件](#使用-sftp-source-connector-同步对账文件)
+  - [使用 sftp-sink-connector 生成对账文件](#使用-sftp-sink-connector-生成对账文件)
+  - [使用 sftp-sink-connector 和 sftp-source-connector 同步 sftp 
文件](#使用-sftp-sink-connector-和-sftp-source-connector-同步-sftp-文件)
+- [connect-sftp 演示](#connect-sftp-演示)
+- [参考](#参考)
+
+<!-- TOC -->
+
+## connect **介绍**
+
+* 基于 openMessage connect 标准之上。实现了对 Connector 的管理(Rest API、Shell Command)
+* 实现了对 task 的任务调度
+* 通过插件机制和继承的扩展方式,使得扩展 connect 支持各种不同的应用协议变得比较轻量和容易。
+
+## SFTP 介绍
+
+全称 SSH File Transfer Protocol,运行在 SSH 协议之上(与 FTP 没有任何共同点),用于文件的传输。
+
+## connect-sftp 使用场景
+
+### 使用 sftp-source-connector 同步对账文件
+
+使用 sftp-source-connector 同步对账文件到 MQ,业务系统对接MQ,消费相应主题的消息进行业务处理。
+![sftp connect 使用场景](使用场景.png)
+
+优势:
+
+* 借助MQ,业务系统作为消费者可以轻松实现负载均衡
+* 业务系统不必关心数据的读取和转换过程。该过程的可靠性由 connect 保证。
+
+劣势:
+
+* 需要独立部署一个 connect 服务,并做相应的维护和配置。
+
+### 使用 sftp-sink-connector 生成对账文件
+
+### 使用 sftp-sink-connector 和 sftp-source-connector 同步 sftp 文件
+
+## connect-sftp 演示
+
+[RocketMQ Connect SFTP 实战.md](RocketMQ Connect SFTP 实战.md)
+
+## 参考
+
+[sftp.net](https://www.sftp.net/)
\ No newline at end of file
diff --git "a/connectors/rocketmq-connect-sftp/doc/RocketMQ Connect SFTP 
\345\256\236\346\210\230.md" "b/connectors/rocketmq-connect-sftp/doc/RocketMQ 
Connect SFTP \345\256\236\346\210\230.md"
new file mode 100644
index 00000000..e00e08ee
--- /dev/null
+++ "b/connectors/rocketmq-connect-sftp/doc/RocketMQ Connect SFTP 
\345\256\236\346\210\230.md"  
@@ -0,0 +1,145 @@
+# RocketMQ Connect SFTP 实战
+
+## 准备
+
+### 启动RocketMQ
+
+1. Linux/Unix/Mac
+2. 64bit JDK 1.8+;
+3. Maven 3.2.x或以上版本;
+4. 启动 [RocketMQ](https://rocketmq.apache.org/docs/quick-start/);
+
+
+
+**提示** : ${ROCKETMQ_HOME} 位置说明
+
+>bin-release.zip 版本:/rocketmq-all-4.9.4-bin-release
+>
+>source-release.zip 版本:/rocketmq-all-4.9.4-source-release/distribution
+
+
+### 启动Connect
+
+
+#### Connector插件编译
+
+RocketMQ Connector SFTP
+```
+$ cd rocketmq-connect/connectors/rocketmq-connect-sftp/
+$ mvn clean package -Dmaven.test.skip=true
+```
+
+将  RocketMQ Connector SFTP 编译好的包放入Runtime加载目录。命令如下:
+```
+mkdir -p /usr/local/connector-plugins
+cp target/rocketmq-connect-sftp-0.0.1-SNAPSHOT-jar-with-dependencies.jar 
/usr/local/connector-plugins
+```
+
+#### 启动Connect Runtime
+
+```
+cd  rocketmq-connect
+
+mvn -Prelease-connect -DskipTests clean install -U
+
+```
+
+修改配置`connect-standalone.conf` ,重点配置如下
+```
+$ cd 
distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
+$ vim conf/connect-standalone.conf
+```
+
+```
+workerId=standalone-worker
+storePathRootDir=/tmp/storeRoot
+
+## Http port for user to access REST API
+httpPort=8082
+
+# Rocketmq namesrvAddr
+namesrvAddr=localhost:9876
+
+# RocketMQ acl
+aclEnable=false
+accessKey=rocketmq
+secretKey=12345678
+
+autoCreateGroupEnable=false
+clusterName="DefaultCluster"
+
+# 核心配置,将之前编译好包的插件目录配置在此;
+# Source or sink connector jar file dir,The default value is 
rocketmq-connect-sample
+pluginPaths=/usr/local/connector-plugins
+```
+
+
+```
+cd 
distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
+
+sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
+
+```
+
+### SFTP 服务器搭建
+
+使用 MAC OS 自带的 SFTP 服务器
+
+[允许远程电脑访问你的 Mac](https://support.apple.com/zh-cn/guide/mac-help/mchlp1066/mac)
+
+### 测试数据
+
+登陆 SFTP 服务器,将具有如何内容的 souce.txt 文件放入用户目录,例如:/path/to/
+
+```text
+张三|100000202211290001|20221129001|30000.00|2022-11-28|03:00:00|7.00
+李四|100000202211290002|20221129002|40000.00|2022-11-28|04:00:00|9.00
+赵五|100000202211290003|20221129003|50000.00|2022-11-28|05:00:00|12.00
+```
+
+## 启动Connector
+
+### 启动 SFTP source connector
+
+同步 SFTP 文件:source.txt
+作用:通过登陆 SFTP 服务器,解析文件并封装成通用的ConnectRecord对象,发送的RocketMQ Topic当中
+
+```shell
+curl -X POST --location "http://localhost:8082/connectors/SftpSourceConnector"; 
--http1.1 \
+    -H "Host: localhost:8082" \
+    -H "Content-Type: application/json" \
+    -d "{
+          \"connector.class\": 
\"org.apache.rocketmq.connect.http.sink.SftpSourceConnector\",
+          \"host\": \"127.0.0.1\",
+          \"port\": 22,
+          \"username\": \"wencheng\",
+          \"password\": \"1617\",
+          \"filePath\": \"/Users/wencheng/Documents/source.txt\",
+          \"connect.topicname\": \"sftpTopic\",
+          \"fieldSeparator\": \"|\",
+          \"fieldSchema\": 
\"username|idCardNo|orderNo|orderAmount|trxDate|trxTime|profit\"
+        }"
+```
+
+### 启动 SFTP sink connector
+
+作用:通过消费Topic中的数据,通过SFTP协议写入到目标文件当中
+
+```shell
+curl -X POST --location "http://localhost:8082/connectors/SftpSinkConnector"; 
--http1.1 \
+    -H "Host: localhost:8082" \
+    -H "Content-Type: application/json" \
+    -d "{
+          \"connector.class\": 
\"org.apache.rocketmq.connect.http.sink.SftpSinkConnector\",
+          \"host\": \"127.0.0.1\",
+          \"port\": 22,
+          \"username\": \"wencheng\",
+          \"password\": \"1617\",
+          \"filePath\": \"/Users/wencheng/Documents/sink.txt\",
+          \"connect.topicnames\": \"sftpTopic\",
+          \"fieldSeparator\": \"|\",
+          \"fieldSchema\": 
\"username|idCardNo|orderNo|orderAmount|trxDate|trxTime|profit\"
+        }"
+```
+
+****
\ No newline at end of file
diff --git 
"a/connectors/rocketmq-connect-sftp/doc/\344\275\277\347\224\250\345\234\272\346\231\257.png"
 
"b/connectors/rocketmq-connect-sftp/doc/\344\275\277\347\224\250\345\234\272\346\231\257.png"
new file mode 100644
index 00000000..0c27d747
Binary files /dev/null and 
"b/connectors/rocketmq-connect-sftp/doc/\344\275\277\347\224\250\345\234\272\346\231\257.png"
 differ
diff --git a/connectors/rocketmq-connect-sftp/pom.xml 
b/connectors/rocketmq-connect-sftp/pom.xml
new file mode 100644
index 00000000..06d0094d
--- /dev/null
+++ b/connectors/rocketmq-connect-sftp/pom.xml
@@ -0,0 +1,197 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-connect-sftp</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+
+    <name>rocketmq-connect-sftp</name>
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <slf4j.version>1.7.7</slf4j.version>
+        <logback.version>1.2.9</logback.version>
+        
<openmessaging-connector.version>0.1.4</openmessaging-connector.version>
+        <commons-lang3.version>3.12.0</commons-lang3.version>
+        <fastjson.version>1.2.83</fastjson.version>
+        <sshj.version>0.32.0</sshj.version>
+        <junit.version>4.13.1</junit.version>
+        <assertj.version>2.6.0</assertj.version>
+        <mockito.version>2.6.3</mockito.version>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>versions-maven-plugin</artifactId>
+                <version>2.3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+                <version>2.7</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <configuration>
+                    
<outputDirectory>${project.build.directory}/lib</outputDirectory>
+                    <excludeTransitive>false</excludeTransitive>
+                    <stripVersion>true</stripVersion>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.6.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.19.1</version>
+                <configuration>
+                    <argLine>-Xms512m -Xmx1024m</argLine>
+                    <forkMode>always</forkMode>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-site-plugin</artifactId>
+                <version>3.6</version>
+                <configuration>
+                    <locales>en_US</locales>
+                    <outputEncoding>UTF-8</outputEncoding>
+                    <inputEncoding>UTF-8</inputEncoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.0.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>2.10.4</version>
+                <configuration>
+                    <charset>UTF-8</charset>
+                    <locale>en_US</locale>
+                    
<excludePackageNames>io.openmessaging.internal</excludePackageNames>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>aggregate</id>
+                        <goals>
+                            <goal>aggregate</goal>
+                        </goals>
+                        <phase>site</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>3.0.2</version>
+                <configuration>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <version>3.0.4</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.0.0</version>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+    <dependencies>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>${logback.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+            <version>${logback.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <version>${openmessaging-connector.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>${fastjson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>${commons-lang3.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.hierynomus</groupId>
+            <artifactId>sshj</artifactId>
+            <version>${sshj.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>${assertj.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git 
a/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpClient.java
 
b/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpClient.java
new file mode 100644
index 00000000..03785683
--- /dev/null
+++ 
b/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpClient.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.http.sink;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Set;
+import net.schmizz.sshj.SSHClient;
+import net.schmizz.sshj.sftp.OpenMode;
+import net.schmizz.sshj.sftp.RemoteFile;
+import net.schmizz.sshj.sftp.SFTPClient;
+import net.schmizz.sshj.transport.TransportException;
+import net.schmizz.sshj.transport.verification.PromiscuousVerifier;
+import net.schmizz.sshj.userauth.UserAuthException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SftpClient implements Closeable {
+
+    private static final Logger log = 
LoggerFactory.getLogger(SftpClient.class);
+
+    private SSHClient sshClient;
+
+    private SFTPClient internalSFTPClient;
+
+    public SftpClient(String host, int port, String username, String password) 
{
+        sshClient = new SSHClient();
+        sshClient.addHostKeyVerifier(new PromiscuousVerifier());
+        try {
+            sshClient.connect(host, port);
+            sshClient.authPassword(username, password);
+            internalSFTPClient = sshClient.newSFTPClient();
+        } catch (UserAuthException e) {
+            log.error(e.getMessage(), e);
+        } catch (TransportException e) {
+            log.error(e.getMessage(), e);
+        } catch (IOException e) {
+            log.error(e.getMessage(), e);
+        }
+    }
+
+    public RemoteFile open(String filename) throws IOException {
+        return internalSFTPClient.getSFTPEngine().open(filename);
+    }
+
+    public RemoteFile open(String filename, Set<OpenMode> modes) throws 
IOException {
+        return internalSFTPClient.getSFTPEngine().open(filename, modes);
+    }
+
+    @Override
+    public void close() {
+        try {
+            internalSFTPClient.close();
+            sshClient.close();
+        } catch (IOException e) {
+            log.error(e.getMessage(), e);
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpConstant.java
 
b/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpConstant.java
new file mode 100644
index 00000000..eb47cf50
--- /dev/null
+++ 
b/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpConstant.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.http.sink;
+
+public class SftpConstant {
+
+    public static final String LOGGER_NAME = "SftpConnector";
+
+    public static final String SFTP_HOST_KEY = "host";
+
+    public static final String SFTP_PORT_KEY = "port";
+
+    public static final String SFTP_USERNAME_KEY = "username";
+
+    public static final String SFTP_PASSWORD_KEY = "password";
+
+    public static final String SFTP_PATH_KEY = "filePath";
+
+    public static final String SFTP_FIELD_SEPARATOR = "fieldSeparator";
+
+    public static final String SFTP_FIELD_SCHEMA = "fieldSchema";
+
+    public static final String RECORD_PARTITION_STORAGE_KEY = "partition";
+
+    public static final String RECORD_OFFSET_STORAGE_KEY = "offset";
+
+}
diff --git 
a/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpSinkConnector.java
 
b/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpSinkConnector.java
new file mode 100644
index 00000000..24f6e5a8
--- /dev/null
+++ 
b/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpSinkConnector.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.http.sink;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import java.util.ArrayList;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SftpSinkConnector extends SinkConnector {
+
+    private Logger log = LoggerFactory.getLogger(SftpConstant.LOGGER_NAME);
+
+    private KeyValue config;
+
+    @Override
+    public List<KeyValue> taskConfigs(int maxTasks) {
+        List<KeyValue> taskConfigs = new ArrayList<>();
+        if (config != null) {
+            taskConfigs.add(config);
+        }
+        return taskConfigs;
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return SftpSinkTask.class;
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+    }
+
+    @Override
+    public void start(KeyValue config) {
+        log.info("Sftp connector started");
+        this.config = config;
+    }
+
+    @Override
+    public void stop() {
+        log.info("Sftp connector stopped");
+    }
+}
diff --git 
a/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpSinkTask.java
 
b/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpSinkTask.java
new file mode 100644
index 00000000..78bbab31
--- /dev/null
+++ 
b/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpSinkTask.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.http.sink;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.regex.Pattern;
+import net.schmizz.sshj.sftp.OpenMode;
+import net.schmizz.sshj.sftp.RemoteFile;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.rocketmq.connect.http.sink.SftpConstant.SFTP_FIELD_SCHEMA;
+import static 
org.apache.rocketmq.connect.http.sink.SftpConstant.SFTP_FIELD_SEPARATOR;
+import static org.apache.rocketmq.connect.http.sink.SftpConstant.SFTP_HOST_KEY;
+import static 
org.apache.rocketmq.connect.http.sink.SftpConstant.SFTP_PASSWORD_KEY;
+import static org.apache.rocketmq.connect.http.sink.SftpConstant.SFTP_PATH_KEY;
+import static org.apache.rocketmq.connect.http.sink.SftpConstant.SFTP_PORT_KEY;
+import static 
org.apache.rocketmq.connect.http.sink.SftpConstant.SFTP_USERNAME_KEY;
+
+public class SftpSinkTask extends SinkTask {
+    private static final Logger log = 
LoggerFactory.getLogger(SftpSinkTask.class);
+
+    private SftpClient sftpClient;
+
+    private String filePath;
+
+    private String fieldSeparator;
+
+    private String[] fieldSchema;
+
+    @Override public void validate(KeyValue config) {
+        if (StringUtils.isBlank(config.getString(SFTP_HOST_KEY))
+            || StringUtils.isBlank(config.getString(SFTP_PORT_KEY))
+            || StringUtils.isBlank(config.getString(SFTP_USERNAME_KEY))
+            || StringUtils.isBlank(config.getString(SFTP_PASSWORD_KEY))
+            || StringUtils.isBlank(config.getString(SFTP_PATH_KEY))) {
+            throw new RuntimeException("missing required config");
+        }
+    }
+
+    @Override public void start(KeyValue config) {
+        String host = config.getString(SFTP_HOST_KEY);
+        int port = config.getInt(SFTP_PORT_KEY);
+        String username = config.getString(SFTP_USERNAME_KEY);
+        String password = config.getString(SFTP_PASSWORD_KEY);
+        this.filePath = config.getString(SFTP_PATH_KEY);
+        this.sftpClient = new SftpClient(host, port, username, password);
+        fieldSeparator = config.getString(SFTP_FIELD_SEPARATOR);
+        String fieldSchemaStr = config.getString(SFTP_FIELD_SCHEMA);
+        fieldSchema = fieldSchemaStr.split(Pattern.quote(fieldSeparator));
+    }
+
+    @Override public void put(List<ConnectRecord> sinkRecords) throws 
ConnectException {
+        try (RemoteFile remoteFile = sftpClient.open(filePath, 
EnumSet.of(OpenMode.READ, OpenMode.CREAT, OpenMode.WRITE, OpenMode.APPEND));
+             OutputStream outputStream = remoteFile.new 
RemoteFileOutputStream()) {
+            for (ConnectRecord connectRecord : sinkRecords) {
+                String str = (String) connectRecord.getData();
+                JSONObject jsonObject = JSON.parseObject(str);
+                StringBuilder lineBuilder = new StringBuilder();
+                for (int i = 0; i < fieldSchema.length; i++) {
+                    
lineBuilder.append(jsonObject.getString(fieldSchema[i])).append(fieldSeparator);
+                }
+                lineBuilder.append(System.lineSeparator());
+                byte[] line = lineBuilder.toString().getBytes();
+                outputStream.write(line, 0, line.length);
+            }
+        } catch (IOException e) {
+            log.error("sink task ioexception", e);
+        }
+    }
+
+    @Override public void stop() {
+        sftpClient.close();
+    }
+}
diff --git 
a/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpSourceConnector.java
 
b/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpSourceConnector.java
new file mode 100644
index 00000000..82f1ed77
--- /dev/null
+++ 
b/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpSourceConnector.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.http.sink;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import java.util.ArrayList;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SftpSourceConnector extends SourceConnector {
+
+    private Logger log = LoggerFactory.getLogger(SftpConstant.LOGGER_NAME);
+
+    private KeyValue config;
+
+    @Override
+    public List<KeyValue> taskConfigs(int i) {
+        List<KeyValue> taskConfigs = new ArrayList<>();
+        if (config != null) {
+            taskConfigs.add(config);
+        }
+        return taskConfigs;
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return SftpSourceTask.class;
+    }
+
+    @Override
+    public void start(KeyValue config) {
+        log.info("Sftp connector started");
+        this.config = config;
+    }
+
+    @Override
+    public void stop() {
+        log.info("Sftp connector stopped");
+    }
+}
diff --git 
a/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpSourceTask.java
 
b/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpSourceTask.java
new file mode 100644
index 00000000..466e5a0b
--- /dev/null
+++ 
b/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpSourceTask.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.http.sink;
+
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.file.FileSystemException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import net.schmizz.sshj.sftp.RemoteFile;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.rocketmq.connect.http.sink.SftpConstant.RECORD_OFFSET_STORAGE_KEY;
+import static 
org.apache.rocketmq.connect.http.sink.SftpConstant.RECORD_PARTITION_STORAGE_KEY;
+import static 
org.apache.rocketmq.connect.http.sink.SftpConstant.SFTP_FIELD_SCHEMA;
+import static 
org.apache.rocketmq.connect.http.sink.SftpConstant.SFTP_FIELD_SEPARATOR;
+import static org.apache.rocketmq.connect.http.sink.SftpConstant.SFTP_HOST_KEY;
+import static 
org.apache.rocketmq.connect.http.sink.SftpConstant.SFTP_PASSWORD_KEY;
+import static org.apache.rocketmq.connect.http.sink.SftpConstant.SFTP_PATH_KEY;
+import static org.apache.rocketmq.connect.http.sink.SftpConstant.SFTP_PORT_KEY;
+import static 
org.apache.rocketmq.connect.http.sink.SftpConstant.SFTP_USERNAME_KEY;
+
+public class SftpSourceTask extends SourceTask {
+
+    private final Logger log = 
LoggerFactory.getLogger(SftpConstant.LOGGER_NAME);
+
+    private SftpClient sftpClient;
+
+    private String filePath;
+
+    private String fieldSeparator;
+
+    private String[] fieldSchema;
+
+    private static final int MAX_NUMBER_SEND_CONNECT_RECORD_EACH_TIME = 2000;
+
+    @Override public void init(SourceTaskContext sourceTaskContext) {
+        super.init(sourceTaskContext);
+    }
+
+    @Override public void validate(KeyValue config) {
+        if (StringUtils.isBlank(config.getString(SFTP_HOST_KEY))
+            || StringUtils.isBlank(config.getString(SFTP_PORT_KEY))
+            || StringUtils.isBlank(config.getString(SFTP_USERNAME_KEY))
+            || StringUtils.isBlank(config.getString(SFTP_PASSWORD_KEY))
+            || StringUtils.isBlank(config.getString(SFTP_PATH_KEY))) {
+            throw new RuntimeException("missing required config");
+        }
+    }
+
+    @Override public void start(KeyValue config) {
+        String host = config.getString(SFTP_HOST_KEY);
+        int port = config.getInt(SFTP_PORT_KEY);
+        String username = config.getString(SFTP_USERNAME_KEY);
+        String password = config.getString(SFTP_PASSWORD_KEY);
+        this.filePath = config.getString(SFTP_PATH_KEY);
+        this.sftpClient = new SftpClient(host, port, username, password);
+        fieldSeparator = config.getString(SFTP_FIELD_SEPARATOR);
+        String fieldSchemaStr = config.getString(SFTP_FIELD_SCHEMA);
+        fieldSchema = fieldSchemaStr.split(Pattern.quote(fieldSeparator));
+    }
+
+    @Override public void stop() {
+        sftpClient.close();
+    }
+
+    @Override public List<ConnectRecord> poll() {
+        int offset = readRecordOffset();
+        try (RemoteFile remoteFile = sftpClient.open(filePath);
+             BufferedReader reader = new BufferedReader(new 
InputStreamReader(remoteFile.new RemoteFileInputStream(offset)))) {
+            List<ConnectRecord> records = new ArrayList<>();
+            String line;
+            ConnectRecord connectRecord;
+
+            while ((line = reader.readLine()) != null) {
+                offset = offset + line.getBytes().length + 1;
+
+                // do not send empty string to mq
+                if (!StringUtils.isEmpty(line)) {
+                    String[] data = line.split(Pattern.quote(fieldSeparator));
+                    JSONObject jsonObject = new JSONObject();
+                    for (int i = 0; i < fieldSchema.length; i++) {
+                        jsonObject.put(fieldSchema[i], data[i]);
+                    }
+                    connectRecord = new 
ConnectRecord(buildRecordPartition(filePath), buildRecordOffset(offset), 
System.currentTimeMillis());
+
+                    connectRecord.setData(jsonObject.toString());
+                    records.add(connectRecord);
+                    if (records.size() > 
MAX_NUMBER_SEND_CONNECT_RECORD_EACH_TIME) {
+                        break;
+                    }
+                }
+            }
+            return records;
+        } catch (FileSystemException e) {
+            log.error("File system error", e);
+        } catch (IOException e) {
+            log.error("SFTP IOException", e);
+        }
+        return null;
+    }
+
+    private RecordOffset buildRecordOffset(int offset) {
+        Map<String, Integer> offsetMap = new HashMap<>();
+        offsetMap.put(RECORD_OFFSET_STORAGE_KEY, offset);
+        return new RecordOffset(offsetMap);
+    }
+
+    private RecordPartition buildRecordPartition(String partitionValue) {
+        Map<String, String> partitionMap = new HashMap<>();
+        partitionMap.put(RECORD_PARTITION_STORAGE_KEY, partitionValue);
+        return new RecordPartition(partitionMap);
+    }
+
+    private int readRecordOffset() {
+        RecordOffset positionInfo = 
this.sourceTaskContext.offsetStorageReader().readOffset(buildRecordPartition(filePath));
+        if (positionInfo == null) {
+            return 0;
+        }
+        Object offset = 
positionInfo.getOffset().get(RECORD_OFFSET_STORAGE_KEY);
+        if (offset == null) {
+            return 0;
+        } else {
+            return (int) offset;
+        }
+    }
+}


Reply via email to