This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new c8590716ae [Feature][Connector-V2] Support Qdrant sink and source connector (#7299) c8590716ae is described below commit c8590716aecfaabc449c462a17c58a64e3b298bf Author: Anush <anushshett...@gmail.com> AuthorDate: Fri Aug 30 22:42:27 2024 +0530 [Feature][Connector-V2] Support Qdrant sink and source connector (#7299) --- .github/workflows/labeler/label-scope-conf.yml | 9 +- config/plugin_config | 1 + docs/en/connector-v2/sink/Qdrant.md | 70 ++++++++ docs/en/connector-v2/source/Qdrant.md | 81 +++++++++ docs/zh/connector-v2/sink/Qdrant.md | 68 ++++++++ docs/zh/connector-v2/source/Qdrant.md | 79 +++++++++ plugin-mapping.properties | 2 + seatunnel-connectors-v2/connector-qdrant/pom.xml | 63 +++++++ .../seatunnel/qdrant/config/QdrantConfig.java | 50 ++++++ .../seatunnel/qdrant/config/QdrantParameters.java | 47 +++++ .../qdrant/exception/QdrantConnectorException.java | 36 ++++ .../seatunnel/qdrant/sink/QdrantBatchWriter.java | 190 +++++++++++++++++++++ .../seatunnel/qdrant/sink/QdrantSink.java | 50 ++++++ .../seatunnel/qdrant/sink/QdrantSinkFactory.java | 55 ++++++ .../seatunnel/qdrant/sink/QdrantSinkWriter.java | 60 +++++++ .../seatunnel/qdrant/source/QdrantSource.java | 63 +++++++ .../qdrant/source/QdrantSourceFactory.java | 63 +++++++ .../qdrant/source/QdrantSourceReader.java | 181 ++++++++++++++++++++ seatunnel-connectors-v2/pom.xml | 1 + seatunnel-dist/pom.xml | 7 + .../connector-qdrant-e2e/pom.xml | 68 ++++++++ .../e2e/connector/v2/qdrant/QdrantIT.java | 145 ++++++++++++++++ .../src/test/resources/qdrant-to-qdrant.conf | 51 ++++++ seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml | 1 + 24 files changed, 1440 insertions(+), 1 deletion(-) diff --git a/.github/workflows/labeler/label-scope-conf.yml b/.github/workflows/labeler/label-scope-conf.yml index a506268d39..b417d53e72 100644 --- a/.github/workflows/labeler/label-scope-conf.yml +++ b/.github/workflows/labeler/label-scope-conf.yml @@ -257,6 
+257,13 @@ activemq: - changed-files: - any-glob-to-any-file: seatunnel-connectors-v2/connector-activemq/** - all-globs-to-all-files: '!seatunnel-connectors-v2/connector-!(activemq)/**' + +qdrant: + - all: + - changed-files: + - any-glob-to-any-file: seatunnel-connectors-v2/connector-qdrant/** + - all-globs-to-all-files: '!seatunnel-connectors-v2/connector-!(qdrant)/**' + typesense: - all: - changed-files: @@ -285,4 +292,4 @@ sls: - all: - changed-files: - any-glob-to-any-file: seatunnel-connectors-v2/connector-sls/** - - all-globs-to-all-files: '!seatunnel-connectors-v2/connector-!(sls)/**' \ No newline at end of file + - all-globs-to-all-files: '!seatunnel-connectors-v2/connector-!(sls)/**' diff --git a/config/plugin_config b/config/plugin_config index e0dd233300..26eb4cab4a 100644 --- a/config/plugin_config +++ b/config/plugin_config @@ -88,5 +88,6 @@ connector-web3j connector-milvus connector-activemq connector-sls +connector-qdrant connector-typesense connector-cdc-opengauss diff --git a/docs/en/connector-v2/sink/Qdrant.md b/docs/en/connector-v2/sink/Qdrant.md new file mode 100644 index 0000000000..e94598d291 --- /dev/null +++ b/docs/en/connector-v2/sink/Qdrant.md @@ -0,0 +1,70 @@ +# Qdrant + +> Qdrant Sink Connector + +## Description + +[Qdrant](https://qdrant.tech/) is a high-performance vector search engine and vector database. + +This connector can be used to write data into a Qdrant collection. + +## Data Type Mapping + +| SeaTunnel Data Type | Qdrant Data Type | +|---------------------|------------------| +| TINYINT | INTEGER | +| SMALLINT | INTEGER | +| INT | INTEGER | +| BIGINT | INTEGER | +| FLOAT | DOUBLE | +| DOUBLE | DOUBLE | +| BOOLEAN | BOOL | +| STRING | STRING | +| ARRAY | LIST | +| FLOAT_VECTOR | DENSE_VECTOR | +| BINARY_VECTOR | DENSE_VECTOR | +| FLOAT16_VECTOR | DENSE_VECTOR | +| BFLOAT16_VECTOR | DENSE_VECTOR | +| SPARSE_FLOAT_VECTOR | SPARSE_VECTOR | + +The value of the primary key column will be used as point ID in Qdrant. 
If no primary key is present, a random UUID will be used. + +## Options + +| name | type | required | default value | +|-----------------|--------|----------|---------------| +| collection_name | string | yes | - | +| batch_size | int | no | 64 | +| host | string | no | localhost | +| port | int | no | 6334 | +| api_key | string | no | - | +| use_tls | bool | no | false | +| common-options | | no | - | + +### collection_name [string] + +The name of the Qdrant collection to write data to. + +### batch_size [int] + +The batch size of each upsert request to Qdrant. + +### host [string] + +The host name of the Qdrant instance. Defaults to "localhost". + +### port [int] + +The gRPC port of the Qdrant instance. + +### api_key [string] + +The API key to use for authentication if set. + +### use_tls [bool] + +Whether to use a TLS (SSL) connection. Required when connecting to Qdrant Cloud (https). + +### common options + +Sink plugin common parameters, please refer to [Sink Common Options](../sink-common-options.md) for details. diff --git a/docs/en/connector-v2/source/Qdrant.md b/docs/en/connector-v2/source/Qdrant.md new file mode 100644 index 0000000000..c523cf1da6 --- /dev/null +++ b/docs/en/connector-v2/source/Qdrant.md @@ -0,0 +1,81 @@ +# Qdrant + +> Qdrant source connector + +## Description + +[Qdrant](https://qdrant.tech/) is a high-performance vector search engine and vector database. + +This connector can be used to read data from a Qdrant collection. + +## Options + +| name | type | required | default value | +|-----------------|--------|----------|---------------| +| collection_name | string | yes | - | +| schema | config | yes | - | +| host | string | no | localhost | +| port | int | no | 6334 | +| api_key | string | no | - | +| use_tls | bool | no | false | +| common-options | | no | - | + +### collection_name [string] + +The name of the Qdrant collection to read data from. + +### schema [config] + +The schema of the table to read data into.
+ +For example: + +```hocon +schema = { + fields { + age = int + address = string + some_vector = float_vector + } +} +``` + +Each entry in Qdrant is called a point. + +The `float_vector` type columns are read from the vectors of each point, others are read from the JSON payload associated with the point. + +If a column is marked as primary key, the ID of the Qdrant point is written into it. It can be of type `"string"` or `"int"`, because Qdrant only [allows](https://qdrant.tech/documentation/concepts/points/#point-ids) unsigned integers and UUIDs as point IDs. + +If the collection was created with a single default/unnamed vector, use `default_vector` as the vector name. + +```hocon +schema = { + fields { + age = int + address = string + default_vector = float_vector + } +} +``` + +The ID of the point in Qdrant will be written into the column which is marked as the primary key. It can be of type `int` or `string`. + +### host [string] + +The host name of the Qdrant instance. Defaults to "localhost". + +### port [int] + +The gRPC port of the Qdrant instance. + +### api_key [string] + +The API key to use for authentication if set. + +### use_tls [bool] + +Whether to use a TLS (SSL) connection. Required when connecting to Qdrant Cloud (https). + +### common options + +Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details.
diff --git a/docs/zh/connector-v2/sink/Qdrant.md b/docs/zh/connector-v2/sink/Qdrant.md new file mode 100644 index 0000000000..7394eb8541 --- /dev/null +++ b/docs/zh/connector-v2/sink/Qdrant.md @@ -0,0 +1,68 @@ +# Qdrant + +> Qdrant Sink 连接器 + +[Qdrant](https://qdrant.tech/) 是一个高性能的向量搜索引擎和向量数据库。 + +该连接器可用于将数据写入 Qdrant 集合。 + +## 数据类型映射 + +| SeaTunnel 数据类型 | Qdrant 数据类型 | +|---------------------|---------------| +| TINYINT | INTEGER | +| SMALLINT | INTEGER | +| INT | INTEGER | +| BIGINT | INTEGER | +| FLOAT | DOUBLE | +| DOUBLE | DOUBLE | +| BOOLEAN | BOOL | +| STRING | STRING | +| ARRAY | LIST | +| FLOAT_VECTOR | DENSE_VECTOR | +| BINARY_VECTOR | DENSE_VECTOR | +| FLOAT16_VECTOR | DENSE_VECTOR | +| BFLOAT16_VECTOR | DENSE_VECTOR | +| SPARSE_FLOAT_VECTOR | SPARSE_VECTOR | + +主键列的值将用作 Qdrant 中的点 ID。如果没有主键,则将使用随机 UUID。 + +## 选项 + +| 名称 | 类型 | 必填 | 默认值 | +|-----------------|--------|----|-----------| +| collection_name | string | 是 | - | +| batch_size | int | 否 | 64 | +| host | string | 否 | localhost | +| port | int | 否 | 6334 | +| api_key | string | 否 | - | +| use_tls | bool | 否 | false | +| common-options | | 否 | - | + +### collection_name [string] + +要写入数据的 Qdrant 集合的名称。 + +### batch_size [int] + +每次向 Qdrant 发送 upsert 请求时的批量大小。 + +### host [string] + +Qdrant 实例的主机名。默认为 "localhost"。 + +### port [int] + +Qdrant 实例的 gRPC 端口。 + +### api_key [string] + +用于身份验证的 API 密钥(如果设置)。 + +### use_tls [bool] + +是否使用 TLS(SSL)连接。如果使用 Qdrant 云(https),则需要。 + +### 通用选项 + +Sink 插件的通用参数,请参考[Sink 通用选项](../sink-common-options.md)了解详情。 diff --git a/docs/zh/connector-v2/source/Qdrant.md b/docs/zh/connector-v2/source/Qdrant.md new file mode 100644 index 0000000000..140ff36a39 --- /dev/null +++ b/docs/zh/connector-v2/source/Qdrant.md @@ -0,0 +1,79 @@ +# Qdrant + +> Qdrant 数据源连接器 + +[Qdrant](https://qdrant.tech/) 是一个高性能的向量搜索引擎和向量数据库。 + +该连接器可用于从 Qdrant 集合中读取数据。 + +## 选项 + +| 名称 | 类型 | 必填 | 默认值 | +|-----------------|--------|----|-----------| +| collection_name | string | 是 | - | +| schema | config | 是 | - |
+| host | string | 否 | localhost | +| port | int | 否 | 6334 | +| api_key | string | 否 | - | +| use_tls | bool | 否 | false | +| common-options | | 否 | - | + +### collection_name [string] + +要从中读取数据的 Qdrant 集合的名称。 + +### schema [config] + +要将数据读取到的表的模式。 + +例如: + +```hocon +schema = { + fields { + age = int + address = string + some_vector = float_vector + } +} +``` + +Qdrant 中的每个条目称为一个点。 + +`float_vector` 类型的列从每个点的向量中读取,其他列从与该点关联的 JSON 有效负载中读取。 + +如果列被标记为主键,Qdrant 点的 ID 将写入其中。它可以是 `"string"` 或 `"int"` 类型,因为 Qdrant 仅[允许](https://qdrant.tech/documentation/concepts/points/#point-ids)使用无符号整数和 UUID 作为点 ID。 + +如果集合是用单个默认/未命名向量创建的,请使用 `default_vector` 作为向量名称。 + +```hocon +schema = { + fields { + age = int + address = string + default_vector = float_vector + } +} +``` + +Qdrant 中点的 ID 将写入标记为主键的列中。它可以是 `int` 或 `string` 类型。 + +### host [string] + +Qdrant 实例的主机名。默认为 "localhost"。 + +### port [int] + +Qdrant 实例的 gRPC 端口。 + +### api_key [string] + +用于身份验证的 API 密钥(如果设置)。 + +### use_tls [bool] + +是否使用 TLS(SSL)连接。如果使用 Qdrant 云(https),则需要。 + +### 通用选项 + +源插件的通用参数,请参考[源通用选项](../source-common-options.md)了解详情。 diff --git a/plugin-mapping.properties b/plugin-mapping.properties index 06a01ec04b..1ddedd5ea8 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -131,6 +131,8 @@ seatunnel.sink.ObsFile = connector-file-obs seatunnel.source.Milvus = connector-milvus seatunnel.sink.Milvus = connector-milvus seatunnel.sink.ActiveMQ = connector-activemq +seatunnel.source.Qdrant = connector-qdrant +seatunnel.sink.Qdrant = connector-qdrant seatunnel.source.Sls = connector-sls seatunnel.source.Typesense = connector-typesense seatunnel.sink.Typesense = connector-typesense diff --git a/seatunnel-connectors-v2/connector-qdrant/pom.xml b/seatunnel-connectors-v2/connector-qdrant/pom.xml new file mode 100644 index 0000000000..686f0bdb7a --- /dev/null +++ b/seatunnel-connectors-v2/connector-qdrant/pom.xml @@ -0,0 +1,63 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to
the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.seatunnel</groupId> + <artifactId>seatunnel-connectors-v2</artifactId> + <version>${revision}</version> + </parent> + + <artifactId>connector-qdrant</artifactId> + <name>SeaTunnel : Connectors V2 : Qdrant</name> + + <properties> + <connector.name>connector.qdrant</connector.name> + </properties> + <dependencies> + + <dependency> + <groupId>org.apache.seatunnel</groupId> + <artifactId>connector-common</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-protobuf</artifactId> + <version>1.65.1</version> + </dependency> + + <dependency> + <groupId>org.apache.seatunnel</groupId> + <artifactId>seatunnel-guava</artifactId> + <version>${project.version}</version> + <classifier>optional</classifier> + </dependency> + + <dependency> + <groupId>io.qdrant</groupId> + <artifactId>client</artifactId> + <version>1.11.0</version> + </dependency> + + </dependencies> 
+</project> diff --git a/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/config/QdrantConfig.java b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/config/QdrantConfig.java new file mode 100644 index 0000000000..1be03a1311 --- /dev/null +++ b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/config/QdrantConfig.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.qdrant.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; + +/** Configuration options understood by the Qdrant connector. */ +public class QdrantConfig { + + /** Connector identifier, used e.g. as the sink plugin name and sink factory identifier. */ + public static final String CONNECTOR_IDENTITY = "Qdrant"; + + /** Name of the Qdrant collection to work with; this option has no default value. */ + public static final Option<String> COLLECTION_NAME = + Options.key("collection_name") + .stringType() + .noDefaultValue() + .withDescription("Qdrant collection name"); + + /** Hostname of the Qdrant gRPC endpoint (default {@code localhost}). */ + public static final Option<String> HOST = + Options.key("host") + .stringType() + .defaultValue("localhost") + .withDescription("Qdrant gRPC host"); + + /** Port of the Qdrant gRPC endpoint (default {@code 6334}). */ + public static final Option<Integer> PORT = + Options.key("port").intType().defaultValue(6334).withDescription("Qdrant gRPC port"); + + /** API key used for authentication (default: empty string). */ + public static final Option<String> API_KEY = + Options.key("api_key").stringType().defaultValue("").withDescription("Qdrant API key"); + + /** Whether the gRPC connection uses TLS (default {@code false}). */ + public static final Option<Boolean> USE_TLS = + Options.key("use_tls") + .booleanType() + .defaultValue(false) + .withDescription("Whether to use TLS"); +} diff --git a/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/config/QdrantParameters.java b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/config/QdrantParameters.java new file mode 100644 index 0000000000..1ae612fafb --- /dev/null +++ b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/config/QdrantParameters.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.qdrant.config; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; + +import io.qdrant.client.QdrantClient; +import io.qdrant.client.QdrantGrpcClient; +import lombok.Data; + +import java.io.Serializable; + +/** Qdrant connection settings resolved from the connector configuration. */ +@Data +public class QdrantParameters implements Serializable { + private String host; + private int port; + private String apiKey; + private String collectionName; + private boolean useTls; + + public QdrantParameters(ReadonlyConfig config) { + this.host = config.get(QdrantConfig.HOST); + this.port = config.get(QdrantConfig.PORT); + this.apiKey = config.get(QdrantConfig.API_KEY); + this.collectionName = config.get(QdrantConfig.COLLECTION_NAME); + this.useTls = config.get(QdrantConfig.USE_TLS); + } + + /** + * Creates a new gRPC-backed {@link QdrantClient} for the configured host, port and TLS mode. + * Each call returns a fresh client; the caller is responsible for closing it. + */ + public QdrantClient buildQdrantClient() { + QdrantGrpcClient.Builder builder = QdrantGrpcClient.newBuilder(host, port, useTls); + // Attach the configured api_key; previously it was read but never passed to the client. + // An empty key (the option's default) is treated as "no authentication". + if (apiKey != null && !apiKey.isEmpty()) { + builder.withApiKey(apiKey); + } + return new QdrantClient(builder.build()); + } +} diff --git a/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/exception/QdrantConnectorException.java b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/exception/QdrantConnectorException.java new file mode 100644 index 0000000000..becf31abf5 --- /dev/null +++ b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/exception/QdrantConnectorException.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.qdrant.exception; + +import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; + +public class QdrantConnectorException extends SeaTunnelRuntimeException { + public QdrantConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) { + super(seaTunnelErrorCode, errorMessage); + } + + public QdrantConnectorException( + SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) { + super(seaTunnelErrorCode, errorMessage, cause); + } + + public QdrantConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) { + super(seaTunnelErrorCode, cause); + } +} diff --git a/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/sink/QdrantBatchWriter.java b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/sink/QdrantBatchWriter.java new file mode 100644 index 0000000000..7ca4428c81 --- /dev/null +++ b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/sink/QdrantBatchWriter.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.qdrant.sink; + +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.api.table.type.SqlType; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.common.utils.BufferUtils; +import org.apache.seatunnel.connectors.seatunnel.qdrant.config.QdrantParameters; +import org.apache.seatunnel.connectors.seatunnel.qdrant.exception.QdrantConnectorException; + +import org.apache.commons.collections4.CollectionUtils; + +import io.qdrant.client.QdrantClient; +import io.qdrant.client.ValueFactory; +import io.qdrant.client.VectorFactory; +import io.qdrant.client.grpc.JsonWithInt; +import io.qdrant.client.grpc.Points; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import static 
io.qdrant.client.PointIdFactory.id; +import static org.apache.seatunnel.api.table.catalog.PrimaryKey.isPrimaryKeyField; + +/** + * Buffers SeaTunnel rows as Qdrant points and upserts them into a single collection in batches. + */ +public class QdrantBatchWriter { + + private final int batchSize; + private final CatalogTable catalogTable; + private final String collectionName; + private final QdrantClient qdrantClient; + + private final List<Points.PointStruct> qdrantDataCache; + private volatile int writeCount = 0; + + public QdrantBatchWriter( + CatalogTable catalogTable, Integer batchSize, QdrantParameters params) { + this.catalogTable = catalogTable; + this.qdrantClient = params.buildQdrantClient(); + this.collectionName = params.getCollectionName(); + this.batchSize = batchSize; + this.qdrantDataCache = new ArrayList<>(batchSize); + } + + /** Converts the row into a point and appends it to the pending batch. */ + public void addToBatch(SeaTunnelRow element) { + Points.PointStruct point = buildPoint(element); + qdrantDataCache.add(point); + writeCount++; + } + + /** Returns whether at least {@code batchSize} points are pending. */ + public boolean needFlush() { + return this.writeCount >= this.batchSize; + } + + /** Upserts all pending points and clears the buffer; does nothing when the buffer is empty. */ + public synchronized void flush() { + if (CollectionUtils.isEmpty(this.qdrantDataCache)) { + return; + } + upsert(); + this.qdrantDataCache.clear(); + this.writeCount = 0; + } + + /** Closes the underlying Qdrant client. Pending points are not flushed. */ + public void close() { + this.qdrantClient.close(); + } + + /** + * Builds a point from a row: the primary-key column becomes the point ID, scalar columns + * become payload entries and vector columns become named vectors; other columns are skipped. + * A random UUID is used as the ID when no primary-key column is present. + */ + private Points.PointStruct buildPoint(SeaTunnelRow element) { + SeaTunnelRowType seaTunnelRowType = catalogTable.getSeaTunnelRowType(); + PrimaryKey primaryKey = catalogTable.getTableSchema().getPrimaryKey(); + + Points.PointStruct.Builder point = Points.PointStruct.newBuilder(); + Points.NamedVectors.Builder namedVectors = Points.NamedVectors.newBuilder(); + for (int i = 0; i < seaTunnelRowType.getFieldNames().length; i++) { + String fieldName = seaTunnelRowType.getFieldNames()[i]; + SeaTunnelDataType<?> fieldType = seaTunnelRowType.getFieldType(i); + Object value = element.getField(i); + + if (isPrimaryKeyField(primaryKey, fieldName)) { + point.setId(pointId(fieldType, value)); + continue; + } + + JsonWithInt.Value payloadValue = buildPayload(fieldType, value);
+ if (payloadValue != null) { + point.putPayload(fieldName, payloadValue); + continue; + } + + Points.Vector vector = buildVector(fieldType, value); + if (vector != null) { + namedVectors.putVectors(fieldName, vector); + } + } + + if (!point.hasId()) { + point.setId(id(UUID.randomUUID())); + } + + point.setVectors(Points.Vectors.newBuilder().setVectors(namedVectors).build()); + return point.build(); + } + + /** Sends the buffered points to the collection and blocks until the upsert completes. */ + private void upsert() { + try { + qdrantClient + .upsertAsync( + Points.UpsertPoints.newBuilder() + .setCollectionName(collectionName) + .addAllPoints(qdrantDataCache) + .build()) + .get(); + } catch (InterruptedException e) { + // Restore the interrupt flag so callers further up can still observe the interruption. + Thread.currentThread().interrupt(); + throw new RuntimeException("Upsert failed", e); + } catch (ExecutionException e) { + throw new RuntimeException("Upsert failed", e); + } + } + + /** + * Converts a primary-key value into a Qdrant point ID. Integer columns become numeric IDs and + * string columns must hold a UUID. + * + * @throws QdrantConnectorException if the column type cannot be used as a point ID + */ + public static Points.PointId pointId(SeaTunnelDataType<?> fieldType, Object value) { + SqlType sqlType = fieldType.getSqlType(); + switch (sqlType) { + case TINYINT: + case SMALLINT: + case INT: + case BIGINT: + // Parse as long so BIGINT keys outside the int range are accepted as well. + return id(Long.parseLong(value.toString())); + case STRING: + return id(UUID.fromString(value.toString())); + default: + throw new QdrantConnectorException( + CommonErrorCode.UNSUPPORTED_DATA_TYPE, + "Unexpected value type for point ID: " + sqlType.name()); + } + } + + /** + * Converts a scalar column into a Qdrant payload value. + * + * @return the payload value, a JSON null for a null scalar, or {@code null} if the column + * type is not stored as payload (e.g. vector types) + */ + public static JsonWithInt.Value buildPayload(SeaTunnelDataType<?> fieldType, Object value) { + SqlType sqlType = fieldType.getSqlType(); + switch (sqlType) { + case TINYINT: + case SMALLINT: + case INT: + case BIGINT: + // Long.parseLong: Integer.parseInt overflowed on BIGINT values outside the int range. + return value == null + ? ValueFactory.nullValue() + : ValueFactory.value(Long.parseLong(value.toString())); + case FLOAT: + case DOUBLE: + // Double.parseDouble: Long.parseLong rejected every fractional value (e.g. "1.5"). + return value == null + ? ValueFactory.nullValue() + : ValueFactory.value(Double.parseDouble(value.toString())); + case STRING: + case DATE: + return value == null ? ValueFactory.nullValue() : ValueFactory.value(value.toString()); + case BOOLEAN: + return value == null + ? ValueFactory.nullValue() + : ValueFactory.value(Boolean.parseBoolean(value.toString())); + default: + return null; + } + } + + /** + * Converts a vector column into a dense Qdrant vector. Returns {@code null} for a null value + * and for any other type (including SPARSE_FLOAT_VECTOR), so such columns are skipped. + */ + public static Points.Vector buildVector(SeaTunnelDataType<?> fieldType, Object value) { + SqlType sqlType = fieldType.getSqlType(); + switch (sqlType) { + case FLOAT_VECTOR: + case FLOAT16_VECTOR: + case BFLOAT16_VECTOR: + case BINARY_VECTOR: + if (value == null) { + // Skip null vectors instead of dereferencing them. + return null; + } + // NOTE(review): all four types are decoded with the float32 helper used for + // FLOAT_VECTOR; confirm the FLOAT16/BFLOAT16/BINARY buffer layouts match. + ByteBuffer floatVectorBuffer = (ByteBuffer) value;
+ Float[] floats = BufferUtils.toFloatArray(floatVectorBuffer); + return VectorFactory.vector(Arrays.stream(floats).collect(Collectors.toList())); + default: + return null; + } + } +} diff --git a/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/sink/QdrantSink.java b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/sink/QdrantSink.java new file mode 100644 index 0000000000..85119032c8 --- /dev/null +++ b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/sink/QdrantSink.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.seatunnel.connectors.seatunnel.qdrant.sink; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.sink.SupportMultiTableSink; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; +import org.apache.seatunnel.connectors.seatunnel.qdrant.config.QdrantConfig; +import org.apache.seatunnel.connectors.seatunnel.qdrant.config.QdrantParameters; + +import java.io.IOException; + +/** Sink that writes the rows of one catalog table into Qdrant via {@link QdrantSinkWriter}. */ +public class QdrantSink extends AbstractSimpleSink<SeaTunnelRow, Void> + implements SupportMultiTableSink { + private final CatalogTable table; + private final QdrantParameters parameters; + + public QdrantSink(ReadonlyConfig config, CatalogTable table) { + this.table = table; + this.parameters = new QdrantParameters(config); + } + + @Override + public String getPluginName() { + return QdrantConfig.CONNECTOR_IDENTITY; + } + + /** Creates a writer bound to this sink's table and connection settings. */ + @Override + public QdrantSinkWriter createWriter(SinkWriter.Context context) throws IOException { + return new QdrantSinkWriter(table, parameters); + } +} diff --git a/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/sink/QdrantSinkFactory.java b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/sink/QdrantSinkFactory.java new file mode 100644 index 0000000000..a7ed5599d5 --- /dev/null +++ b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/sink/QdrantSinkFactory.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.qdrant.sink; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.sink.SinkCommonOptions; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.connector.TableSink; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; +import org.apache.seatunnel.connectors.seatunnel.qdrant.config.QdrantConfig; + +import com.google.auto.service.AutoService; + +/** Factory that exposes the Qdrant sink under {@link QdrantConfig#CONNECTOR_IDENTITY}. */ +@AutoService(Factory.class) +public class QdrantSinkFactory implements TableSinkFactory { + @Override + public String factoryIdentifier() { + return QdrantConfig.CONNECTOR_IDENTITY; + } + + @Override + public TableSink createSink(TableSinkFactoryContext context) { + CatalogTable catalogTable = context.getCatalogTable(); + return () -> new QdrantSink(context.getOptions(), catalogTable); + } + + /** + * Declares the sink options. {@code collection_name} is required: it has no default value and + * every upsert is addressed to that collection, so a missing value must fail validation early. + */ + @Override + public OptionRule optionRule() { + return OptionRule.builder() + .required(QdrantConfig.COLLECTION_NAME) + .optional( + QdrantConfig.HOST, + QdrantConfig.PORT, + QdrantConfig.API_KEY, + QdrantConfig.USE_TLS, + SinkCommonOptions.MULTI_TABLE_SINK_REPLICA) + .build(); + } +} diff --git
a/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/sink/QdrantSinkWriter.java b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/sink/QdrantSinkWriter.java
new file mode 100644
index 0000000000..a0e00838b6
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/sink/QdrantSinkWriter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.qdrant.sink;
+
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.qdrant.config.QdrantParameters;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Sink writer that buffers rows in a {@link QdrantBatchWriter}. The buffer is flushed whenever
+ * the batch writer reports {@code needFlush()}, on {@link #prepareCommit()} and on {@link
+ * #close()}.
+ */
+public class QdrantSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
+        implements SupportMultiTableSinkWriter<Void> {
+
+    /** Batch size used by the two-argument constructor (the previously hard-coded value). */
+    public static final int DEFAULT_BATCH_SIZE = 64;
+
+    private final QdrantBatchWriter batchWriter;
+
+    public QdrantSinkWriter(CatalogTable catalog, QdrantParameters qdrantParameters) {
+        this(catalog, qdrantParameters, DEFAULT_BATCH_SIZE);
+    }
+
+    /**
+     * @param catalog table whose rows this writer receives
+     * @param qdrantParameters connection settings passed on to the batch writer
+     * @param batchSize batch size passed to {@link QdrantBatchWriter}; must be positive
+     * @throws IllegalArgumentException if {@code batchSize} is not positive
+     */
+    public QdrantSinkWriter(
+            CatalogTable catalog, QdrantParameters qdrantParameters, int batchSize) {
+        if (batchSize <= 0) {
+            throw new IllegalArgumentException(
+                    "Qdrant batch size must be positive, but was " + batchSize);
+        }
+        this.batchWriter = new QdrantBatchWriter(catalog, batchSize, qdrantParameters);
+    }
+
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        batchWriter.addToBatch(element);
+        if (batchWriter.needFlush()) {
+            batchWriter.flush();
+        }
+    }
+
+    @Override
+    public Optional<Void> prepareCommit() {
+        // Push any partially filled batch before the checkpoint completes.
+        batchWriter.flush();
+        return Optional.empty();
+    }
+
+    @Override
+    public void close() throws IOException {
+        // Release the batch writer even when the final flush fails.
+        try {
+            batchWriter.flush();
+        } finally {
+            batchWriter.close();
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/source/QdrantSource.java b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/source/QdrantSource.java
new file mode 100644
index 0000000000..39aeb3a879
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/source/QdrantSource.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.qdrant.source;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
+import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import org.apache.seatunnel.connectors.seatunnel.qdrant.config.QdrantConfig;
+import org.apache.seatunnel.connectors.seatunnel.qdrant.config.QdrantParameters;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Bounded, single-split Qdrant source. The catalog table is built from the job config and
+ * records are produced by {@link QdrantSourceReader}.
+ */
+public class QdrantSource extends AbstractSingleSplitSource<SeaTunnelRow> {
+
+    /** Connection and collection settings parsed from the job config. */
+    private final QdrantParameters parameters;
+
+    /** Table (schema + path) declared in the job config; the only table produced. */
+    private final CatalogTable table;
+
+    public QdrantSource(ReadonlyConfig readonlyConfig) {
+        this.parameters = new QdrantParameters(readonlyConfig);
+        this.table = CatalogTableUtil.buildWithConfig(readonlyConfig);
+    }
+
+    @Override
+    public String getPluginName() {
+        return QdrantConfig.CONNECTOR_IDENTITY;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        // The reader signals end-of-input once its scroll loop finishes.
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public List<CatalogTable> getProducedCatalogTables() {
+        return Collections.singletonList(table);
+    }
+
+    @Override
+    public AbstractSingleSplitReader<SeaTunnelRow> createReader(
+            SingleSplitReaderContext readerContext) {
+        return new QdrantSourceReader(parameters, readerContext, table);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/source/QdrantSourceFactory.java b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/source/QdrantSourceFactory.java
new file mode 100644
index 0000000000..0639fc07e2
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/source/QdrantSourceFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.qdrant.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.qdrant.config.QdrantConfig;
+
+import com.google.auto.service.AutoService;
+
+import java.io.Serializable;
+
+/** Registers the Qdrant source under {@link QdrantConfig#CONNECTOR_IDENTITY}. */
+@AutoService(Factory.class)
+public class QdrantSourceFactory implements TableSourceFactory {
+
+    @Override
+    public String factoryIdentifier() {
+        return QdrantConfig.CONNECTOR_IDENTITY;
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        // collection_name and schema must be given; connection settings are optional.
+        return OptionRule.builder()
+                .required(QdrantConfig.COLLECTION_NAME, TableSchemaOptions.SCHEMA)
+                .optional(
+                        QdrantConfig.HOST,
+                        QdrantConfig.PORT,
+                        QdrantConfig.API_KEY,
+                        QdrantConfig.USE_TLS)
+                .build();
+    }
+
+    @Override
+    public Class<? extends SeaTunnelSource> getSourceClass() {
+        return QdrantSource.class;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
+        return () -> {
+            // The cast only adapts QdrantSource to the factory's generic signature.
+            QdrantSource source = new QdrantSource(context.getOptions());
+            return (SeaTunnelSource<T, SplitT, StateT>) source;
+        };
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/source/QdrantSourceReader.java b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/source/QdrantSourceReader.java
new file mode 100644
index 0000000000..2c37163129
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/source/QdrantSourceReader.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.qdrant.source;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.utils.BufferUtils;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import org.apache.seatunnel.connectors.seatunnel.qdrant.config.QdrantParameters;
+import org.apache.seatunnel.connectors.seatunnel.qdrant.exception.QdrantConnectorException;
+
+import io.qdrant.client.QdrantClient;
+import io.qdrant.client.WithVectorsSelectorFactory;
+import io.qdrant.client.grpc.JsonWithInt;
+import io.qdrant.client.grpc.Points;
+
+import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static io.qdrant.client.WithPayloadSelectorFactory.enable;
+import static org.apache.seatunnel.api.table.catalog.PrimaryKey.isPrimaryKeyField;
+
+/**
+ * Reads every point of one Qdrant collection by paging through the scroll API. Each point becomes
+ * one {@link SeaTunnelRow}: primary-key columns come from the point id, vector columns from the
+ * point's vectors, and all other columns from the payload. A payload key or vector that is absent
+ * (or an explicit JSON null) yields a {@code null} field.
+ */
+public class QdrantSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
+    /** Number of points requested per scroll page. */
+    private static final int SCROLL_SIZE = 64;
+
+    /** Column name under which a point's single unnamed vector is exposed. */
+    private static final String DEFAULT_VECTOR_KEY = "default_vector";
+
+    private final QdrantParameters qdrantParameters;
+    private final SingleSplitReaderContext context;
+    private final TablePath tablePath;
+    // Derived once from the schema instead of once per point.
+    private final SeaTunnelRowType rowType;
+    private final PrimaryKey primaryKey;
+    private QdrantClient qdrantClient;
+
+    public QdrantSourceReader(
+            QdrantParameters qdrantParameters,
+            SingleSplitReaderContext context,
+            CatalogTable catalogTable) {
+        this.qdrantParameters = qdrantParameters;
+        this.context = context;
+        TableSchema tableSchema = catalogTable.getTableSchema();
+        this.rowType = tableSchema.toPhysicalRowDataType();
+        this.primaryKey = tableSchema.getPrimaryKey();
+        this.tablePath = catalogTable.getTablePath();
+    }
+
+    @Override
+    public void open() throws Exception {
+        qdrantClient = qdrantParameters.buildQdrantClient();
+        // Fail fast if the server is unreachable.
+        qdrantClient.healthCheckAsync().get();
+    }
+
+    @Override
+    public void close() {
+        if (Objects.nonNull(qdrantClient)) {
+            qdrantClient.close();
+        }
+    }
+
+    @Override
+    public void internalPollNext(Collector<SeaTunnelRow> output) throws Exception {
+        Points.ScrollPoints request =
+                Points.ScrollPoints.newBuilder()
+                        .setCollectionName(qdrantParameters.getCollectionName())
+                        .setLimit(SCROLL_SIZE)
+                        .setWithPayload(enable(true))
+                        .setWithVectors(WithVectorsSelectorFactory.enable(true))
+                        .build();
+
+        while (true) {
+            Points.ScrollResponse response = qdrantClient.scrollAsync(request).get();
+            List<Points.RetrievedPoint> points = response.getResultList();
+            for (Points.RetrievedPoint point : points) {
+                output.collect(convertToSeaTunnelRow(point));
+            }
+
+            // An offset with neither a numeric nor a UUID id marks the last page.
+            Points.PointId offset = response.getNextPageOffset();
+            if (!offset.hasNum() && !offset.hasUuid()) {
+                break;
+            }
+            request = request.toBuilder().setOffset(offset).build();
+        }
+
+        context.signalNoMoreElement();
+    }
+
+    /** Builds one INSERT row, in schema field order, from a retrieved point. */
+    private SeaTunnelRow convertToSeaTunnelRow(Points.RetrievedPoint point) {
+        Map<String, JsonWithInt.Value> payloadMap = point.getPayloadMap();
+        Map<String, Points.Vector> vectorsMap = extractVectors(point.getVectors());
+
+        String[] fieldNames = rowType.getFieldNames();
+        Object[] fields = new Object[rowType.getTotalFields()];
+        for (int fieldIndex = 0; fieldIndex < fields.length; fieldIndex++) {
+            String fieldName = fieldNames[fieldIndex];
+            if (isPrimaryKeyField(primaryKey, fieldName)) {
+                fields[fieldIndex] = convertPointId(point.getId());
+            } else {
+                fields[fieldIndex] =
+                        convertField(
+                                rowType.getFieldType(fieldIndex),
+                                payloadMap.get(fieldName),
+                                vectorsMap.get(fieldName));
+            }
+        }
+
+        SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
+        seaTunnelRow.setTableId(tablePath.getFullName());
+        seaTunnelRow.setRowKind(RowKind.INSERT);
+        return seaTunnelRow;
+    }
+
+    /** Maps a point's vectors by name; an unnamed vector is keyed by DEFAULT_VECTOR_KEY. */
+    private static Map<String, Points.Vector> extractVectors(Points.Vectors vectors) {
+        Map<String, Points.Vector> vectorsMap = new HashMap<>();
+        if (vectors.hasVector()) {
+            vectorsMap.put(DEFAULT_VECTOR_KEY, vectors.getVector());
+        } else if (vectors.hasVectors()) {
+            vectorsMap.putAll(vectors.getVectors().getVectorsMap());
+        }
+        return vectorsMap;
+    }
+
+    /** Returns the numeric id as a Long, the UUID id as a String, or null if neither is set. */
+    private static Object convertPointId(Points.PointId id) {
+        if (id.hasNum()) {
+            return id.getNum();
+        }
+        if (id.hasUuid()) {
+            return id.getUuid();
+        }
+        return null;
+    }
+
+    /**
+     * Converts one payload value or vector to the Java type SeaTunnel expects for {@code
+     * fieldType} (Byte/Short/Integer/Long, Float/Double/BigDecimal, String, Boolean, ByteBuffer).
+     *
+     * @param value payload value for the column, or null when the key is absent
+     * @param vector vector for the column, or null when the point has no such vector
+     * @throws QdrantConnectorException if the column type is not supported
+     */
+    private static Object convertField(
+            SeaTunnelDataType<?> fieldType, JsonWithInt.Value value, Points.Vector vector) {
+        boolean missing = isMissing(value);
+        switch (fieldType.getSqlType()) {
+            case NULL:
+                return null;
+            case STRING:
+                return missing ? null : value.getStringValue();
+            case BOOLEAN:
+                return missing ? null : value.getBoolValue();
+            case TINYINT:
+                return missing ? null : (byte) toLong(value);
+            case SMALLINT:
+                return missing ? null : (short) toLong(value);
+            case INT:
+                return missing ? null : (int) toLong(value);
+            case BIGINT:
+                return missing ? null : toLong(value);
+            case FLOAT:
+                return missing ? null : (float) toDouble(value);
+            case DOUBLE:
+                return missing ? null : toDouble(value);
+            case DECIMAL:
+                return missing ? null : BigDecimal.valueOf(toDouble(value));
+            case BINARY_VECTOR:
+            case FLOAT_VECTOR:
+            case FLOAT16_VECTOR:
+            case BFLOAT16_VECTOR:
+                return vector == null
+                        ? null
+                        : BufferUtils.toByteBuffer(vector.getDataList().toArray(new Float[0]));
+            default:
+                throw new QdrantConnectorException(
+                        CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                        "Unexpected value: " + fieldType.getSqlType().name());
+        }
+    }
+
+    /** True when the payload key is absent or holds an explicit JSON null. */
+    private static boolean isMissing(JsonWithInt.Value value) {
+        return value == null || value.getKindCase() == JsonWithInt.Value.KindCase.NULL_VALUE;
+    }
+
+    /** Reads an integral value, accepting payloads that were stored as doubles. */
+    private static long toLong(JsonWithInt.Value value) {
+        return value.getKindCase() == JsonWithInt.Value.KindCase.DOUBLE_VALUE
+                ? (long) value.getDoubleValue()
+                : value.getIntegerValue();
+    }
+
+    /** Reads a floating-point value, accepting payloads that were stored as integers. */
+    private static double toDouble(JsonWithInt.Value value) {
+        return value.getKindCase() == JsonWithInt.Value.KindCase.INTEGER_VALUE
+                ? (double) value.getIntegerValue()
+                : value.getDoubleValue();
+    }
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index 669adb8905..cf7314e619 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -79,6 +79,7 @@
         <module>connector-web3j</module>
         <module>connector-milvus</module>
         <module>connector-activemq</module>
+        <module>connector-qdrant</module>
         <module>connector-sls</module>
<module>connector-typesense</module> </modules> diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml index a633b64a1d..584224c8fd 100644 --- a/seatunnel-dist/pom.xml +++ b/seatunnel-dist/pom.xml @@ -598,6 +598,13 @@ <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.seatunnel</groupId> + <artifactId>connector-qdrant</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <!-- jdbc driver --> <dependency> <groupId>com.aliyun.phoenix</groupId> diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-qdrant-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-qdrant-e2e/pom.xml new file mode 100644 index 0000000000..042e528c5e --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-qdrant-e2e/pom.xml @@ -0,0 +1,68 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.seatunnel</groupId> + <artifactId>seatunnel-connector-v2-e2e</artifactId> + <version>${revision}</version> + </parent> + + <artifactId>connector-qdrant-e2e</artifactId> + <name>SeaTunnel : E2E : Connector V2 : Qdrant</name> + + <dependencies> + <dependency> + <groupId>org.apache.seatunnel</groupId> + <artifactId>seatunnel-guava</artifactId> + <version>${project.version}</version> + <classifier>optional</classifier> + </dependency> + + <dependency> + <groupId>io.qdrant</groupId> + <artifactId>client</artifactId> + <version>1.11.0</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.seatunnel</groupId> + <artifactId>connector-qdrant</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>qdrant</artifactId> + <version>1.20.1</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.seatunnel</groupId> + <artifactId>connector-assert</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.seatunnel</groupId> + <artifactId>connector-fake</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + </dependencies> +</project> diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-qdrant-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/qdrant/QdrantIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-qdrant-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/qdrant/QdrantIT.java new file mode 100644 index 0000000000..21854a12ca --- /dev/null +++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-qdrant-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/qdrant/QdrantIT.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.v2.qdrant;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.qdrant.QdrantContainer;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+
+import io.qdrant.client.QdrantClient;
+import io.qdrant.client.QdrantGrpcClient;
+import io.qdrant.client.grpc.Collections;
+import io.qdrant.client.grpc.Points;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static io.qdrant.client.PointIdFactory.id;
+import static io.qdrant.client.ValueFactory.value;
+import static io.qdrant.client.VectorFactory.vector;
+import static io.qdrant.client.VectorsFactory.namedVectors;
+
+/**
+ * End-to-end test: seeds a source collection, runs {@code qdrant-to-qdrant.conf} and checks that
+ * every point arrived in the sink collection.
+ */
+@Slf4j
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK, EngineType.FLINK},
+        disabledReason = "SPARK and FLINK do not support vector types yet")
+public class QdrantIT extends TestSuiteBase implements TestResource {
+
+    private static final String ALIAS = "qdrante2e";
+    private static final String SOURCE_COLLECTION = "source_collection";
+    private static final String SINK_COLLECTION = "sink_collection";
+    private static final String IMAGE = "qdrant/qdrant:latest";
+    // Must match the vector column name declared in qdrant-to-qdrant.conf.
+    private static final String VECTOR_NAME = "my_vector";
+    private static final int POINT_COUNT = 10;
+    private QdrantContainer qdrantContainer;
+    private QdrantClient qdrantClient;
+
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        this.qdrantContainer =
+                new QdrantContainer(IMAGE).withNetwork(NETWORK).withNetworkAliases(ALIAS);
+        Startables.deepStart(Stream.of(this.qdrantContainer)).join();
+        this.initQdrant();
+        // Wait until the gRPC endpoint actually answers before seeding data.
+        Awaitility.given()
+                .ignoreExceptions()
+                .await()
+                .atMost(10L, TimeUnit.SECONDS)
+                .untilAsserted(() -> qdrantClient.healthCheckAsync().get());
+        this.initSourceData();
+    }
+
+    private void initQdrant() {
+        qdrantClient =
+                new QdrantClient(
+                        QdrantGrpcClient.newBuilder(
+                                        qdrantContainer.getHost(),
+                                        qdrantContainer.getGrpcPort(),
+                                        false)
+                                .build());
+    }
+
+    /** Creates a collection with one 4-dimensional cosine vector named VECTOR_NAME. */
+    private void createCollection(String collectionName) throws Exception {
+        qdrantClient
+                .createCollectionAsync(
+                        collectionName,
+                        ImmutableMap.of(
+                                VECTOR_NAME,
+                                Collections.VectorParams.newBuilder()
+                                        .setSize(4)
+                                        .setDistance(Collections.Distance.Cosine)
+                                        .build()))
+                .get();
+    }
+
+    private void initSourceData() throws Exception {
+        createCollection(SOURCE_COLLECTION);
+        createCollection(SINK_COLLECTION);
+
+        List<Points.PointStruct> points = new ArrayList<>();
+        for (int i = 1; i <= POINT_COUNT; i++) {
+            List<Float> floats = Arrays.asList((float) i, (float) i, (float) i, (float) i);
+            Points.PointStruct.Builder pointStruct = Points.PointStruct.newBuilder();
+            pointStruct.setId(id(i));
+            pointStruct.setVectors(namedVectors(ImmutableMap.of(VECTOR_NAME, vector(floats))));
+            pointStruct.putPayload("file_size", value(i));
+            pointStruct.putPayload("file_name", value("file-name-" + i));
+            points.add(pointStruct.build());
+        }
+
+        qdrantClient
+                .upsertAsync(
+                        Points.UpsertPoints.newBuilder()
+                                .setCollectionName(SOURCE_COLLECTION)
+                                .addAllPoints(points)
+                                .build())
+                .get();
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() {
+        if (qdrantClient != null) {
+            qdrantClient.close();
+        }
+        if (qdrantContainer != null) {
+            qdrantContainer.stop();
+        }
+    }
+
+    @TestTemplate
+    public void testQdrant(TestContainer container)
+            throws IOException, InterruptedException, ExecutionException {
+        Container.ExecResult execResult = container.executeJob("/qdrant-to-qdrant.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+        Assertions.assertEquals(
+                POINT_COUNT, qdrantClient.countAsync(SINK_COLLECTION).get().longValue());
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-qdrant-e2e/src/test/resources/qdrant-to-qdrant.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-qdrant-e2e/src/test/resources/qdrant-to-qdrant.conf
new file mode 100644
index 0000000000..8fa4c4f1da
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-qdrant-e2e/src/test/resources/qdrant-to-qdrant.conf
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + Qdrant { + collection_name = "source_collection" + host = "qdrante2e" + schema = { + columns = [ + { + name = file_name + type = string + } + { + name = file_size + type = int + } + { + name = my_vector + type = float_vector + } + ] + } + } +} + +sink { + Qdrant { + collection_name = "sink_collection" + host = "qdrante2e" + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml index b0c224219b..28be63f3cf 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml @@ -76,6 +76,7 @@ <module>connector-hudi-e2e</module> <module>connector-milvus-e2e</module> <module>connector-activemq-e2e</module> + <module>connector-qdrant-e2e</module> <module>connector-sls-e2e</module> <module>connector-typesense-e2e</module> <module>connector-email-e2e</module>