This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c8590716ae [Feature][Connector-V2] Support Qdrant sink and source 
connector (#7299)
c8590716ae is described below

commit c8590716aecfaabc449c462a17c58a64e3b298bf
Author: Anush <anushshett...@gmail.com>
AuthorDate: Fri Aug 30 22:42:27 2024 +0530

    [Feature][Connector-V2] Support Qdrant sink and source connector (#7299)
---
 .github/workflows/labeler/label-scope-conf.yml     |   9 +-
 config/plugin_config                               |   1 +
 docs/en/connector-v2/sink/Qdrant.md                |  70 ++++++++
 docs/en/connector-v2/source/Qdrant.md              |  81 +++++++++
 docs/zh/connector-v2/sink/Qdrant.md                |  68 ++++++++
 docs/zh/connector-v2/source/Qdrant.md              |  79 +++++++++
 plugin-mapping.properties                          |   2 +
 seatunnel-connectors-v2/connector-qdrant/pom.xml   |  63 +++++++
 .../seatunnel/qdrant/config/QdrantConfig.java      |  50 ++++++
 .../seatunnel/qdrant/config/QdrantParameters.java  |  47 +++++
 .../qdrant/exception/QdrantConnectorException.java |  36 ++++
 .../seatunnel/qdrant/sink/QdrantBatchWriter.java   | 190 +++++++++++++++++++++
 .../seatunnel/qdrant/sink/QdrantSink.java          |  50 ++++++
 .../seatunnel/qdrant/sink/QdrantSinkFactory.java   |  55 ++++++
 .../seatunnel/qdrant/sink/QdrantSinkWriter.java    |  60 +++++++
 .../seatunnel/qdrant/source/QdrantSource.java      |  63 +++++++
 .../qdrant/source/QdrantSourceFactory.java         |  63 +++++++
 .../qdrant/source/QdrantSourceReader.java          | 181 ++++++++++++++++++++
 seatunnel-connectors-v2/pom.xml                    |   1 +
 seatunnel-dist/pom.xml                             |   7 +
 .../connector-qdrant-e2e/pom.xml                   |  68 ++++++++
 .../e2e/connector/v2/qdrant/QdrantIT.java          | 145 ++++++++++++++++
 .../src/test/resources/qdrant-to-qdrant.conf       |  51 ++++++
 seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml   |   1 +
 24 files changed, 1440 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/labeler/label-scope-conf.yml 
b/.github/workflows/labeler/label-scope-conf.yml
index a506268d39..b417d53e72 100644
--- a/.github/workflows/labeler/label-scope-conf.yml
+++ b/.github/workflows/labeler/label-scope-conf.yml
@@ -257,6 +257,13 @@ activemq:
       - changed-files:
           - any-glob-to-any-file: seatunnel-connectors-v2/connector-activemq/**
           - all-globs-to-all-files: 
'!seatunnel-connectors-v2/connector-!(activemq)/**'
+
+qdrant:
+  - all:
+      - changed-files:
+          - any-glob-to-any-file: seatunnel-connectors-v2/connector-qdrant/**
+          - all-globs-to-all-files: 
'!seatunnel-connectors-v2/connector-!(qdrant)/**'
+
 typesense:
   - all:
       - changed-files:
@@ -285,4 +292,4 @@ sls:
   - all:
       - changed-files:
           - any-glob-to-any-file: seatunnel-connectors-v2/connector-sls/**
-          - all-globs-to-all-files: 
'!seatunnel-connectors-v2/connector-!(sls)/**'
\ No newline at end of file
+          - all-globs-to-all-files: 
'!seatunnel-connectors-v2/connector-!(sls)/**'
diff --git a/config/plugin_config b/config/plugin_config
index e0dd233300..26eb4cab4a 100644
--- a/config/plugin_config
+++ b/config/plugin_config
@@ -88,5 +88,6 @@ connector-web3j
 connector-milvus
 connector-activemq
 connector-sls
+connector-qdrant
 connector-typesense
 connector-cdc-opengauss
diff --git a/docs/en/connector-v2/sink/Qdrant.md 
b/docs/en/connector-v2/sink/Qdrant.md
new file mode 100644
index 0000000000..e94598d291
--- /dev/null
+++ b/docs/en/connector-v2/sink/Qdrant.md
@@ -0,0 +1,70 @@
+# Qdrant
+
+> Qdrant Sink Connector
+
+## Description
+
+[Qdrant](https://qdrant.tech/) is a high-performance vector search engine and 
vector database.
+
+This connector can be used to write data into a Qdrant collection.
+
+## Data Type Mapping
+
+| SeaTunnel Data Type | Qdrant Data Type |
+|---------------------|------------------|
+| TINYINT             | INTEGER          |
+| SMALLINT            | INTEGER          |
+| INT                 | INTEGER          |
+| BIGINT              | INTEGER          |
+| FLOAT               | DOUBLE           |
+| DOUBLE              | DOUBLE           |
+| BOOLEAN             | BOOL             |
+| STRING              | STRING           |
+| ARRAY               | LIST             |
+| FLOAT_VECTOR        | DENSE_VECTOR     |
+| BINARY_VECTOR       | DENSE_VECTOR     |
+| FLOAT16_VECTOR      | DENSE_VECTOR     |
+| BFLOAT16_VECTOR     | DENSE_VECTOR     |
+| SPARSE_FLOAT_VECTOR | SPARSE_VECTOR    |
+
+The value of the primary key column will be used as point ID in Qdrant. If no 
primary key is present, a random UUID will be used.
+
+## Options
+
+|      name       |  type  | required | default value |
+|-----------------|--------|----------|---------------|
+| collection_name | string | yes      | -             |
+| batch_size      | int    | no       | 64            |
+| host            | string | no       | localhost     |
+| port            | int    | no       | 6334          |
+| api_key         | string | no       | -             |
+| use_tls         | bool   | no       | false         |
+| common-options  |        | no       | -             |
+
+### collection_name [string]
+
+The name of the Qdrant collection to write data to.
+
+### batch_size [int]
+
+The batch size of each upsert request to Qdrant.
+
+### host [string]
+
+The host name of the Qdrant instance. Defaults to "localhost".
+
+### port [int]
+
+The gRPC port of the Qdrant instance.
+
+### api_key [string]
+
+The API key to use for authentication if set.
+
+### use_tls [bool]
+
+Whether to use TLS(SSL) connection. Required if using Qdrant cloud(https).
+
+### common options
+
+Sink plugin common parameters, please refer to [Sink Common 
Options](../sink-common-options.md) for details.
diff --git a/docs/en/connector-v2/source/Qdrant.md 
b/docs/en/connector-v2/source/Qdrant.md
new file mode 100644
index 0000000000..c523cf1da6
--- /dev/null
+++ b/docs/en/connector-v2/source/Qdrant.md
@@ -0,0 +1,81 @@
+# Qdrant
+
+> Qdrant source connector
+
+## Description
+
+[Qdrant](https://qdrant.tech/) is a high-performance vector search engine and 
vector database.
+
+This connector can be used to read data from a Qdrant collection.
+
+## Options
+
+|      name       |  type  | required | default value |
+|-----------------|--------|----------|---------------|
+| collection_name | string | yes      | -             |
+| schema          | config | yes      | -             |
+| host            | string | no       | localhost     |
+| port            | int    | no       | 6334          |
+| api_key         | string | no       | -             |
+| use_tls         | bool   | no       | false         |
+| common-options  |        | no       | -             |
+
+### collection_name [string]
+
+The name of the Qdrant collection to read data from.
+
+### schema [config]
+
+The schema of the table to read data into.
+
+Eg:
+
+```hocon
+schema = {
+  fields {
+    age = int
+    address = string
+    some_vector = float_vector
+  }
+}
+```
+
+Each entry in Qdrant is called a point.
+
+The `float_vector` type columns are read from the vectors of each point, 
others are read from the JSON payload associated with the point.
+
+If a column is marked as primary key, the ID of the Qdrant point is written 
into it. It can be of type `"string"` or `"int"`, since Qdrant only 
[allows](https://qdrant.tech/documentation/concepts/points/#point-ids) positive 
integers and UUIDs as point IDs.
+
+If the collection was created with a single default/unnamed vector, use 
`default_vector` as the vector name.
+
+```hocon
+schema = {
+  fields {
+    age = int
+    address = string
+    default_vector = float_vector
+  }
+}
+```
+
+The ID of the point in Qdrant will be written into the column which is marked 
as the primary key. It can be of type `int` or `string`.
+
+### host [string]
+
+The host name of the Qdrant instance. Defaults to "localhost".
+
+### port [int]
+
+The gRPC port of the Qdrant instance.
+
+### api_key [string]
+
+The API key to use for authentication if set.
+
+### use_tls [bool]
+
+Whether to use TLS(SSL) connection. Required if using Qdrant cloud(https).
+
+### common options
+
+Source plugin common parameters, please refer to [Source Common 
Options](../source-common-options.md) for details.
diff --git a/docs/zh/connector-v2/sink/Qdrant.md 
b/docs/zh/connector-v2/sink/Qdrant.md
new file mode 100644
index 0000000000..7394eb8541
--- /dev/null
+++ b/docs/zh/connector-v2/sink/Qdrant.md
@@ -0,0 +1,68 @@
+# Qdrant
+
+> Qdrant 数据连接器
+
+[Qdrant](https://qdrant.tech/) 是一个高性能的向量搜索引擎和向量数据库。
+
+该连接器可用于将数据写入 Qdrant 集合。
+
+## 数据类型映射
+
+|   SeaTunnel 数据类型    |  Qdrant 数据类型  |
+|---------------------|---------------|
+| TINYINT             | INTEGER       |
+| SMALLINT            | INTEGER       |
+| INT                 | INTEGER       |
+| BIGINT              | INTEGER       |
+| FLOAT               | DOUBLE        |
+| DOUBLE              | DOUBLE        |
+| BOOLEAN             | BOOL          |
+| STRING              | STRING        |
+| ARRAY               | LIST          |
+| FLOAT_VECTOR        | DENSE_VECTOR  |
+| BINARY_VECTOR       | DENSE_VECTOR  |
+| FLOAT16_VECTOR      | DENSE_VECTOR  |
+| BFLOAT16_VECTOR     | DENSE_VECTOR  |
+| SPARSE_FLOAT_VECTOR | SPARSE_VECTOR |
+
+主键列的值将用作 Qdrant 中的点 ID。如果没有主键,则将使用随机 UUID。
+
+## 选项
+
+|       名称        |   类型   | 必填 |    默认值    |
+|-----------------|--------|----|-----------|
+| collection_name | string | 是  | -         |
+| batch_size      | int    | 否  | 64        |
+| host            | string | 否  | localhost |
+| port            | int    | 否  | 6334      |
+| api_key         | string | 否  | -         |
+| use_tls         | bool   | 否  | false     |
+| common-options  |        | 否  | -         |
+
+### collection_name [string]
+
+要写入数据的 Qdrant 集合的名称。
+
+### batch_size [int]
+
+每个 upsert 请求到 Qdrant 的批量大小。
+
+### host [string]
+
+Qdrant 实例的主机名。默认为 "localhost"。
+
+### port [int]
+
+Qdrant 实例的 gRPC 端口。
+
+### api_key [string]
+
+用于身份验证的 API 密钥(如果设置)。
+
+### use_tls [bool]
+
+是否使用 TLS(SSL)连接。如果使用 Qdrant 云(https),则需要。
+
+### 通用选项
+
+接收插件的通用参数,请参考[Sink 通用选项](../sink-common-options.md)了解详情。
diff --git a/docs/zh/connector-v2/source/Qdrant.md 
b/docs/zh/connector-v2/source/Qdrant.md
new file mode 100644
index 0000000000..140ff36a39
--- /dev/null
+++ b/docs/zh/connector-v2/source/Qdrant.md
@@ -0,0 +1,79 @@
+# Qdrant
+
+> Qdrant 数据源连接器
+
+[Qdrant](https://qdrant.tech/) 是一个高性能的向量搜索引擎和向量数据库。
+
+该连接器可用于从 Qdrant 集合中读取数据。
+
+## 选项
+
+|       名称        |   类型   | 必填 |    默认值    |
+|-----------------|--------|----|-----------|
+| collection_name | string | 是  | -         |
+| schema          | config | 是  | -         |
+| host            | string | 否  | localhost |
+| port            | int    | 否  | 6334      |
+| api_key         | string | 否  | -         |
+| use_tls         | bool   | 否  | false     |
+| common-options  |        | 否  | -         |
+
+### collection_name [string]
+
+要从中读取数据的 Qdrant 集合的名称。
+
+### schema [config]
+
+要将数据读取到的表的模式。
+
+例如:
+
+```hocon
+schema = {
+  fields {
+    age = int
+    address = string
+    some_vector = float_vector
+  }
+}
+```
+
+Qdrant 中的每个条目称为一个点。
+
+`float_vector` 类型的列从每个点的向量中读取,其他列从与该点关联的 JSON 有效负载中读取。
+
+如果列被标记为主键,Qdrant 点的 ID 将写入其中。它可以是 `"string"` 或 `"int"` 类型,因为 Qdrant 
仅[允许](https://qdrant.tech/documentation/concepts/points/#point-ids)使用正整数和 UUID 
作为点 ID。
+
+如果集合是用单个默认/未命名向量创建的,请使用 `default_vector` 作为向量名称。
+
+```hocon
+schema = {
+  fields {
+    age = int
+    address = string
+    default_vector = float_vector
+  }
+}
+```
+
+Qdrant 中点的 ID 将写入标记为主键的列中。它可以是 `int` 或 `string` 类型。
+
+### host [string]
+
+Qdrant 实例的主机名。默认为 "localhost"。
+
+### port [int]
+
+Qdrant 实例的 gRPC 端口。
+
+### api_key [string]
+
+用于身份验证的 API 密钥(如果设置)。
+
+### use_tls [bool]
+
+是否使用 TLS(SSL)连接。如果使用 Qdrant 云(https),则需要。
+
+### 通用选项
+
+源插件的通用参数,请参考[源通用选项](../source-common-options.md)了解详情。
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 06a01ec04b..1ddedd5ea8 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -131,6 +131,8 @@ seatunnel.sink.ObsFile = connector-file-obs
 seatunnel.source.Milvus = connector-milvus
 seatunnel.sink.Milvus = connector-milvus
 seatunnel.sink.ActiveMQ = connector-activemq
+seatunnel.source.Qdrant = connector-qdrant
+seatunnel.sink.Qdrant = connector-qdrant
 seatunnel.source.Sls = connector-sls
 seatunnel.source.Typesense = connector-typesense
 seatunnel.sink.Typesense = connector-typesense
diff --git a/seatunnel-connectors-v2/connector-qdrant/pom.xml 
b/seatunnel-connectors-v2/connector-qdrant/pom.xml
new file mode 100644
index 0000000000..686f0bdb7a
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-qdrant/pom.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connectors-v2</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-qdrant</artifactId>
+    <name>SeaTunnel : Connectors V2 : Qdrant</name>
+
+    <properties>
+        <connector.name>connector.qdrant</connector.name>
+    </properties>
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-protobuf</artifactId>
+            <version>1.65.1</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-guava</artifactId>
+            <version>${project.version}</version>
+            <classifier>optional</classifier>
+        </dependency>
+
+        <dependency>
+            <groupId>io.qdrant</groupId>
+            <artifactId>client</artifactId>
+            <version>1.11.0</version>
+        </dependency>
+
+    </dependencies>
+</project>
diff --git 
a/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/config/QdrantConfig.java
 
b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/config/QdrantConfig.java
new file mode 100644
index 0000000000..1be03a1311
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/config/QdrantConfig.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.qdrant.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+/**
+ * Option definitions for the Qdrant connector.
+ *
+ * <p>Each constant declares the configuration key, its value type, its default (if any) and a
+ * short description.
+ */
+public class QdrantConfig {
+
+    /** Plugin name of the connector. */
+    public static final String CONNECTOR_IDENTITY = "Qdrant";
+
+    /** Key {@code collection_name}: string, no default value. */
+    public static final Option<String> COLLECTION_NAME =
+            Options.key("collection_name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Qdrant collection name");
+
+    /** Key {@code host}: string, defaults to {@code localhost}. */
+    public static final Option<String> HOST =
+            Options.key("host")
+                    .stringType()
+                    .defaultValue("localhost")
+                    .withDescription("Qdrant gRPC host");
+
+    /** Key {@code port}: int, defaults to {@code 6334}. */
+    public static final Option<Integer> PORT =
+            Options.key("port")
+                    .intType()
+                    .defaultValue(6334)
+                    .withDescription("Qdrant gRPC port");
+
+    /** Key {@code use_tls}: boolean, defaults to {@code false}. */
+    public static final Option<Boolean> USE_TLS =
+            Options.key("use_tls")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether to use TLS");
+
+    /** Key {@code api_key}: string, defaults to the empty string. */
+    public static final Option<String> API_KEY =
+            Options.key("api_key")
+                    .stringType()
+                    .defaultValue("")
+                    .withDescription("Qdrant API key");
+}
diff --git 
a/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/config/QdrantParameters.java
 
b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/config/QdrantParameters.java
new file mode 100644
index 0000000000..1ae612fafb
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/config/QdrantParameters.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.qdrant.config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import io.qdrant.client.QdrantClient;
+import io.qdrant.client.QdrantGrpcClient;
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+public class QdrantParameters implements Serializable {
+    private String host;
+    private int port;
+    private String apiKey;
+    private String collectionName;
+    private boolean useTls;
+
+    public QdrantParameters(ReadonlyConfig config) {
+        this.host = config.get(QdrantConfig.HOST);
+        this.port = config.get(QdrantConfig.PORT);
+        this.apiKey = config.get(QdrantConfig.API_KEY);
+        this.collectionName = config.get(QdrantConfig.COLLECTION_NAME);
+        this.useTls = config.get(QdrantConfig.USE_TLS);
+    }
+
+    public QdrantClient buildQdrantClient() {
+        return new QdrantClient(QdrantGrpcClient.newBuilder(host, port, 
useTls).build());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/exception/QdrantConnectorException.java
 
b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/exception/QdrantConnectorException.java
new file mode 100644
index 0000000000..becf31abf5
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/exception/QdrantConnectorException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.qdrant.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+/**
+ * Runtime exception raised by the Qdrant connector.
+ *
+ * <p>Every constructor simply forwards its arguments to {@link SeaTunnelRuntimeException}.
+ */
+public class QdrantConnectorException extends SeaTunnelRuntimeException {
+
+    /**
+     * @param errorCode error code describing the failure category
+     * @param cause underlying exception
+     */
+    public QdrantConnectorException(SeaTunnelErrorCode errorCode, Throwable cause) {
+        super(errorCode, cause);
+    }
+
+    /**
+     * @param errorCode error code describing the failure category
+     * @param message human-readable detail message
+     */
+    public QdrantConnectorException(SeaTunnelErrorCode errorCode, String message) {
+        super(errorCode, message);
+    }
+
+    /**
+     * @param errorCode error code describing the failure category
+     * @param message human-readable detail message
+     * @param cause underlying exception
+     */
+    public QdrantConnectorException(
+            SeaTunnelErrorCode errorCode, String message, Throwable cause) {
+        super(errorCode, message, cause);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/sink/QdrantBatchWriter.java
 
b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/sink/QdrantBatchWriter.java
new file mode 100644
index 0000000000..7ca4428c81
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/sink/QdrantBatchWriter.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.qdrant.sink;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.utils.BufferUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.qdrant.config.QdrantParameters;
+import 
org.apache.seatunnel.connectors.seatunnel.qdrant.exception.QdrantConnectorException;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import io.qdrant.client.QdrantClient;
+import io.qdrant.client.ValueFactory;
+import io.qdrant.client.VectorFactory;
+import io.qdrant.client.grpc.JsonWithInt;
+import io.qdrant.client.grpc.Points;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static io.qdrant.client.PointIdFactory.id;
+import static 
org.apache.seatunnel.api.table.catalog.PrimaryKey.isPrimaryKeyField;
+
+public class QdrantBatchWriter {
+
+    private final int batchSize;
+    private final CatalogTable catalogTable;
+    private final String collectionName;
+    private final QdrantClient qdrantClient;
+
+    private final List<Points.PointStruct> qdrantDataCache;
+    private volatile int writeCount = 0;
+
+    public QdrantBatchWriter(
+            CatalogTable catalogTable, Integer batchSize, QdrantParameters 
params) {
+        this.catalogTable = catalogTable;
+        this.qdrantClient = params.buildQdrantClient();
+        this.collectionName = params.getCollectionName();
+        this.batchSize = batchSize;
+        this.qdrantDataCache = new ArrayList<>(batchSize);
+    }
+
+    public void addToBatch(SeaTunnelRow element) {
+        Points.PointStruct point = buildPoint(element);
+        qdrantDataCache.add(point);
+        writeCount++;
+    }
+
+    public boolean needFlush() {
+        return this.writeCount >= this.batchSize;
+    }
+
+    public synchronized void flush() {
+        if (CollectionUtils.isEmpty(this.qdrantDataCache)) {
+            return;
+        }
+        upsert();
+        this.qdrantDataCache.clear();
+        this.writeCount = 0;
+    }
+
+    public void close() {
+        this.qdrantClient.close();
+    }
+
+    private Points.PointStruct buildPoint(SeaTunnelRow element) {
+        SeaTunnelRowType seaTunnelRowType = catalogTable.getSeaTunnelRowType();
+        PrimaryKey primaryKey = catalogTable.getTableSchema().getPrimaryKey();
+
+        Points.PointStruct.Builder point = Points.PointStruct.newBuilder();
+        Points.NamedVectors.Builder namedVectors = 
Points.NamedVectors.newBuilder();
+        for (int i = 0; i < seaTunnelRowType.getFieldNames().length; i++) {
+            String fieldName = seaTunnelRowType.getFieldNames()[i];
+            SeaTunnelDataType<?> fieldType = seaTunnelRowType.getFieldType(i);
+            Object value = element.getField(i);
+
+            if (isPrimaryKeyField(primaryKey, fieldName)) {
+                point.setId(pointId(fieldType, value));
+                continue;
+            }
+
+            JsonWithInt.Value payloadValue = buildPayload(fieldType, value);
+            if (payloadValue != null) {
+                point.putPayload(fieldName, payloadValue);
+                continue;
+            }
+
+            Points.Vector vector = buildVector(fieldType, value);
+            if (vector != null) {
+                namedVectors.putVectors(fieldName, vector);
+            }
+        }
+
+        if (!point.hasId()) {
+            point.setId(id(UUID.randomUUID()));
+        }
+
+        
point.setVectors(Points.Vectors.newBuilder().setVectors(namedVectors).build());
+        return point.build();
+    }
+
+    private void upsert() {
+        try {
+            qdrantClient
+                    .upsertAsync(
+                            Points.UpsertPoints.newBuilder()
+                                    .setCollectionName(collectionName)
+                                    .addAllPoints(qdrantDataCache)
+                                    .build())
+                    .get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException("Upsert failed", e);
+        }
+    }
+
+    public static Points.PointId pointId(SeaTunnelDataType<?> fieldType, 
Object value) {
+        SqlType sqlType = fieldType.getSqlType();
+        switch (sqlType) {
+            case INT:
+                return id(Integer.parseInt(value.toString()));
+            case STRING:
+                return id(UUID.fromString(value.toString()));
+            default:
+                throw new QdrantConnectorException(
+                        CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                        "Unexpected value type for point ID: " + 
sqlType.name());
+        }
+    }
+
+    public static JsonWithInt.Value buildPayload(SeaTunnelDataType<?> 
fieldType, Object value) {
+        SqlType sqlType = fieldType.getSqlType();
+        switch (sqlType) {
+            case SMALLINT:
+            case INT:
+            case BIGINT:
+                return ValueFactory.value(Integer.parseInt(value.toString()));
+            case FLOAT:
+            case DOUBLE:
+                return ValueFactory.value(Long.parseLong(value.toString()));
+            case STRING:
+            case DATE:
+                return ValueFactory.value(value.toString());
+            case BOOLEAN:
+                return 
ValueFactory.value(Boolean.parseBoolean(value.toString()));
+            default:
+                return null;
+        }
+    }
+
+    public static Points.Vector buildVector(SeaTunnelDataType<?> fieldType, 
Object value) {
+        SqlType sqlType = fieldType.getSqlType();
+        switch (sqlType) {
+            case FLOAT_VECTOR:
+            case FLOAT16_VECTOR:
+            case BFLOAT16_VECTOR:
+            case BINARY_VECTOR:
+                ByteBuffer floatVectorBuffer = (ByteBuffer) value;
+                Float[] floats = BufferUtils.toFloatArray(floatVectorBuffer);
+                return 
VectorFactory.vector(Arrays.stream(floats).collect(Collectors.toList()));
+            default:
+                return null;
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/sink/QdrantSink.java
 
b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/sink/QdrantSink.java
new file mode 100644
index 0000000000..85119032c8
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/sink/QdrantSink.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.qdrant.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.qdrant.config.QdrantConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.qdrant.config.QdrantParameters;
+
+import java.io.IOException;
+
+/**
+ * Qdrant sink entry point.
+ *
+ * <p>Parses the connector options into {@link QdrantParameters} once and hands them, together
+ * with the upstream catalog table, to every {@link QdrantSinkWriter} it creates.
+ */
+public class QdrantSink extends AbstractSimpleSink<SeaTunnelRow, Void>
+        implements SupportMultiTableSink {
+    private final QdrantParameters qdrantParameters;
+    private final CatalogTable catalogTable;
+
+    /**
+     * @param config connector options used to build the {@link QdrantParameters}
+     * @param table catalog table passed on to the writers
+     */
+    public QdrantSink(ReadonlyConfig config, CatalogTable table) {
+        this.catalogTable = table;
+        this.qdrantParameters = new QdrantParameters(config);
+    }
+
+    /** Creates a writer bound to this sink's catalog table and parameters. */
+    @Override
+    public QdrantSinkWriter createWriter(SinkWriter.Context context) throws IOException {
+        return new QdrantSinkWriter(catalogTable, qdrantParameters);
+    }
+
+    /** Returns the connector identity declared in {@link QdrantConfig}. */
+    @Override
+    public String getPluginName() {
+        return QdrantConfig.CONNECTOR_IDENTITY;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/sink/QdrantSinkFactory.java
 
b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/sink/QdrantSinkFactory.java
new file mode 100644
index 0000000000..a7ed5599d5
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/sink/QdrantSinkFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.qdrant.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableSink;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.qdrant.config.QdrantConfig;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * Factory that registers the Qdrant sink under {@link QdrantConfig#CONNECTOR_IDENTITY} and
+ * declares the options it accepts.
+ */
+@AutoService(Factory.class)
+public class QdrantSinkFactory implements TableSinkFactory {
+    @Override
+    public String factoryIdentifier() {
+        return QdrantConfig.CONNECTOR_IDENTITY;
+    }
+
+    /** Builds a {@link QdrantSink} from the context's options and catalog table. */
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        CatalogTable catalogTable = context.getCatalogTable();
+        return () -> new QdrantSink(context.getOptions(), catalogTable);
+    }
+
+    /**
+     * Declares the sink options.
+     *
+     * <p>{@code collection_name} is required, matching the source factory, so a job that omits
+     * it is rejected during option validation.
+     * NOTE(review): assumes the sink reads the collection name via QdrantParameters - confirm.
+     */
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(QdrantConfig.COLLECTION_NAME)
+                .optional(
+                        QdrantConfig.HOST,
+                        QdrantConfig.PORT,
+                        QdrantConfig.API_KEY,
+                        QdrantConfig.USE_TLS,
+                        SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
+                .build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/sink/QdrantSinkWriter.java
 
b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/sink/QdrantSinkWriter.java
new file mode 100644
index 0000000000..a0e00838b6
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/sink/QdrantSinkWriter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.qdrant.sink;
+
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import 
org.apache.seatunnel.connectors.seatunnel.qdrant.config.QdrantParameters;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Sink writer that buffers rows in a {@link QdrantBatchWriter} and flushes them when the batch
+ * writer reports that a flush is needed, on every {@link #prepareCommit()}, and on close.
+ */
+public class QdrantSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
+        implements SupportMultiTableSinkWriter<Void> {
+
+    /** Batch size handed to the {@link QdrantBatchWriter}. */
+    private static final int DEFAULT_BATCH_SIZE = 64;
+
+    private final QdrantBatchWriter batchWriter;
+
+    /**
+     * @param catalog catalog table passed to the batch writer
+     * @param qdrantParameters connection and collection settings passed to the batch writer
+     */
+    public QdrantSinkWriter(CatalogTable catalog, QdrantParameters qdrantParameters) {
+        this.batchWriter = new QdrantBatchWriter(catalog, DEFAULT_BATCH_SIZE, qdrantParameters);
+    }
+
+    /** Adds the row to the current batch and flushes once the batch writer asks for it. */
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        batchWriter.addToBatch(element);
+        if (batchWriter.needFlush()) {
+            batchWriter.flush();
+        }
+    }
+
+    /** Flushes buffered rows; this writer produces no commit information. */
+    @Override
+    public Optional<Void> prepareCommit() {
+        batchWriter.flush();
+        return Optional.empty();
+    }
+
+    /** Flushes remaining rows and always closes the batch writer, even if the flush fails. */
+    @Override
+    public void close() throws IOException {
+        try {
+            batchWriter.flush();
+        } finally {
+            // Close in finally so a failed final flush does not leak the batch writer.
+            batchWriter.close();
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/source/QdrantSource.java
 
b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/source/QdrantSource.java
new file mode 100644
index 0000000000..39aeb3a879
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/source/QdrantSource.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.qdrant.source;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import org.apache.seatunnel.connectors.seatunnel.qdrant.config.QdrantConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.qdrant.config.QdrantParameters;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Bounded, single-split Qdrant source.
+ *
+ * <p>The produced catalog table is built from the connector configuration, and each reader is
+ * given the parsed {@link QdrantParameters} together with that table.
+ */
+public class QdrantSource extends AbstractSingleSplitSource<SeaTunnelRow> {
+    private final QdrantParameters qdrantParameters;
+    private final CatalogTable catalogTable;
+
+    /**
+     * @param readonlyConfig connector options; used for both the parameters and the catalog table
+     */
+    public QdrantSource(ReadonlyConfig readonlyConfig) {
+        this.qdrantParameters = new QdrantParameters(readonlyConfig);
+        this.catalogTable = CatalogTableUtil.buildWithConfig(readonlyConfig);
+    }
+
+    /** Returns the connector identity declared in {@link QdrantConfig}. */
+    @Override
+    public String getPluginName() {
+        return QdrantConfig.CONNECTOR_IDENTITY;
+    }
+
+    /** This source always reports itself as bounded. */
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    /** Exposes the single catalog table built from the configuration. */
+    @Override
+    public List<CatalogTable> getProducedCatalogTables() {
+        return Collections.singletonList(catalogTable);
+    }
+
+    /** Creates the reader for the single split. */
+    @Override
+    public AbstractSingleSplitReader<SeaTunnelRow> createReader(
+            SingleSplitReaderContext readerContext) {
+        return new QdrantSourceReader(qdrantParameters, readerContext, catalogTable);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/source/QdrantSourceFactory.java
 
b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/source/QdrantSourceFactory.java
new file mode 100644
index 0000000000..0639fc07e2
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/source/QdrantSourceFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.qdrant.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.qdrant.config.QdrantConfig;
+
+import com.google.auto.service.AutoService;
+
+import java.io.Serializable;
+
+/**
+ * Factory that registers the Qdrant source under {@link QdrantConfig#CONNECTOR_IDENTITY} and
+ * declares the options it accepts.
+ */
+@AutoService(Factory.class)
+public class QdrantSourceFactory implements TableSourceFactory {
+    @Override
+    public String factoryIdentifier() {
+        return QdrantConfig.CONNECTOR_IDENTITY;
+    }
+
+    /** The source implementation produced by this factory. */
+    @Override
+    public Class<? extends SeaTunnelSource> getSourceClass() {
+        return QdrantSource.class;
+    }
+
+    /** Requires the collection name and a schema; the connection options are optional. */
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(QdrantConfig.COLLECTION_NAME, TableSchemaOptions.SCHEMA)
+                .optional(
+                        QdrantConfig.HOST,
+                        QdrantConfig.PORT,
+                        QdrantConfig.API_KEY,
+                        QdrantConfig.USE_TLS)
+                .build();
+    }
+
+    /** Returns a table source that builds a {@link QdrantSource} from the context's options. */
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
+        // The source is constructed when the returned TableSource is invoked, not here.
+        return () -> (SeaTunnelSource<T, SplitT, StateT>) new QdrantSource(context.getOptions());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/source/QdrantSourceReader.java
 
b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/source/QdrantSourceReader.java
new file mode 100644
index 0000000000..2c37163129
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/source/QdrantSourceReader.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.qdrant.source;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.utils.BufferUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import 
org.apache.seatunnel.connectors.seatunnel.qdrant.config.QdrantParameters;
+import 
org.apache.seatunnel.connectors.seatunnel.qdrant.exception.QdrantConnectorException;
+
+import io.qdrant.client.QdrantClient;
+import io.qdrant.client.WithVectorsSelectorFactory;
+import io.qdrant.client.grpc.JsonWithInt;
+import io.qdrant.client.grpc.Points;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static io.qdrant.client.WithPayloadSelectorFactory.enable;
+import static 
org.apache.seatunnel.api.table.catalog.PrimaryKey.isPrimaryKeyField;
+
+/**
+ * Single-split reader that scrolls through a Qdrant collection page by page and emits one
+ * {@link SeaTunnelRow} per retrieved point.
+ *
+ * <p>Primary-key fields are filled from the point id, vector-typed fields from the point's
+ * vectors, and every other field from the payload entry with the same name. A field that is
+ * absent from a point's payload or vectors is emitted as {@code null}.
+ */
+public class QdrantSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
+    /** Number of points requested per scroll page. */
+    private static final int SCROLL_SIZE = 64;
+
+    /** Field name under which a point's single unnamed vector is looked up. */
+    private static final String DEFAULT_VECTOR_KEY = "default_vector";
+
+    private final QdrantParameters qdrantParameters;
+    private final SingleSplitReaderContext context;
+    private final TableSchema tableSchema;
+    private final TablePath tablePath;
+    /** Physical row type, computed once because it is the same for every row. */
+    private final SeaTunnelRowType rowType;
+
+    private QdrantClient qdrantClient;
+
+    public QdrantSourceReader(
+            QdrantParameters qdrantParameters,
+            SingleSplitReaderContext context,
+            CatalogTable catalogTable) {
+        this.qdrantParameters = qdrantParameters;
+        this.context = context;
+        this.tableSchema = catalogTable.getTableSchema();
+        this.tablePath = catalogTable.getTablePath();
+        this.rowType = tableSchema.toPhysicalRowDataType();
+    }
+
+    /** Builds the client and blocks on a health check so connection problems surface early. */
+    @Override
+    public void open() throws Exception {
+        qdrantClient = qdrantParameters.buildQdrantClient();
+        qdrantClient.healthCheckAsync().get();
+    }
+
+    @Override
+    public void close() {
+        if (Objects.nonNull(qdrantClient)) {
+            qdrantClient.close();
+        }
+    }
+
+    /**
+     * Reads the whole collection in one call: scrolls until the response carries no next-page
+     * offset, then signals that there are no more elements.
+     */
+    @Override
+    public void internalPollNext(Collector<SeaTunnelRow> output) throws Exception {
+        Points.ScrollPoints request =
+                Points.ScrollPoints.newBuilder()
+                        .setCollectionName(qdrantParameters.getCollectionName())
+                        .setLimit(SCROLL_SIZE)
+                        .setWithPayload(enable(true))
+                        .setWithVectors(WithVectorsSelectorFactory.enable(true))
+                        .build();
+
+        while (true) {
+            Points.ScrollResponse response = qdrantClient.scrollAsync(request).get();
+            List<Points.RetrievedPoint> points = response.getResultList();
+
+            for (Points.RetrievedPoint point : points) {
+                output.collect(convertToSeaTunnelRow(point));
+            }
+
+            Points.PointId offset = response.getNextPageOffset();
+
+            // An offset with neither a numeric nor a UUID id means this was the last page.
+            if (!offset.hasNum() && !offset.hasUuid()) {
+                break;
+            }
+
+            request = request.toBuilder().setOffset(offset).build();
+        }
+
+        context.signalNoMoreElement();
+    }
+
+    /** Maps one retrieved point onto the configured row type. */
+    private SeaTunnelRow convertToSeaTunnelRow(Points.RetrievedPoint point) {
+        PrimaryKey primaryKey = tableSchema.getPrimaryKey();
+        Map<String, JsonWithInt.Value> payloadMap = point.getPayloadMap();
+        Map<String, Points.Vector> vectorsMap = collectVectors(point.getVectors());
+
+        int fieldCount = rowType.getTotalFields();
+        String[] fieldNames = rowType.getFieldNames();
+        Object[] fields = new Object[fieldCount];
+        for (int fieldIndex = 0; fieldIndex < fieldCount; fieldIndex++) {
+            String fieldName = fieldNames[fieldIndex];
+
+            if (isPrimaryKeyField(primaryKey, fieldName)) {
+                Points.PointId id = point.getId();
+                if (id.hasNum()) {
+                    fields[fieldIndex] = id.getNum();
+                } else if (id.hasUuid()) {
+                    fields[fieldIndex] = id.getUuid();
+                }
+                continue;
+            }
+
+            fields[fieldIndex] =
+                    convertField(
+                            rowType.getFieldType(fieldIndex),
+                            payloadMap.get(fieldName),
+                            vectorsMap.get(fieldName));
+        }
+
+        SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
+        seaTunnelRow.setTableId(tablePath.getFullName());
+        seaTunnelRow.setRowKind(RowKind.INSERT);
+        return seaTunnelRow;
+    }
+
+    /** Collects a point's vectors by name; a single unnamed vector uses the default key. */
+    private static Map<String, Points.Vector> collectVectors(Points.Vectors vectors) {
+        if (vectors.hasVectors()) {
+            return vectors.getVectors().getVectorsMap();
+        }
+        Map<String, Points.Vector> vectorsMap = new HashMap<>();
+        if (vectors.hasVector()) {
+            vectorsMap.put(DEFAULT_VECTOR_KEY, vectors.getVector());
+        }
+        return vectorsMap;
+    }
+
+    /**
+     * Converts one payload value or vector into the Java object for the given field type.
+     *
+     * @param fieldType declared SeaTunnel type of the field
+     * @param value payload entry for the field, or {@code null} if the point has none
+     * @param vector vector for the field, or {@code null} if the point has none
+     * @return the converted value, or {@code null} when the point does not carry the field
+     * @throws QdrantConnectorException if the field type is not supported
+     * @throws ArithmeticException if an integer payload does not fit the declared type
+     */
+    private static Object convertField(
+            SeaTunnelDataType<?> fieldType, JsonWithInt.Value value, Points.Vector vector) {
+        switch (fieldType.getSqlType()) {
+            case NULL:
+                return null;
+            case STRING:
+                return value == null ? null : value.getStringValue();
+            case BOOLEAN:
+                return value == null ? null : value.getBoolValue();
+            case TINYINT:
+                return value == null
+                        ? null
+                        : (byte)
+                                requireInRange(
+                                        value.getIntegerValue(), Byte.MIN_VALUE, Byte.MAX_VALUE);
+            case SMALLINT:
+                return value == null
+                        ? null
+                        : (short)
+                                requireInRange(
+                                        value.getIntegerValue(), Short.MIN_VALUE, Short.MAX_VALUE);
+            case INT:
+                return value == null
+                        ? null
+                        : (int)
+                                requireInRange(
+                                        value.getIntegerValue(),
+                                        Integer.MIN_VALUE,
+                                        Integer.MAX_VALUE);
+            case BIGINT:
+                return value == null ? null : value.getIntegerValue();
+            case FLOAT:
+                return value == null ? null : (float) value.getDoubleValue();
+            case DOUBLE:
+                return value == null ? null : value.getDoubleValue();
+            case DECIMAL:
+                // Fully qualified to avoid adding an import to the file.
+                return value == null
+                        ? null
+                        : java.math.BigDecimal.valueOf(value.getDoubleValue());
+            case BINARY_VECTOR:
+            case FLOAT_VECTOR:
+            case FLOAT16_VECTOR:
+            case BFLOAT16_VECTOR:
+                if (vector == null) {
+                    return null;
+                }
+                List<Float> list = vector.getDataList();
+                return BufferUtils.toByteBuffer(list.toArray(new Float[0]));
+            default:
+                throw new QdrantConnectorException(
+                        CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                        "Unexpected value: " + fieldType.getSqlType().name());
+        }
+    }
+
+    /** Returns {@code raw} unchanged, rejecting values that a narrowing cast would corrupt. */
+    private static long requireInRange(long raw, long min, long max) {
+        if (raw < min || raw > max) {
+            throw new ArithmeticException(
+                    "Qdrant integer payload " + raw + " is outside [" + min + ", " + max + "]");
+        }
+        return raw;
+    }
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index 669adb8905..cf7314e619 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -79,6 +79,7 @@
         <module>connector-web3j</module>
         <module>connector-milvus</module>
         <module>connector-activemq</module>
+        <module>connector-qdrant</module>
         <module>connector-sls</module>
         <module>connector-typesense</module>
     </modules>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index a633b64a1d..584224c8fd 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -598,6 +598,13 @@
                     <scope>provided</scope>
                 </dependency>
 
+                <dependency>
+                    <groupId>org.apache.seatunnel</groupId>
+                    <artifactId>connector-qdrant</artifactId>
+                    <version>${project.version}</version>
+                    <scope>provided</scope>
+                </dependency>
+
                 <!-- jdbc driver -->
                 <dependency>
                     <groupId>com.aliyun.phoenix</groupId>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-qdrant-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-qdrant-e2e/pom.xml
new file mode 100644
index 0000000000..042e528c5e
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-qdrant-e2e/pom.xml
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-qdrant-e2e</artifactId>
+    <name>SeaTunnel : E2E : Connector V2 : Qdrant</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-guava</artifactId>
+            <version>${project.version}</version>
+            <classifier>optional</classifier>
+        </dependency>
+
+        <dependency>
+            <groupId>io.qdrant</groupId>
+            <artifactId>client</artifactId>
+            <version>1.11.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-qdrant</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>qdrant</artifactId>
+            <version>1.20.1</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-fake</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-qdrant-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/qdrant/QdrantIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-qdrant-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/qdrant/QdrantIT.java
new file mode 100644
index 0000000000..21854a12ca
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-qdrant-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/qdrant/QdrantIT.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.v2.qdrant;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.qdrant.QdrantContainer;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+
+import io.qdrant.client.QdrantClient;
+import io.qdrant.client.QdrantGrpcClient;
+import io.qdrant.client.grpc.Collections;
+import io.qdrant.client.grpc.Points;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static io.qdrant.client.PointIdFactory.id;
+import static io.qdrant.client.ValueFactory.value;
+import static io.qdrant.client.VectorFactory.vector;
+import static io.qdrant.client.VectorsFactory.namedVectors;
+
+/**
+ * End-to-end test that copies points from one Qdrant collection to another through a SeaTunnel
+ * job and checks that all ten seeded points arrive in the sink collection.
+ */
+@Slf4j
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK, EngineType.FLINK},
+        disabledReason = "SPARK and FLINK do not support vector types yet")
+public class QdrantIT extends TestSuiteBase implements TestResource {
+
+    private static final String ALIAS = "qdrante2e";
+    private static final String SOURCE_COLLECTION = "source_collection";
+    private static final String SINK_COLLECTION = "sink_collection";
+    private static final String IMAGE = "qdrant/qdrant:latest";
+    private QdrantContainer container;
+    private QdrantClient qdrantClient;
+
+    /** Starts Qdrant, waits until it answers a health check, then seeds the source data. */
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        this.container = new QdrantContainer(IMAGE).withNetwork(NETWORK).withNetworkAliases(ALIAS);
+        Startables.deepStart(Stream.of(this.container)).join();
+        this.initQdrant();
+        // Without a terminal until...() call Awaitility does not wait at all, so poll the
+        // health check until it stops throwing.
+        Awaitility.given()
+                .ignoreExceptions()
+                .await()
+                .atMost(10L, TimeUnit.SECONDS)
+                .untilAsserted(() -> qdrantClient.healthCheckAsync().get());
+        this.initSourceData();
+    }
+
+    private void initQdrant() {
+        qdrantClient =
+                new QdrantClient(
+                        QdrantGrpcClient.newBuilder(
+                                        container.getHost(), container.getGrpcPort(), false)
+                                .build());
+    }
+
+    /** Creates the source and sink collections and upserts ten points into the source. */
+    private void initSourceData() throws Exception {
+        qdrantClient
+                .createCollectionAsync(
+                        SOURCE_COLLECTION,
+                        ImmutableMap.of(
+                                "my_vector",
+                                Collections.VectorParams.newBuilder()
+                                        .setSize(4)
+                                        .setDistance(Collections.Distance.Cosine)
+                                        .build()))
+                .get();
+
+        qdrantClient
+                .createCollectionAsync(
+                        SINK_COLLECTION,
+                        ImmutableMap.of(
+                                "my_vector",
+                                Collections.VectorParams.newBuilder()
+                                        .setSize(4)
+                                        .setDistance(Collections.Distance.Cosine)
+                                        .build()))
+                .get();
+
+        List<Points.PointStruct> points = new ArrayList<>();
+        for (int i = 1; i <= 10; i++) {
+            Points.PointStruct.Builder pointStruct = Points.PointStruct.newBuilder();
+            pointStruct.setId(id(i));
+            List<Float> floats = Arrays.asList((float) i, (float) i, (float) i, (float) i);
+            pointStruct.setVectors(namedVectors(ImmutableMap.of("my_vector", vector(floats))));
+
+            pointStruct.putPayload("file_size", value(i));
+            pointStruct.putPayload("file_name", value("file-name-" + i));
+
+            points.add(pointStruct.build());
+        }
+
+        qdrantClient
+                .upsertAsync(
+                        Points.UpsertPoints.newBuilder()
+                                .setCollectionName(SOURCE_COLLECTION)
+                                .addAllPoints(points)
+                                .build())
+                .get();
+    }
+
+    /** Releases the client and the container; tolerates a start-up that failed part-way. */
+    @AfterAll
+    @Override
+    public void tearDown() {
+        if (this.qdrantClient != null) {
+            this.qdrantClient.close();
+        }
+        if (this.container != null) {
+            this.container.stop();
+        }
+    }
+
+    @TestTemplate
+    public void testQdrant(TestContainer container)
+            throws IOException, InterruptedException, ExecutionException {
+        Container.ExecResult execResult = container.executeJob("/qdrant-to-qdrant.conf");
+        // JUnit expects (expected, actual); the job's stderr is attached to ease diagnosis.
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+        Assertions.assertEquals(10L, qdrantClient.countAsync(SINK_COLLECTION).get().longValue());
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-qdrant-e2e/src/test/resources/qdrant-to-qdrant.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-qdrant-e2e/src/test/resources/qdrant-to-qdrant.conf
new file mode 100644
index 0000000000..8fa4c4f1da
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-qdrant-e2e/src/test/resources/qdrant-to-qdrant.conf
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {  # Job-level settings: parallelism 1, batch job mode
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {  # Reads points from the Qdrant collection "source_collection" on host "qdrante2e"
+  Qdrant {
+    collection_name = "source_collection"
+    host = "qdrante2e"
+    schema = {  # Column names match the payload keys and the named vector written by the test set-up
+              columns = [
+                 {  # string payload "file_name"
+                    name = file_name
+                    type = string
+                 }
+                 {  # integer payload "file_size"
+                    name = file_size
+                    type = int
+                 }
+                 {  # named vector "my_vector" (four floats per point in the test data)
+                    name = my_vector
+                    type = float_vector
+                 }
+             ]
+    }
+  }
+}
+
+sink {  # Writes to "sink_collection" on the same host; presumably the collection the test counts via SINK_COLLECTION - confirm
+  Qdrant {
+    collection_name = "sink_collection"
+    host = "qdrante2e"
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index b0c224219b..28be63f3cf 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -76,6 +76,7 @@
         <module>connector-hudi-e2e</module>
         <module>connector-milvus-e2e</module>
         <module>connector-activemq-e2e</module>
+        <module>connector-qdrant-e2e</module>
         <module>connector-sls-e2e</module>
         <module>connector-typesense-e2e</module>
         <module>connector-email-e2e</module>

Reply via email to