This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 560ae8e1 [ISSUE #346] Add elasticsearch source connector (#347)
560ae8e1 is described below

commit 560ae8e145d7de686467a54cb374e6c4b4e24803
Author: Oliver <[email protected]>
AuthorDate: Mon Oct 31 14:16:59 2022 +0800

    [ISSUE #346] Add elasticsearch source connector (#347)
---
 .../rocketmq-connect-elasticsearch/README.md       |  31 +++
 connectors/rocketmq-connect-elasticsearch/pom.xml  | 231 +++++++++++++++++++++
 .../elasticsearch/config/ElasticsearchConfig.java  | 121 +++++++++++
 .../config/ElasticsearchConstant.java              |  41 ++++
 .../connector/ElasticsearchSourceConnector.java    |  83 ++++++++
 .../connector/ElasticsearchSourceTask.java         | 135 ++++++++++++
 .../replicator/source/ElasticsearchQuery.java      | 154 ++++++++++++++
 .../replicator/source/ElasticsearchReplicator.java |  64 ++++++
 .../config/ElasticsearchConfigTest.java            |  47 +++++
 .../ElasticsearchSourceConnectorTest.java          |  44 ++++
 .../replicator/source/ElasticsearchQueryTest.java  |  48 +++++
 11 files changed, 999 insertions(+)

diff --git a/connectors/rocketmq-connect-elasticsearch/README.md 
b/connectors/rocketmq-connect-elasticsearch/README.md
new file mode 100644
index 00000000..c5c0d89e
--- /dev/null
+++ b/connectors/rocketmq-connect-elasticsearch/README.md
@@ -0,0 +1,31 @@
+##### ElasticsearchSourceConnector fully-qualified name
+org.apache.rocketmq.connect.elasticsearch.connector.ElasticsearchSourceConnector
+
+**elasticsearch-source-connector** start
+
+```
+POST  http://${runtime-ip}:${runtime-port}/connectors/elasticsearchSourceConnector
+{
+    "connector.class":"org.apache.rocketmq.connect.elasticsearch.connector.ElasticsearchSourceConnector",
+    "elasticsearchHost":"localhost",
+    "elasticsearchPort":9200,
+    "index":{
+        "aolifu_connect": {
+            "primaryShards":1,
+            "id":1
+        }
+    },
+    "max.tasks":1,
+    "connect.topicname":"configInfo",
+    "value.converter":"org.apache.rocketmq.connect.runtime.converter.record.StringConverter",
+    "key.converter":"org.apache.rocketmq.connect.runtime.converter.record.StringConverter"
+}
+```
+
+##### parameter configuration
+
+parameter | effect | required | default
+---|---|---|---
+elasticsearchHost | The host of the Elasticsearch server | yes | null
+elasticsearchPort | The port of the Elasticsearch server | yes | null
+index | The indexes to read, as a JSON object keyed by index name; each entry holds `primaryShards` and one incremental field with its initial value (see the example above) | yes | null
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-elasticsearch/pom.xml 
b/connectors/rocketmq-connect-elasticsearch/pom.xml
new file mode 100644
index 00000000..557a7281
--- /dev/null
+++ b/connectors/rocketmq-connect-elasticsearch/pom.xml
@@ -0,0 +1,231 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-connect-elasticsearch</artifactId>
+    <version>1.0.0</version>
+
+    <name>connect-elasticsearch</name>
+    <url>https://github.com/apache/rocketmq-connect/tree/master/connectors/rocketmq-connect-elasticsearch</url>
+
+    <licenses>
+        <license>
+            <name>The Apache Software License, Version 2.0</name>
+            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+        </license>
+    </licenses>
+
+    <issueManagement>
+        <system>jira</system>
+        <url>https://issues.apache.org/jira/browse/RocketMQ</url>
+    </issueManagement>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+        <!-- Compiler settings properties -->
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>versions-maven-plugin</artifactId>
+                <version>2.3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+                <version>2.7</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.6.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.19.1</version>
+                <configuration>
+                    <argLine>-Xms512m -Xmx1024m</argLine>
+                    <forkMode>always</forkMode>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-site-plugin</artifactId>
+                <version>3.6</version>
+                <configuration>
+                    <locales>en_US</locales>
+                    <outputEncoding>UTF-8</outputEncoding>
+                    <inputEncoding>UTF-8</inputEncoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.0.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>2.10.4</version>
+                <configuration>
+                    <charset>UTF-8</charset>
+                    <locale>en_US</locale>
+                    <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>aggregate</id>
+                        <goals>
+                            <goal>aggregate</goal>
+                        </goals>
+                        <phase>site</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>3.0.2</version>
+                <configuration>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <version>3.0.4</version>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <version>0.12</version>
+                <configuration>
+                    <excludes>
+                        <exclude>README.md</exclude>
+                        <exclude>README-CN.md</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.0.0</version>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+    <dependencies>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.13.2</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>3.23.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>4.8.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <version>0.1.4</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.83</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.7</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>1.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+            <version>1.2.9</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-cli</groupId>
+            <artifactId>commons-cli</artifactId>
+            <version>1.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.elasticsearch</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <version>7.6.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.elasticsearch.client</groupId>
+            <artifactId>elasticsearch-rest-high-level-client</artifactId>
+            <version>7.6.2</version>
+        </dependency>
+
+    </dependencies>
+
+</project>
diff --git 
a/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/config/ElasticsearchConfig.java
 
b/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/config/ElasticsearchConfig.java
new file mode 100644
index 00000000..0f06b162
--- /dev/null
+++ 
b/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/config/ElasticsearchConfig.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.elasticsearch.config;
+
+import io.openmessaging.KeyValue;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Typed view of the connector/task configuration.
+ *
+ * <p>Values are copied from a {@link KeyValue} by {@link #load(KeyValue)}, which
+ * discovers the public setters of this class reflectively: a setter named
+ * {@code setFooBar} is fed from the key {@code fooBar}.
+ */
+public class ElasticsearchConfig {
+
+    /**
+     * Configuration keys that must be present for the connector to start.
+     */
+    public static final Set<String> REQUEST_CONFIG = new HashSet<>();
+
+    static {
+        // Plain static initializer instead of double-brace initialization, which
+        // would create an anonymous HashSet subclass.
+        REQUEST_CONFIG.add(ElasticsearchConstant.ES_HOST);
+        REQUEST_CONFIG.add(ElasticsearchConstant.ES_PORT);
+        REQUEST_CONFIG.add(ElasticsearchConstant.INDEX);
+    }
+
+    private String index;
+
+    private String elasticsearchHost;
+
+    private Integer elasticsearchPort;
+
+    /**
+     * key is indexName, value is field name
+     */
+    private Map<String, String> indexMap = new HashMap<>();
+
+    public String getIndex() {
+        return index;
+    }
+
+    public void setIndex(String index) {
+        this.index = index;
+    }
+
+    public String getElasticsearchHost() {
+        return elasticsearchHost;
+    }
+
+    public void setElasticsearchHost(String elasticsearchHost) {
+        this.elasticsearchHost = elasticsearchHost;
+    }
+
+    public Integer getElasticsearchPort() {
+        return elasticsearchPort;
+    }
+
+    public void setElasticsearchPort(Integer elasticsearchPort) {
+        this.elasticsearchPort = elasticsearchPort;
+    }
+
+    /**
+     * Returns the live, mutable index-to-field map; it has no setter, so callers
+     * populate it through this reference.
+     */
+    public Map<String, String> getIndexMap() {
+        return indexMap;
+    }
+
+    /**
+     * Copies every property that has a matching public setter from {@code props}
+     * into this object. Keys without a setter, and setters without a key, are ignored.
+     *
+     * @param props source configuration
+     * @throws IllegalArgumentException if a numeric property cannot be parsed
+     * @throws IllegalStateException    if a setter cannot be invoked
+     */
+    public void load(KeyValue props) {
+        properties2Object(props, this);
+    }
+
+    /**
+     * Applies the properties in {@code p} to {@code object} through its
+     * single-argument public setters.
+     */
+    private void properties2Object(final KeyValue p, final Object object) {
+        for (Method method : object.getClass().getMethods()) {
+            final String mn = method.getName();
+            // A bare "set" has no property name to derive a key from.
+            if (!mn.startsWith("set") || mn.length() < 4) {
+                continue;
+            }
+            final Class<?>[] pt = method.getParameterTypes();
+            if (pt.length != 1) {
+                continue;
+            }
+            // Character.toLowerCase is locale-independent; String.toLowerCase() would
+            // turn "Index" into "ındex" under a Turkish default locale.
+            final String key = Character.toLowerCase(mn.charAt(3)) + mn.substring(4);
+            final String property = p.getString(key);
+            if (property == null) {
+                continue;
+            }
+            final Object arg;
+            try {
+                arg = convert(pt[0], property);
+            } catch (NumberFormatException e) {
+                // Surface the bad value here instead of leaving the field null and
+                // failing later somewhere unrelated.
+                throw new IllegalArgumentException("Invalid value '" + property + "' for config key '"
+                    + key + "', expected " + pt[0].getSimpleName(), e);
+            }
+            if (arg == null) {
+                // Setter type is not one of the supported scalar types.
+                continue;
+            }
+            try {
+                method.invoke(object, arg);
+            } catch (ReflectiveOperationException e) {
+                throw new IllegalStateException("Failed to apply config key '" + key + "'", e);
+            }
+        }
+    }
+
+    /**
+     * Converts {@code raw} to the given setter parameter type, or returns
+     * {@code null} when the type is not supported.
+     */
+    private static Object convert(final Class<?> type, final String raw) {
+        switch (type.getSimpleName()) {
+            case "int":
+            case "Integer":
+                return Integer.parseInt(raw);
+            case "long":
+            case "Long":
+                return Long.parseLong(raw);
+            case "double":
+            case "Double":
+                return Double.parseDouble(raw);
+            case "boolean":
+            case "Boolean":
+                return Boolean.parseBoolean(raw);
+            case "float":
+            case "Float":
+                return Float.parseFloat(raw);
+            case "String":
+                return raw;
+            default:
+                return null;
+        }
+    }
+}
diff --git 
a/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/config/ElasticsearchConstant.java
 
b/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/config/ElasticsearchConstant.java
new file mode 100644
index 00000000..5a974a31
--- /dev/null
+++ 
b/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/config/ElasticsearchConstant.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.elasticsearch.config;
+
+public class ElasticsearchConstant {
+
+    public static final String ES_DOC = "doc";
+
+    public static final String ES_PARTITION = "ES_PARTITION";
+
+    public static final String ES_POSITION = "ES_POSITION";
+
+    public static final String ES_HOST = "elasticsearchHost";
+
+    public static final String ES_PORT = "elasticsearchPort";
+
+    public static final String INDEX = "index";
+
+    public static final String INCREMENT_FIELD = "incrementField";
+
+    public static final String INCREMENT = "increment";
+
+    public static final String PRIMARY_SHARDS = "primaryShards";
+
+    public static final String PRIMARY_SHARD = "primaryShard";
+}
diff --git 
a/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/connector/ElasticsearchSourceConnector.java
 
b/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/connector/ElasticsearchSourceConnector.java
new file mode 100644
index 00000000..ad750d20
--- /dev/null
+++ 
b/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/connector/ElasticsearchSourceConnector.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.elasticsearch.connector;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import io.openmessaging.internal.DefaultKeyValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.rocketmq.connect.elasticsearch.config.ElasticsearchConfig;
+import org.apache.rocketmq.connect.elasticsearch.config.ElasticsearchConstant;
+
+/**
+ * Source connector that splits the configured Elasticsearch indexes into task
+ * configurations, one per (index, shard number) pair.
+ */
+public class ElasticsearchSourceConnector extends SourceConnector {
+
+    private KeyValue keyValue;
+
+    private ElasticsearchConfig config;
+
+    /**
+     * Builds the task configurations from the {@code index} JSON of the connector config.
+     *
+     * <p>For every index, {@code min(primaryShards, maxTasks)} configurations are
+     * produced. Each one is an independent copy of the connector configuration with
+     * the index name, the incremental field and its configured value, and the shard
+     * number filled in. The connector's own configuration is left untouched, so this
+     * method can be called repeatedly.
+     *
+     * @param maxTasks upper bound on the number of tasks created per index
+     * @return one configuration per task
+     * @throws IllegalArgumentException if an index description lacks
+     *                                  {@code primaryShards} or an incremental field
+     */
+    @Override
+    public List<KeyValue> taskConfigs(int maxTasks) {
+        this.config = new ElasticsearchConfig();
+        this.config.load(keyValue);
+        List<KeyValue> configs = new ArrayList<>();
+        JSONObject indexes = JSON.parseObject(this.config.getIndex());
+        for (Map.Entry<String, Object> entry : indexes.entrySet()) {
+            final String indexName = entry.getKey();
+            final JSONObject value = (JSONObject) entry.getValue();
+
+            final Integer primaryShards = value.getInteger(ElasticsearchConstant.PRIMARY_SHARDS);
+            if (primaryShards == null) {
+                throw new IllegalArgumentException("Index '" + indexName + "' is missing '"
+                    + ElasticsearchConstant.PRIMARY_SHARDS + "'");
+            }
+            // The incremental field is the first key other than primaryShards.
+            final String incrementField = value.keySet().stream()
+                .filter(item -> !ElasticsearchConstant.PRIMARY_SHARDS.equals(item))
+                .findFirst()
+                .orElseThrow(() -> new IllegalArgumentException(
+                    "Index '" + indexName + "' does not declare an increment field"));
+            final String increment = value.getString(incrementField);
+
+            final int taskCount = Math.min(primaryShards, maxTasks);
+            for (int i = 0; i < taskCount; i++) {
+                // Each task needs its own KeyValue: adding the shared instance would make
+                // every task see the values written for the last index/shard.
+                final KeyValue taskConfig = copyOf(this.keyValue);
+                taskConfig.put(ElasticsearchConstant.INDEX, indexName);
+                taskConfig.put(ElasticsearchConstant.INCREMENT_FIELD, incrementField);
+                taskConfig.put(ElasticsearchConstant.INCREMENT, increment);
+                taskConfig.put(ElasticsearchConstant.PRIMARY_SHARD, i + "");
+                configs.add(taskConfig);
+            }
+        }
+        return configs;
+    }
+
+    /** Returns a new KeyValue holding the same string entries as {@code source}. */
+    private static KeyValue copyOf(KeyValue source) {
+        final KeyValue copy = new DefaultKeyValue();
+        for (String key : source.keySet()) {
+            copy.put(key, source.getString(key));
+        }
+        return copy;
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return ElasticsearchSourceTask.class;
+    }
+
+    /**
+     * Validates that every required key is present and remembers the configuration.
+     *
+     * @param config connector configuration
+     * @throws RuntimeException if a key from {@link ElasticsearchConfig#REQUEST_CONFIG} is missing
+     */
+    @Override
+    public void start(KeyValue config) {
+        for (String requestKey : ElasticsearchConfig.REQUEST_CONFIG) {
+            if (!config.containsKey(requestKey)) {
+                throw new RuntimeException("Request config key: " + requestKey);
+            }
+        }
+        this.keyValue = config;
+    }
+
+    /** Drops the reference to the connector configuration. */
+    @Override
+    public void stop() {
+        this.keyValue = null;
+    }
+}
diff --git 
a/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/connector/ElasticsearchSourceTask.java
 
b/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/connector/ElasticsearchSourceTask.java
new file mode 100644
index 00000000..322e2e44
--- /dev/null
+++ 
b/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/connector/ElasticsearchSourceTask.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.elasticsearch.connector;
+
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.connect.elasticsearch.config.ElasticsearchConfig;
+import org.apache.rocketmq.connect.elasticsearch.config.ElasticsearchConstant;
+import 
org.apache.rocketmq.connect.elasticsearch.replicator.source.ElasticsearchReplicator;
+import org.elasticsearch.search.SearchHit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticsearchSourceTask extends SourceTask {
+
+    private static final Logger log = 
LoggerFactory.getLogger(ElasticsearchSourceTask.class);
+
+    private ElasticsearchReplicator replicator;
+
+    private ElasticsearchConfig config;
+
+    @Override
+    public List<ConnectRecord> poll() {
+        List<ConnectRecord> res = new ArrayList<>();
+        try {
+            SearchHit searchHit = replicator.getQueue().poll(1000, 
TimeUnit.MILLISECONDS);
+            if (searchHit != null) {
+                res.add(searchHit2ConnectRecord(searchHit));
+            }
+        } catch (Exception e) {
+            log.error("elasticsearch sourceTask poll error, current config:" + 
JSON.toJSONString(config), e);
+        }
+        return res;
+    }
+
+    @Override
+    public void start(KeyValue keyValue) {
+        final RecordOffset recordOffset = 
this.sourceTaskContext.offsetStorageReader().readOffset(buildRecordPartition());
+        this.config = new ElasticsearchConfig();
+        this.config.load(keyValue);
+        this.replicator = new ElasticsearchReplicator(config);
+        this.replicator.start(recordOffset, keyValue);
+        log.info("elasticsearch source task start success");
+    }
+
+    @Override
+    public void stop() {
+        replicator.stop();
+        log.info("elasticsearch source task stop success");
+    }
+
+    public ConnectRecord searchHit2ConnectRecord(SearchHit hit) {
+        Schema schema = SchemaBuilder.struct().name(hit.getIndex()).build();
+        final List<Field> fields = buildFields(hit);
+        schema.setFields(fields);
+        final ConnectRecord connectRecord = new 
ConnectRecord(buildRecordPartition(),
+            buildRecordOffset(hit),
+            System.currentTimeMillis(),
+            schema,
+            hit.getSourceAsString());
+        return connectRecord;
+    }
+
+    private List<Field> buildFields(SearchHit hit) {
+        List<Field> fields = new ArrayList<>();
+        final Map<String, Object> map = hit.getSourceAsMap();
+        for (Map.Entry<String, Object> entry : map.entrySet()) {
+            fields.add(new Field(0, entry.getKey(), 
getSchema(entry.getValue())));
+        }
+        return fields;
+    }
+
+    private RecordPartition buildRecordPartition() {
+        Map<String, String> partitionMap = new HashMap<>();
+        partitionMap.put(ElasticsearchConstant.ES_PARTITION, 
ElasticsearchConstant.ES_PARTITION);
+        RecordPartition  recordPartition = new RecordPartition(partitionMap);
+        return recordPartition;
+    }
+
+    private RecordOffset buildRecordOffset(SearchHit hit) {
+        Map<String, Long> offsetMap = new HashMap<>();
+        Object value = 
JSON.parseObject(hit.getSourceAsString()).get(config.getIndexMap().get(hit.getIndex()));
+        if (value == null) {
+            value = 1;
+        }
+        offsetMap.put(hit.getIndex() + ":" + 
ElasticsearchConstant.ES_POSITION, Long.parseLong(value.toString()));
+        RecordOffset recordOffset = new RecordOffset(offsetMap);
+        return recordOffset;
+    }
+
+    private Schema getSchema(Object obj) {
+        if (obj instanceof Integer) {
+            return SchemaBuilder.int32().build();
+        } else if (obj instanceof Long) {
+            return SchemaBuilder.int64().build();
+        } else if (obj instanceof String) {
+            return SchemaBuilder.string().build();
+        } else if (obj instanceof Date) {
+            return SchemaBuilder.time().build();
+        } else if (obj instanceof Timestamp) {
+            return SchemaBuilder.timestamp().build();
+        }
+        return SchemaBuilder.string().build();
+    }
+
+}
diff --git 
a/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/replicator/source/ElasticsearchQuery.java
 
b/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/replicator/source/ElasticsearchQuery.java
new file mode 100644
index 00000000..718000a7
--- /dev/null
+++ 
b/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/replicator/source/ElasticsearchQuery.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.elasticsearch.replicator.source;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.RecordOffset;
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.http.HttpHost;
+import org.apache.rocketmq.connect.elasticsearch.config.ElasticsearchConfig;
+import org.apache.rocketmq.connect.elasticsearch.config.ElasticsearchConstant;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchScrollRequest;
+import org.elasticsearch.client.Node;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.RangeQueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticsearchQuery {
+
+    private Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    private ElasticsearchReplicator replicator;
+
+    private ElasticsearchConfig config;
+
+    private RestHighLevelClient client;
+
+    private RestClient restClient;
+
+    private ExecutorService executorService = Executors.newFixedThreadPool(5);
+
+    public ElasticsearchQuery(ElasticsearchReplicator replicator) {
+        this.replicator = replicator;
+        this.config = replicator.getConfig();
+        HttpHost httpHost = new HttpHost(config.getElasticsearchHost(), 
config.getElasticsearchPort());
+        Node node = new Node(httpHost);
+        RestClientBuilder restClientBuilder = RestClient.builder(node);
+        restClient = restClientBuilder.build();
+        client = new RestHighLevelClient(restClientBuilder);
+    }
+
+    public void start(RecordOffset recordOffset, KeyValue keyValue) {
+        String  scrollId = null;
+        while (true) {
+            if (scrollId == null) {
+                try {
+                    final SearchResponse searchResponse = 
this.searchData(recordOffset, keyValue);
+                    final SearchHit[] hits = 
searchResponse.getHits().getHits();
+                    if (hits == null || hits.length < 1) {
+                        break;
+                    }
+                    for (SearchHit hit : hits) {
+                        replicator.getQueue().add(hit);
+                    }
+                    scrollId = searchResponse.getScrollId();
+
+                } catch (Exception e) {
+                    logger.error("query Elasticsearch server failed", e);
+                    throw new RuntimeException(e);
+                }
+            }
+
+            try {
+                final SearchResponse searchResponse = 
scrollSearchData(scrollId);
+                final SearchHit[] hits = searchResponse.getHits().getHits();
+                if (hits == null || hits.length < 1) {
+                    break;
+                }
+                for (SearchHit hit : hits) {
+                    replicator.getQueue().add(hit);
+                }
+                scrollId = searchResponse.getScrollId();
+            } catch (Exception e) {
+                logger.error("scroll query Elasticsearch server occur error", 
e);
+                throw new RuntimeException(e);
+            }
+        }
+
+    }
+
+    private SearchResponse searchData(RecordOffset recordOffset, KeyValue 
keyValue) {
+        SearchRequest searchRequest = new SearchRequest();
+        searchRequest.indices(keyValue.getString(ElasticsearchConstant.INDEX));
+        final String shard = 
keyValue.getString(ElasticsearchConstant.PRIMARY_SHARD);
+        if (shard != null) {
+            searchRequest.preference("_shards:" + shard);
+        }
+        searchRequest.scroll(TimeValue.timeValueMinutes(1));
+        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+        RangeQueryBuilder rangeQueryBuilder = 
QueryBuilders.rangeQuery(keyValue.getString(ElasticsearchConstant.INCREMENT_FIELD))
+            .gte(keyValue.getString(ElasticsearchConstant.INCREMENT));
+        if (recordOffset != null && recordOffset.getOffset() != null && 
recordOffset.getOffset().size() > 0) {
+            final Long offsetValue = (Long) 
recordOffset.getOffset().get(keyValue.getString(ElasticsearchConstant.INDEX) + 
ElasticsearchConstant.ES_POSITION);
+            rangeQueryBuilder = rangeQueryBuilder.gte(offsetValue);
+        }
+        searchSourceBuilder.query(rangeQueryBuilder);
+        searchSourceBuilder.from(0).size(200);
+        searchRequest.source(searchSourceBuilder);
+        final SearchResponse searchResponse;
+        try {
+            searchResponse = client.search(searchRequest, 
RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return searchResponse;
+    }
+
+    private SearchResponse scrollSearchData(String scrollId) {
+        try {
+            SearchScrollRequest searchScrollRequest = new 
SearchScrollRequest();
+            searchScrollRequest.scrollId(scrollId);
+            final SearchResponse searchResponse = 
client.scroll(searchScrollRequest, RequestOptions.DEFAULT);
+            return searchResponse;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void stop() {
+        try {
+            client.close();
+            executorService.shutdown();
+        } catch (IOException e) {
+            logger.error("close RestHighLevelClient occur error", e);
+        }
+    }
+
+}
diff --git 
a/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/replicator/source/ElasticsearchReplicator.java
 
b/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/replicator/source/ElasticsearchReplicator.java
new file mode 100644
index 00000000..d9083d67
--- /dev/null
+++ 
b/connectors/rocketmq-connect-elasticsearch/src/main/java/org/apache/rocketmq/connect/elasticsearch/replicator/source/ElasticsearchReplicator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.elasticsearch.replicator.source;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.RecordOffset;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.rocketmq.connect.elasticsearch.config.ElasticsearchConfig;
+import org.elasticsearch.search.SearchHit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Owns the queue of search hits and the {@link ElasticsearchQuery} that fills it.
+ */
+public class ElasticsearchReplicator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReplicator.class);
+
+    private ElasticsearchConfig config;
+
+    private ElasticsearchQuery query;
+
+    private BlockingQueue<SearchHit> queue = new LinkedBlockingQueue<>();
+
+    /**
+     * @param config connection and index settings handed to the query
+     */
+    public ElasticsearchReplicator(ElasticsearchConfig config) {
+        this.config = config;
+    }
+
+    /**
+     * Creates a new query and runs it; the call returns once the query has finished.
+     *
+     * @param recordOffset previously stored offset, or {@code null} when there is none
+     * @param keyValue     settings passed through to the query
+     */
+    public void start(RecordOffset recordOffset, KeyValue keyValue) {
+        query = new ElasticsearchQuery(this);
+        query.start(recordOffset, keyValue);
+        LOGGER.info("ElasticsearchReplicator start succeed");
+    }
+
+    /** Stops the query if one was created; calling this before {@link #start} is a no-op. */
+    public void stop() {
+        if (query != null) {
+            query.stop();
+        }
+    }
+
+    public ElasticsearchConfig getConfig() {
+        return this.config;
+    }
+
+    /** Adds a hit to the queue. */
+    public void commit(SearchHit data) {
+        queue.add(data);
+    }
+
+    public BlockingQueue<SearchHit> getQueue() {
+        return this.queue;
+    }
+}
diff --git 
a/connectors/rocketmq-connect-elasticsearch/src/test/java/org/apache/rocketmq/connect/elasticsearch/config/ElasticsearchConfigTest.java
 
b/connectors/rocketmq-connect-elasticsearch/src/test/java/org/apache/rocketmq/connect/elasticsearch/config/ElasticsearchConfigTest.java
new file mode 100644
index 00000000..65df94bc
--- /dev/null
+++ 
b/connectors/rocketmq-connect-elasticsearch/src/test/java/org/apache/rocketmq/connect/elasticsearch/config/ElasticsearchConfigTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.elasticsearch.config;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that {@link ElasticsearchConfig#load} exposes the host, port and index it was given.
+ */
+public class ElasticsearchConfigTest {
+
+    private ElasticsearchConfig config;
+
+    @Before
+    public void before() {
+        config = new ElasticsearchConfig();
+    }
+
+    @Test
+    public void loadTest() {
+        final String expectedHost = "localhost";
+        final int expectedPort = 9200;
+        final String expectedIndex = "index";
+
+        final KeyValue properties = new DefaultKeyValue();
+        properties.put(ElasticsearchConstant.ES_HOST, expectedHost);
+        properties.put(ElasticsearchConstant.ES_PORT, expectedPort);
+        properties.put(ElasticsearchConstant.INDEX, expectedIndex);
+
+        config.load(properties);
+
+        Assert.assertEquals(expectedHost, config.getElasticsearchHost());
+        Assert.assertTrue(expectedPort == config.getElasticsearchPort());
+        Assert.assertEquals(expectedIndex, config.getIndex());
+    }
+}
diff --git 
a/connectors/rocketmq-connect-elasticsearch/src/test/java/org/apache/rocketmq/connect/elasticsearch/connector/ElasticsearchSourceConnectorTest.java
 
b/connectors/rocketmq-connect-elasticsearch/src/test/java/org/apache/rocketmq/connect/elasticsearch/connector/ElasticsearchSourceConnectorTest.java
new file mode 100644
index 00000000..fcb45bd2
--- /dev/null
+++ 
b/connectors/rocketmq-connect-elasticsearch/src/test/java/org/apache/rocketmq/connect/elasticsearch/connector/ElasticsearchSourceConnectorTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.elasticsearch.connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.elasticsearch.config.ElasticsearchConstant;
+import org.assertj.core.api.Assertions;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that {@link ElasticsearchSourceConnector#start} accepts a host/port/index configuration.
+ */
+public class ElasticsearchSourceConnectorTest {
+
+    private ElasticsearchSourceConnector sourceConnector;
+
+    @Before
+    public void before() {
+        sourceConnector = new ElasticsearchSourceConnector();
+    }
+
+    @Test
+    public void startTest() {
+        final KeyValue properties = connectorProperties();
+        Assertions.assertThatCode(() -> sourceConnector.start(properties)).doesNotThrowAnyException();
+    }
+
+    /** Builds the configuration passed to the connector under test. */
+    private static KeyValue connectorProperties() {
+        final KeyValue properties = new DefaultKeyValue();
+        properties.put(ElasticsearchConstant.ES_HOST, "localhost");
+        properties.put(ElasticsearchConstant.ES_PORT, 9200);
+        properties.put(ElasticsearchConstant.INDEX, "index");
+        return properties;
+    }
+}
diff --git 
a/connectors/rocketmq-connect-elasticsearch/src/test/java/org/apache/rocketmq/connect/elasticsearch/replicator/source/ElasticsearchQueryTest.java
 
b/connectors/rocketmq-connect-elasticsearch/src/test/java/org/apache/rocketmq/connect/elasticsearch/replicator/source/ElasticsearchQueryTest.java
new file mode 100644
index 00000000..a44fac0d
--- /dev/null
+++ 
b/connectors/rocketmq-connect-elasticsearch/src/test/java/org/apache/rocketmq/connect/elasticsearch/replicator/source/ElasticsearchQueryTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.elasticsearch.replicator.source;
+
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.elasticsearch.config.ElasticsearchConfig;
+import org.assertj.core.api.Assertions;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Exercises {@link ElasticsearchQuery#start} with no stored offset and empty settings.
+ *
+ * <p>NOTE(review): the query is pointed at localhost:9200, so this presumably needs a
+ * reachable Elasticsearch instance to pass; confirm how the build runs it.
+ */
+public class ElasticsearchQueryTest {
+
+    private ElasticsearchQuery query;
+
+    @Before
+    public void before() {
+        final ElasticsearchConfig settings = new ElasticsearchConfig();
+        settings.setElasticsearchHost("localhost");
+        settings.setElasticsearchPort(9200);
+        settings.setIndex("{\"index_connect\":{\"id\":1},\"index_connect2\":{\"id\":2}}");
+
+        final ElasticsearchReplicator owner = new ElasticsearchReplicator(settings);
+        query = new ElasticsearchQuery(owner);
+    }
+
+    @Test
+    public void startTest() {
+        Assertions.assertThatCode(() -> query.start(null, new DefaultKeyValue()))
+            .doesNotThrowAnyException();
+    }
+}


Reply via email to