This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 0c69b9166e [Feature][Connector-V2][Milvus] Support Milvus source & sink (#7158) 0c69b9166e is described below commit 0c69b9166efb9637f83f0f8f817250f2cce15522 Author: Thomas-HuWei <99788018+thomas-hu...@users.noreply.github.com> AuthorDate: Fri Jul 12 17:50:26 2024 +0800 [Feature][Connector-V2][Milvus] Support Milvus source & sink (#7158) --- config/plugin_config | 1 + docs/en/connector-v2/sink/Mivlus.md | 59 +++ docs/en/connector-v2/source/Mivlus.md | 55 +++ plugin-mapping.properties | 2 + .../apache/seatunnel/api/table/catalog/Column.java | 3 +- .../seatunnel/api/table/catalog/ConstraintKey.java | 3 +- .../seatunnel/api/table/catalog/PrimaryKey.java | 19 + .../seatunnel/api/table/catalog/VectorIndex.java | 110 ++++++ .../seatunnel/api/table/type/SeaTunnelRow.java | 2 + .../apache/seatunnel/api/table/type/SqlType.java | 5 + .../seatunnel/api/table/type/VectorType.java | 85 +++++ seatunnel-connectors-v2/connector-milvus/pom.xml | 60 ++++ .../seatunnel/milvus/catalog/MilvusCatalog.java | 380 ++++++++++++++++++++ .../milvus/catalog/MilvusCatalogFactory.java | 45 +++ .../seatunnel/milvus/catalog/MilvusOptions.java | 24 +- .../seatunnel/milvus/config/MilvusSinkConfig.java | 87 +++++ .../milvus/config/MilvusSourceConfig.java | 48 +++ .../milvus/convert/MilvusConvertUtils.java | 397 +++++++++++++++++++++ .../exception/MilvusConnectionErrorCode.java | 57 +++ .../milvus/exception/MilvusConnectorException.java | 41 +++ .../seatunnel/milvus/sink/MilvusSink.java | 116 ++++++ .../seatunnel/milvus/sink/MilvusSinkCommitter.java | 56 +++ .../seatunnel/milvus/sink/MilvusSinkFactory.java | 80 +++++ .../seatunnel/milvus/sink/MilvusSinkWriter.java | 129 +++++++ .../milvus/sink/batch/MilvusBatchWriter.java | 33 +- .../milvus/sink/batch/MilvusBufferBatchWriter.java | 143 ++++++++ .../seatunnel/milvus/source/MilvusSource.java | 82 +++++ .../milvus/source/MilvusSourceFactory.java | 61 ++++ 
.../milvus/source/MilvusSourceReader.java | 261 ++++++++++++++ .../seatunnel/milvus/source/MilvusSourceSplit.java | 39 +- .../milvus/source/MilvusSourceSplitEnumertor.java | 192 ++++++++++ .../seatunnel/milvus/source/MilvusSourceState.java | 36 +- .../milvus/state/MilvusAggregatedCommitInfo.java | 32 +- .../seatunnel/milvus/state/MilvusCommitInfo.java | 31 +- .../seatunnel/milvus/state/MilvusSinkState.java | 33 +- seatunnel-connectors-v2/pom.xml | 1 + seatunnel-dist/pom.xml | 7 + .../connector-milvus-e2e/pom.xml | 66 ++++ .../e2e/connector/v2/milvus/MilvusIT.java | 218 +++++++++++ .../src/test/resources/milvus-to-milvus.conf | 36 ++ seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml | 1 + 41 files changed, 2985 insertions(+), 151 deletions(-) diff --git a/config/plugin_config b/config/plugin_config index e642a30021..d80d2e6ab0 100644 --- a/config/plugin_config +++ b/config/plugin_config @@ -85,4 +85,5 @@ connector-paimon connector-rocketmq connector-tdengine connector-web3j +connector-milvus --end-- \ No newline at end of file diff --git a/docs/en/connector-v2/sink/Mivlus.md b/docs/en/connector-v2/sink/Mivlus.md new file mode 100644 index 0000000000..081f427a5d --- /dev/null +++ b/docs/en/connector-v2/sink/Mivlus.md @@ -0,0 +1,59 @@ +# Milvus + +> Milvus sink connector + +## Description + +Write data to Milvus or Zilliz Cloud + +## Key Features + +- [x] [batch](../../concept/connector-v2-features.md) +- [x] [exactly-once](../../concept/connector-v2-features.md) +- [ ] [column projection](../../concept/connector-v2-features.md) + +## Data Type Mapping + +| Milvus Data Type | SeaTunnel Data Type | +|---------------------|---------------------| +| INT8 | TINYINT | +| INT16 | SMALLINT | +| INT32 | INT | +| INT64 | BIGINT | +| FLOAT | FLOAT | +| DOUBLE | DOUBLE | +| BOOL | BOOLEAN | +| JSON | STRING | +| ARRAY | ARRAY | +| VARCHAR | STRING | +| FLOAT_VECTOR | FLOAT_VECTOR | +| BINARY_VECTOR | BINARY_VECTOR | +| FLOAT16_VECTOR | FLOAT16_VECTOR | +| BFLOAT16_VECTOR | 
BFLOAT16_VECTOR | +| SPARSE_FLOAT_VECTOR | SPARSE_FLOAT_VECTOR | + +## Sink Options + +| Name | Type | Required | Default | Description | +|----------------------|---------|----------|------------------------------|-----------------------------------------------------------| +| url | String | Yes | - | The URL to connect to Milvus or Zilliz Cloud. | +| token | String | Yes | - | User:password | +| database | String | No | - | Write data to which database, default is source database. | +| schema_save_mode | enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST | Auto create table when table not exist. | +| enable_auto_id | boolean | No | false | Primary key column enable autoId. | +| enable_upsert | boolean | No | false | Upsert data not insert. | +| enable_dynamic_field | boolean | No | true | Enable create table with dynamic field. | +| batch_size | int | No | 1000 | Write batch size. | + +## Task Example + +```bash +sink { + Milvus { + url = "http://127.0.0.1:19530" + token = "username:password" + batch_size = 1000 + } +} +``` + diff --git a/docs/en/connector-v2/source/Mivlus.md b/docs/en/connector-v2/source/Mivlus.md new file mode 100644 index 0000000000..a56df4c5fe --- /dev/null +++ b/docs/en/connector-v2/source/Mivlus.md @@ -0,0 +1,55 @@ +# Milvus + +> Milvus source connector + +## Description + +Read data from Milvus or Zilliz Cloud + +## Key Features + +- [x] [batch](../../concept/connector-v2-features.md) +- [x] [exactly-once](../../concept/connector-v2-features.md) +- [ ] [column projection](../../concept/connector-v2-features.md) + +## Data Type Mapping + +| Milvus Data Type | SeaTunnel Data Type | +|---------------------|---------------------| +| INT8 | TINYINT | +| INT16 | SMALLINT | +| INT32 | INT | +| INT64 | BIGINT | +| FLOAT | FLOAT | +| DOUBLE | DOUBLE | +| BOOL | BOOLEAN | +| JSON | STRING | +| ARRAY | ARRAY | +| VARCHAR | STRING | +| FLOAT_VECTOR | FLOAT_VECTOR | +| BINARY_VECTOR | BINARY_VECTOR | +| FLOAT16_VECTOR | FLOAT16_VECTOR | +| BFLOAT16_VECTOR | 
BFLOAT16_VECTOR | +| SPARSE_FLOAT_VECTOR | SPARSE_FLOAT_VECTOR | + +## Source Options + +| Name | Type | Required | Default | Description | +|------------|--------|----------|---------|--------------------------------------------------------------------------------------------| +| url | String | Yes | - | The URL to connect to Milvus or Zilliz Cloud. | +| token | String | Yes | - | User:password | +| database | String | Yes | default | Read data from which database. | +| collection | String | No | - | If set, will only read one collection, otherwise will read all collections under database. | + +## Task Example + +```bash +source { + Milvus { + url = "http://127.0.0.1:19530" + token = "username:password" + database = "default" + } +} +``` + diff --git a/plugin-mapping.properties b/plugin-mapping.properties index 6304236ec3..9936afcbaa 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -127,3 +127,5 @@ seatunnel.source.Oracle-CDC = connector-cdc-oracle seatunnel.sink.Pulsar = connector-pulsar seatunnel.source.ObsFile = connector-file-obs seatunnel.sink.ObsFile = connector-file-obs +seatunnel.source.Milvus = connector-milvus +seatunnel.sink.Milvus = connector-milvus diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java index d7e236d309..9c3ed338c9 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java @@ -60,7 +60,8 @@ public abstract class Column implements Serializable { * Number of digits to right of the decimal point. * * <p>For decimal data, this is the maximum scale. For time/timestamp data, this is the maximum - * allowed precision of the fractional seconds component. + * allowed precision of the fractional seconds component. For vector data, this is the vector + * dimension. 
* * <p>Null is returned for data types where the scale is not applicable. */ diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/ConstraintKey.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/ConstraintKey.java index 2d39641a42..f2d62852a0 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/ConstraintKey.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/ConstraintKey.java @@ -72,7 +72,8 @@ public class ConstraintKey implements Serializable { public enum ConstraintType { INDEX_KEY, UNIQUE_KEY, - FOREIGN_KEY + FOREIGN_KEY, + VECTOR_INDEX_KEY } public enum ColumnSortType { diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PrimaryKey.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PrimaryKey.java index e8a3a74025..ad88539c2f 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PrimaryKey.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PrimaryKey.java @@ -34,6 +34,25 @@ public class PrimaryKey implements Serializable { private final List<String> columnNames; + private Boolean enableAutoId; + + public PrimaryKey(String primaryKey, List<String> columnNames) { + this.primaryKey = primaryKey; + this.columnNames = columnNames; + this.enableAutoId = null; + } + + public static boolean isPrimaryKeyField(PrimaryKey primaryKey, String fieldName) { + if (primaryKey == null || primaryKey.getColumnNames() == null) { + return false; + } + return primaryKey.getColumnNames().contains(fieldName); + } + + public static PrimaryKey of(String primaryKey, List<String> columnNames, Boolean autoId) { + return new PrimaryKey(primaryKey, columnNames, autoId); + } + public static PrimaryKey of(String primaryKey, List<String> columnNames) { return new PrimaryKey(primaryKey, columnNames); } diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/VectorIndex.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/VectorIndex.java new file mode 100644 index 0000000000..5d6dd1beaa --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/VectorIndex.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.table.catalog; + +import lombok.EqualsAndHashCode; +import lombok.Getter; + +import java.io.Serializable; + +/** Vector Database need special Index on its vector field. 
*/ +@EqualsAndHashCode(callSuper = true) +@Getter +public class VectorIndex extends ConstraintKey.ConstraintKeyColumn implements Serializable { + + /** Vector index name */ + private final String indexName; + + /** Vector indexType, such as IVF_FLAT, HNSW, DISKANN */ + private final IndexType indexType; + + /** Vector index metricType, such as L2, IP, COSINE */ + private final MetricType metricType; + + public VectorIndex(String indexName, String columnName, String indexType, String metricType) { + super(columnName, null); + this.indexName = indexName; + this.indexType = IndexType.of(indexType); + this.metricType = MetricType.of(metricType); + } + + public VectorIndex( + String indexName, String columnName, IndexType indexType, MetricType metricType) { + super(columnName, null); + this.indexName = indexName; + this.indexType = indexType; + this.metricType = metricType; + } + + @Override + public ConstraintKey.ConstraintKeyColumn copy() { + return new VectorIndex(indexName, getColumnName(), indexType, metricType); + } + + public enum IndexType { + FLAT, + IVF_FLAT, + IVF_SQ8, + IVF_PQ, + HNSW, + DISKANN, + AUTOINDEX, + SCANN, + + // GPU indexes only for float vectors + GPU_IVF_FLAT, + GPU_IVF_PQ, + GPU_BRUTE_FORCE, + GPU_CAGRA, + + // Only supported for binary vectors + BIN_FLAT, + BIN_IVF_FLAT, + + // Only for varchar type field + TRIE, + // Only for scalar type field + STL_SORT, // only for numeric type field + INVERTED, // works for all scalar fields except JSON type field + + // Only for sparse vectors + SPARSE_INVERTED_INDEX, + SPARSE_WAND, + ; + + public static IndexType of(String name) { + return valueOf(name.toUpperCase()); + } + } + + public enum MetricType { + // Only for float vectors + L2, + IP, + COSINE, + + // Only for binary vectors + HAMMING, + JACCARD, + ; + + public static MetricType of(String name) { + return valueOf(name.toUpperCase()); + } + } +} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java index 1e507cb1fa..95a36b796c 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java @@ -141,6 +141,8 @@ public final class SeaTunnelRow implements Serializable { return 12; case TIMESTAMP: return 48; + case FLOAT_VECTOR: + return getArrayNotNullSize((Object[]) v) * 4; case ARRAY: SeaTunnelDataType elementType = ((ArrayType) dataType).getElementType(); if (elementType instanceof DecimalType) { diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java index 838a384809..e33ceb8d3c 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java @@ -35,6 +35,11 @@ public enum SqlType { DATE, TIME, TIMESTAMP, + BINARY_VECTOR, + FLOAT_VECTOR, + FLOAT16_VECTOR, + BFLOAT16_VECTOR, + SPARSE_FLOAT_VECTOR, ROW, MULTIPLE_ROW; } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/VectorType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/VectorType.java new file mode 100644 index 0000000000..39d2849f1a --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/VectorType.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.table.type; + +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.Objects; + +public class VectorType<T> implements SeaTunnelDataType<T> { + private static final long serialVersionUID = 2L; + + public static final VectorType<Float> VECTOR_FLOAT_TYPE = + new VectorType<>(Float.class, SqlType.FLOAT_VECTOR); + + public static final VectorType<Map> VECTOR_SPARSE_FLOAT_TYPE = + new VectorType<>(Map.class, SqlType.SPARSE_FLOAT_VECTOR); + + public static final VectorType<Byte> VECTOR_BINARY_TYPE = + new VectorType<>(Byte.class, SqlType.BINARY_VECTOR); + + public static final VectorType<ByteBuffer> VECTOR_FLOAT16_TYPE = + new VectorType<>(ByteBuffer.class, SqlType.FLOAT16_VECTOR); + + public static final VectorType<ByteBuffer> VECTOR_BFLOAT16_TYPE = + new VectorType<>(ByteBuffer.class, SqlType.BFLOAT16_VECTOR); + + // -------------------------------------------------------------------------------------------- + + /** The physical type class. 
*/ + private final Class<T> typeClass; + + private final SqlType sqlType; + + protected VectorType(Class<T> typeClass, SqlType sqlType) { + this.typeClass = typeClass; + this.sqlType = sqlType; + } + + @Override + public Class<T> getTypeClass() { + return this.typeClass; + } + + @Override + public SqlType getSqlType() { + return this.sqlType; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof VectorType)) { + return false; + } + VectorType<?> that = (VectorType<?>) obj; + return Objects.equals(typeClass, that.typeClass) && Objects.equals(sqlType, that.sqlType); + } + + @Override + public int hashCode() { + return Objects.hash(typeClass, sqlType); + } + + @Override + public String toString() { + return sqlType.toString(); + } +} diff --git a/seatunnel-connectors-v2/connector-milvus/pom.xml b/seatunnel-connectors-v2/connector-milvus/pom.xml new file mode 100644 index 0000000000..50d69d4f5b --- /dev/null +++ b/seatunnel-connectors-v2/connector-milvus/pom.xml @@ -0,0 +1,60 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.seatunnel</groupId> + <artifactId>seatunnel-connectors-v2</artifactId> + <version>${revision}</version> + </parent> + + <artifactId>connector-milvus</artifactId> + <name>SeaTunnel : Connectors V2 : Milvus</name> + + <dependencies> + <dependency> + <groupId>io.milvus</groupId> + <artifactId>milvus-sdk-java</artifactId> + <version>2.4.1</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-reload4j</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <version>4.11.0</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-inline</artifactId> + <version>4.11.0</version> + <scope>test</scope> + </dependency> + + </dependencies> + +</project> diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalog.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalog.java new file mode 100644 index 0000000000..dcca41320c --- /dev/null +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalog.java @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.milvus.catalog; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.ConstraintKey; +import org.apache.seatunnel.api.table.catalog.InfoPreviewResult; +import org.apache.seatunnel.api.table.catalog.PreviewResult; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.catalog.VectorIndex; +import org.apache.seatunnel.api.table.catalog.exception.CatalogException; +import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException; +import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; +import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException; +import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException; +import org.apache.seatunnel.api.table.type.ArrayType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig; +import 
org.apache.seatunnel.connectors.seatunnel.milvus.convert.MilvusConvertUtils; +import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode; +import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException; + +import org.apache.commons.collections4.CollectionUtils; + +import io.milvus.client.MilvusServiceClient; +import io.milvus.common.clientenum.ConsistencyLevelEnum; +import io.milvus.grpc.DataType; +import io.milvus.grpc.ListDatabasesResponse; +import io.milvus.grpc.ShowCollectionsResponse; +import io.milvus.grpc.ShowType; +import io.milvus.param.ConnectParam; +import io.milvus.param.IndexType; +import io.milvus.param.MetricType; +import io.milvus.param.R; +import io.milvus.param.RpcStatus; +import io.milvus.param.collection.CreateCollectionParam; +import io.milvus.param.collection.CreateDatabaseParam; +import io.milvus.param.collection.DropCollectionParam; +import io.milvus.param.collection.DropDatabaseParam; +import io.milvus.param.collection.FieldType; +import io.milvus.param.collection.HasCollectionParam; +import io.milvus.param.collection.ShowCollectionsParam; +import io.milvus.param.index.CreateIndexParam; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkNotNull; + +@Slf4j +public class MilvusCatalog implements Catalog { + + private final String catalogName; + private final ReadonlyConfig config; + + private MilvusServiceClient client; + + public MilvusCatalog(String catalogName, ReadonlyConfig config) { + this.catalogName = catalogName; + this.config = config; + } + + @Override + public void open() throws CatalogException { + ConnectParam connectParam = + ConnectParam.newBuilder() + .withUri(config.get(MilvusSinkConfig.URL)) + .withToken(config.get(MilvusSinkConfig.TOKEN)) + .build(); + try { + this.client = new 
MilvusServiceClient(connectParam); + } catch (Exception e) { + throw new CatalogException(String.format("Failed to open catalog %s", catalogName), e); + } + } + + @Override + public void close() throws CatalogException { + this.client.close(); + } + + @Override + public String name() { + return catalogName; + } + + @Override + public PreviewResult previewAction( + ActionType actionType, TablePath tablePath, Optional<CatalogTable> catalogTable) { + if (actionType == ActionType.CREATE_TABLE) { + return new InfoPreviewResult("create collection " + tablePath.getTableName()); + } else if (actionType == ActionType.DROP_TABLE) { + return new InfoPreviewResult("drop collection " + tablePath.getTableName()); + } else if (actionType == ActionType.CREATE_DATABASE) { + return new InfoPreviewResult("create database " + tablePath.getDatabaseName()); + } else if (actionType == ActionType.DROP_DATABASE) { + return new InfoPreviewResult("drop database " + tablePath.getDatabaseName()); + } else { + throw new UnsupportedOperationException("Unsupported action type: " + actionType); + } + } + + @Override + public String getDefaultDatabase() throws CatalogException { + return "default"; + } + + @Override + public boolean databaseExists(String databaseName) throws CatalogException { + List<String> databases = this.listDatabases(); + return databases.contains(databaseName); + } + + @Override + public List<String> listDatabases() throws CatalogException { + R<ListDatabasesResponse> response = this.client.listDatabases(); + return response.getData().getDbNamesList(); + } + + @Override + public List<String> listTables(String databaseName) + throws CatalogException, DatabaseNotExistException { + R<ShowCollectionsResponse> response = + this.client.showCollections( + ShowCollectionsParam.newBuilder() + .withDatabaseName(databaseName) + .withShowType(ShowType.All) + .build()); + + return response.getData().getCollectionNamesList(); + } + + @Override + public boolean tableExists(TablePath 
tablePath) throws CatalogException { + R<Boolean> response = + this.client.hasCollection( + HasCollectionParam.newBuilder() + .withDatabaseName(tablePath.getDatabaseName()) + .withCollectionName(tablePath.getTableName()) + .build()); + if (response.getData() != null) { + return response.getData(); + } + throw new MilvusConnectorException( + MilvusConnectionErrorCode.SERVER_RESPONSE_FAILED, + response.getMessage(), + response.getException()); + } + + @Override + public CatalogTable getTable(TablePath tablePath) + throws CatalogException, TableNotExistException { + throw new RuntimeException("not implemented"); + } + + @Override + public void createTable(TablePath tablePath, CatalogTable catalogTable, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + checkNotNull(tablePath, "Table path cannot be null"); + if (!databaseExists(tablePath.getDatabaseName())) { + throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName()); + } + if (tableExists(tablePath)) { + if (ignoreIfExists) { + return; + } + throw new TableAlreadyExistException(catalogName, tablePath); + } + + checkNotNull(catalogTable, "catalogTable must not be null"); + TableSchema tableSchema = catalogTable.getTableSchema(); + checkNotNull(tableSchema, "tableSchema must not be null"); + createTableInternal(tablePath, catalogTable); + + if (CollectionUtils.isNotEmpty(tableSchema.getConstraintKeys())) { + for (ConstraintKey constraintKey : tableSchema.getConstraintKeys()) { + if (constraintKey + .getConstraintType() + .equals(ConstraintKey.ConstraintType.VECTOR_INDEX_KEY)) { + createIndexInternal(tablePath, constraintKey.getColumnNames()); + } + } + } + } + + private void createIndexInternal( + TablePath tablePath, List<ConstraintKey.ConstraintKeyColumn> vectorIndexes) { + for (ConstraintKey.ConstraintKeyColumn column : vectorIndexes) { + VectorIndex index = (VectorIndex) column; + CreateIndexParam createIndexParam = + 
CreateIndexParam.newBuilder() + .withDatabaseName(tablePath.getDatabaseName()) + .withCollectionName(tablePath.getTableName()) + .withFieldName(index.getColumnName()) + .withIndexName(index.getIndexName()) + .withIndexType(IndexType.valueOf(index.getIndexType().name())) + .withMetricType(MetricType.valueOf(index.getMetricType().name())) + .build(); + + R<RpcStatus> response = client.createIndex(createIndexParam); + if (!Objects.equals(response.getStatus(), R.success().getStatus())) { + throw new MilvusConnectorException( + MilvusConnectionErrorCode.CREATE_INDEX_ERROR, response.getMessage()); + } + } + } + + public void createTableInternal(TablePath tablePath, CatalogTable catalogTable) { + try { + TableSchema tableSchema = catalogTable.getTableSchema(); + List<FieldType> fieldTypes = new ArrayList<>(); + for (Column column : tableSchema.getColumns()) { + fieldTypes.add(convertToFieldType(column, tableSchema.getPrimaryKey())); + } + + Map<String, String> options = catalogTable.getOptions(); + Boolean enableDynamicField = + (options.containsKey(MilvusOptions.ENABLE_DYNAMIC_FIELD)) + ? 
Boolean.valueOf(options.get(MilvusOptions.ENABLE_DYNAMIC_FIELD)) + : config.get(MilvusSinkConfig.ENABLE_DYNAMIC_FIELD); + + CreateCollectionParam.Builder builder = + CreateCollectionParam.newBuilder() + .withDatabaseName(tablePath.getDatabaseName()) + .withCollectionName(tablePath.getTableName()) + .withFieldTypes(fieldTypes) + .withEnableDynamicField(enableDynamicField) + .withConsistencyLevel(ConsistencyLevelEnum.BOUNDED); + if (null != catalogTable.getComment()) { + builder.withDescription(catalogTable.getComment()); + } + + CreateCollectionParam createCollectionParam = builder.build(); + R<RpcStatus> response = this.client.createCollection(createCollectionParam); + if (!Objects.equals(response.getStatus(), R.success().getStatus())) { + throw new MilvusConnectorException( + MilvusConnectionErrorCode.CREATE_COLLECTION_ERROR, response.getMessage()); + } + } catch (Exception e) { + throw new MilvusConnectorException( + MilvusConnectionErrorCode.CREATE_COLLECTION_ERROR, e); + } + } + + private FieldType convertToFieldType(Column column, PrimaryKey primaryKey) { + SeaTunnelDataType<?> seaTunnelDataType = column.getDataType(); + FieldType.Builder build = + FieldType.newBuilder() + .withName(column.getName()) + .withDataType( + MilvusConvertUtils.convertSqlTypeToDataType( + seaTunnelDataType.getSqlType())); + switch (seaTunnelDataType.getSqlType()) { + case ROW: + build.withMaxLength(65535); + break; + case DATE: + build.withMaxLength(20); + break; + case INT: + build.withDataType(DataType.Int32); + break; + case SMALLINT: + build.withDataType(DataType.Int16); + break; + case TINYINT: + build.withDataType(DataType.Int8); + break; + case FLOAT: + build.withDataType(DataType.Float); + break; + case DOUBLE: + build.withDataType(DataType.Double); + break; + case MAP: + build.withDataType(DataType.JSON); + break; + case BOOLEAN: + build.withDataType(DataType.Bool); + break; + case STRING: + if (column.getColumnLength() == 0) { + build.withMaxLength(512); + } else { + 
build.withMaxLength((int) (column.getColumnLength() / 4)); + } + break; + case ARRAY: + ArrayType arrayType = (ArrayType) column.getDataType(); + SeaTunnelDataType elementType = arrayType.getElementType(); + build.withElementType( + MilvusConvertUtils.convertSqlTypeToDataType(elementType.getSqlType())); + build.withMaxCapacity(4095); + switch (elementType.getSqlType()) { + case STRING: + if (column.getColumnLength() == 0) { + build.withMaxLength(512); + } else { + build.withMaxLength((int) (column.getColumnLength() / 4)); + } + break; + } + break; + case BINARY_VECTOR: + case FLOAT_VECTOR: + case FLOAT16_VECTOR: + case BFLOAT16_VECTOR: + build.withDimension(column.getScale()); + break; + } + + if (null != primaryKey && primaryKey.getColumnNames().contains(column.getName())) { + build.withPrimaryKey(true); + if (null != primaryKey.getEnableAutoId()) { + build.withAutoID(primaryKey.getEnableAutoId()); + } else { + build.withAutoID(config.get(MilvusSinkConfig.ENABLE_AUTO_ID)); + } + } + + return build.build(); + } + + @Override + public void dropTable(TablePath tablePath, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + this.client.dropCollection( + DropCollectionParam.newBuilder() + .withDatabaseName(tablePath.getDatabaseName()) + .withCollectionName(tablePath.getTableName()) + .build()); + } + + @Override + public void createDatabase(TablePath tablePath, boolean ignoreIfExists) + throws DatabaseAlreadyExistException, CatalogException { + R<RpcStatus> response = + this.client.createDatabase( + CreateDatabaseParam.newBuilder() + .withDatabaseName(tablePath.getDatabaseName()) + .build()); + if (!R.success().getStatus().equals(response.getStatus())) { + throw new MilvusConnectorException( + MilvusConnectionErrorCode.CREATE_DATABASE_ERROR, response.getMessage()); + } + } + + @Override + public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists) + throws DatabaseNotExistException, CatalogException { + this.client.dropDatabase( 
+ DropDatabaseParam.newBuilder() + .withDatabaseName(tablePath.getDatabaseName()) + .build()); + } +} diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalogFactory.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalogFactory.java new file mode 100644 index 0000000000..292c0464f2 --- /dev/null +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalogFactory.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.milvus.catalog; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.factory.CatalogFactory; +import org.apache.seatunnel.api.table.factory.Factory; + +import com.google.auto.service.AutoService; + +@AutoService(Factory.class) +public class MilvusCatalogFactory implements CatalogFactory { + + @Override + public Catalog createCatalog(String catalogName, ReadonlyConfig options) { + return new MilvusCatalog(catalogName, options); + } + + @Override + public String factoryIdentifier() { + return "Milvus"; + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder().build(); + } +} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusOptions.java similarity index 70% copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java copy to seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusOptions.java index 838a384809..b589b21d3d 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusOptions.java @@ -14,27 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.seatunnel.connectors.seatunnel.milvus.catalog; -package org.apache.seatunnel.api.table.type; +public class MilvusOptions { -/** The sql type of {@link SeaTunnelDataType}. 
*/ -public enum SqlType { - ARRAY, - MAP, - STRING, - BOOLEAN, - TINYINT, - SMALLINT, - INT, - BIGINT, - FLOAT, - DOUBLE, - DECIMAL, - NULL, - BYTES, - DATE, - TIME, - TIMESTAMP, - ROW, - MULTIPLE_ROW; + public static final String ENABLE_DYNAMIC_FIELD = "enableDynamicField"; } diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkConfig.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkConfig.java new file mode 100644 index 0000000000..d2357e559c --- /dev/null +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkConfig.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.milvus.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.api.sink.DataSaveMode; +import org.apache.seatunnel.api.sink.SchemaSaveMode; + +import java.util.Arrays; + +import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA; +import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA; +import static org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS; + +public class MilvusSinkConfig { + + public static final String CONNECTOR_IDENTITY = "Milvus"; + + public static final Option<String> URL = + Options.key("url") + .stringType() + .noDefaultValue() + .withDescription("Milvus public endpoint"); + + public static final Option<String> TOKEN = + Options.key("token") + .stringType() + .noDefaultValue() + .withDescription("Milvus token for authentication"); + + public static final Option<String> DATABASE = + Options.key("database").stringType().noDefaultValue().withDescription("database"); + + public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE = + Options.key("schema_save_mode") + .enumType(SchemaSaveMode.class) + .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST) + .withDescription("schema_save_mode"); + + public static final Option<DataSaveMode> DATA_SAVE_MODE = + Options.key("data_save_mode") + .singleChoice( + DataSaveMode.class, + Arrays.asList(DROP_DATA, APPEND_DATA, ERROR_WHEN_DATA_EXISTS)) + .defaultValue(APPEND_DATA) + .withDescription("data_save_mode"); + + public static final Option<Boolean> ENABLE_AUTO_ID = + Options.key("enable_auto_id") + .booleanType() + .defaultValue(false) + .withDescription("Enable Auto Id"); + + public static final Option<Boolean> ENABLE_UPSERT = + Options.key("enable_upsert") + .booleanType() + .defaultValue(true) + .withDescription("Enable upsert mode"); + + public static final Option<Boolean> ENABLE_DYNAMIC_FIELD = + 
Options.key("enable_dynamic_field") + .booleanType() + .defaultValue(true) + .withDescription("Enable dynamic field"); + + public static final Option<Integer> BATCH_SIZE = + Options.key("batch_size") + .intType() + .defaultValue(1000) + .withDescription("writer batch size"); +} diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSourceConfig.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSourceConfig.java new file mode 100644 index 0000000000..aa92286ac0 --- /dev/null +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSourceConfig.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.milvus.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; + +public class MilvusSourceConfig { + + public static final Option<String> URL = + Options.key("url") + .stringType() + .noDefaultValue() + .withDescription("Milvus public endpoint"); + + public static final Option<String> TOKEN = + Options.key("token") + .stringType() + .noDefaultValue() + .withDescription("Milvus token for authentication"); + + public static final Option<String> DATABASE = + Options.key("database") + .stringType() + .defaultValue("default") + .withDescription("database"); + + public static final Option<String> COLLECTION = + Options.key("collection") + .stringType() + .noDefaultValue() + .withDescription("Milvus collection to read"); +} diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/convert/MilvusConvertUtils.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/convert/MilvusConvertUtils.java new file mode 100644 index 0000000000..6b2661680b --- /dev/null +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/convert/MilvusConvertUtils.java @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.milvus.convert; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.ConstraintKey; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.catalog.VectorIndex; +import org.apache.seatunnel.api.table.catalog.exception.CatalogException; +import org.apache.seatunnel.api.table.type.ArrayType; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SqlType; +import org.apache.seatunnel.api.table.type.VectorType; +import org.apache.seatunnel.common.utils.JsonUtils; +import org.apache.seatunnel.connectors.seatunnel.milvus.catalog.MilvusOptions; +import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode; +import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException; + +import org.apache.commons.collections4.CollectionUtils; +import 
org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.util.Lists; + +import com.google.protobuf.ProtocolStringList; +import io.milvus.client.MilvusServiceClient; +import io.milvus.common.utils.JacksonUtils; +import io.milvus.grpc.CollectionSchema; +import io.milvus.grpc.DataType; +import io.milvus.grpc.DescribeCollectionResponse; +import io.milvus.grpc.DescribeIndexResponse; +import io.milvus.grpc.FieldSchema; +import io.milvus.grpc.IndexDescription; +import io.milvus.grpc.KeyValuePair; +import io.milvus.grpc.ShowCollectionsResponse; +import io.milvus.grpc.ShowType; +import io.milvus.param.ConnectParam; +import io.milvus.param.R; +import io.milvus.param.collection.DescribeCollectionParam; +import io.milvus.param.collection.ShowCollectionsParam; +import io.milvus.param.index.DescribeIndexParam; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class MilvusConvertUtils { + + private static final String CATALOG_NAME = "Milvus"; + + public static Map<TablePath, CatalogTable> getSourceTables(ReadonlyConfig config) { + MilvusServiceClient client = + new MilvusServiceClient( + ConnectParam.newBuilder() + .withUri(config.get(MilvusSourceConfig.URL)) + .withToken(config.get(MilvusSourceConfig.TOKEN)) + .build()); + + String database = config.get(MilvusSourceConfig.DATABASE); + List<String> collectionList = new ArrayList<>(); + if (StringUtils.isNotEmpty(config.get(MilvusSourceConfig.COLLECTION))) { + collectionList.add(config.get(MilvusSourceConfig.COLLECTION)); + } else { + R<ShowCollectionsResponse> response = + client.showCollections( + ShowCollectionsParam.newBuilder() + .withDatabaseName(database) + .withShowType(ShowType.All) + .build()); + if (response.getStatus() != R.Status.Success.getCode()) { + throw new MilvusConnectorException( + MilvusConnectionErrorCode.SHOW_COLLECTIONS_ERROR); + } + + ProtocolStringList collections = 
response.getData().getCollectionNamesList(); + if (CollectionUtils.isEmpty(collections)) { + throw new MilvusConnectorException( + MilvusConnectionErrorCode.DATABASE_NO_COLLECTIONS, database); + } + collectionList.addAll(collections); + } + + Map<TablePath, CatalogTable> map = new HashMap<>(); + for (String collection : collectionList) { + CatalogTable catalogTable = getCatalogTable(client, database, collection); + map.put(TablePath.of(database, collection), catalogTable); + } + return map; + } + + public static CatalogTable getCatalogTable( + MilvusServiceClient client, String database, String collection) { + R<DescribeCollectionResponse> response = + client.describeCollection( + DescribeCollectionParam.newBuilder() + .withDatabaseName(database) + .withCollectionName(collection) + .build()); + + if (response.getStatus() != R.Status.Success.getCode()) { + throw new MilvusConnectorException(MilvusConnectionErrorCode.DESC_COLLECTION_ERROR); + } + + // collection column + DescribeCollectionResponse data = response.getData(); + CollectionSchema schema = data.getSchema(); + List<Column> columns = new ArrayList<>(); + for (FieldSchema fieldSchema : schema.getFieldsList()) { + columns.add(MilvusConvertUtils.convertColumn(fieldSchema)); + } + + // primary key + PrimaryKey primaryKey = buildPrimaryKey(schema.getFieldsList()); + + // index + R<DescribeIndexResponse> describeIndexResponseR = + client.describeIndex( + DescribeIndexParam.newBuilder() + .withDatabaseName(database) + .withCollectionName(collection) + .build()); + if (describeIndexResponseR.getStatus() != R.Status.Success.getCode()) { + throw new MilvusConnectorException(MilvusConnectionErrorCode.DESC_INDEX_ERROR); + } + DescribeIndexResponse indexResponse = describeIndexResponseR.getData(); + List<ConstraintKey.ConstraintKeyColumn> vectorIndexes = buildVectorIndexes(indexResponse); + + // build tableSchema + TableSchema tableSchema = + TableSchema.builder() + .columns(columns) + .primaryKey(primaryKey) + 
.constraintKey( + ConstraintKey.of( + ConstraintKey.ConstraintType.VECTOR_INDEX_KEY, + "vector_index", + vectorIndexes)) + .build(); + + // build tableId + TableIdentifier tableId = TableIdentifier.of(CATALOG_NAME, database, collection); + + // build options info + Map<String, String> options = new HashMap<>(); + options.put( + MilvusOptions.ENABLE_DYNAMIC_FIELD, String.valueOf(schema.getEnableDynamicField())); + + return CatalogTable.of( + tableId, tableSchema, options, new ArrayList<>(), schema.getDescription()); + } + + private static List<ConstraintKey.ConstraintKeyColumn> buildVectorIndexes( + DescribeIndexResponse indexResponse) { + if (CollectionUtils.isEmpty(indexResponse.getIndexDescriptionsList())) { + return null; + } + + List<ConstraintKey.ConstraintKeyColumn> list = new ArrayList<>(); + for (IndexDescription per : indexResponse.getIndexDescriptionsList()) { + Map<String, String> paramsMap = + per.getParamsList().stream() + .collect( + Collectors.toMap(KeyValuePair::getKey, KeyValuePair::getValue)); + + VectorIndex index = + new VectorIndex( + per.getIndexName(), + per.getFieldName(), + paramsMap.get("index_type"), + paramsMap.get("metric_type")); + + list.add(index); + } + + return list; + } + + public static PrimaryKey buildPrimaryKey(List<FieldSchema> fields) { + for (FieldSchema field : fields) { + if (field.getIsPrimaryKey()) { + return PrimaryKey.of( + field.getName(), Lists.newArrayList(field.getName()), field.getAutoID()); + } + } + + return null; + } + + public static PhysicalColumn convertColumn(FieldSchema fieldSchema) { + DataType dataType = fieldSchema.getDataType(); + PhysicalColumn.PhysicalColumnBuilder builder = PhysicalColumn.builder(); + builder.name(fieldSchema.getName()); + builder.sourceType(dataType.name()); + builder.comment(fieldSchema.getDescription()); + + switch (dataType) { + case Bool: + builder.dataType(BasicType.BOOLEAN_TYPE); + break; + case Int8: + builder.dataType(BasicType.BYTE_TYPE); + break; + case Int16: + 
builder.dataType(BasicType.SHORT_TYPE); + break; + case Int32: + builder.dataType(BasicType.INT_TYPE); + break; + case Int64: + builder.dataType(BasicType.LONG_TYPE); + break; + case Float: + builder.dataType(BasicType.FLOAT_TYPE); + break; + case Double: + builder.dataType(BasicType.DOUBLE_TYPE); + break; + case VarChar: + builder.dataType(BasicType.STRING_TYPE); + for (KeyValuePair keyValuePair : fieldSchema.getTypeParamsList()) { + if (keyValuePair.getKey().equals("max_length")) { + builder.columnLength(Long.parseLong(keyValuePair.getValue()) * 4); + break; + } + } + break; + case String: + case JSON: + builder.dataType(BasicType.STRING_TYPE); + break; + case Array: + builder.dataType(ArrayType.STRING_ARRAY_TYPE); + break; + case FloatVector: + builder.dataType(VectorType.VECTOR_FLOAT_TYPE); + for (KeyValuePair keyValuePair : fieldSchema.getTypeParamsList()) { + if (keyValuePair.getKey().equals("dim")) { + builder.scale(Integer.valueOf(keyValuePair.getValue())); + break; + } + } + break; + case BinaryVector: + builder.dataType(VectorType.VECTOR_BINARY_TYPE); + for (KeyValuePair keyValuePair : fieldSchema.getTypeParamsList()) { + if (keyValuePair.getKey().equals("dim")) { + builder.scale(Integer.valueOf(keyValuePair.getValue())); + break; + } + } + break; + case SparseFloatVector: + builder.dataType(VectorType.VECTOR_SPARSE_FLOAT_TYPE); + break; + case Float16Vector: + builder.dataType(VectorType.VECTOR_FLOAT16_TYPE); + for (KeyValuePair keyValuePair : fieldSchema.getTypeParamsList()) { + if (keyValuePair.getKey().equals("dim")) { + builder.scale(Integer.valueOf(keyValuePair.getValue())); + break; + } + } + break; + case BFloat16Vector: + builder.dataType(VectorType.VECTOR_BFLOAT16_TYPE); + for (KeyValuePair keyValuePair : fieldSchema.getTypeParamsList()) { + if (keyValuePair.getKey().equals("dim")) { + builder.scale(Integer.valueOf(keyValuePair.getValue())); + break; + } + } + break; + default: + throw new UnsupportedOperationException("Unsupported data type: " 
+ dataType); + } + + return builder.build(); + } + + public static Object convertBySeaTunnelType(SeaTunnelDataType<?> fieldType, Object value) { + SqlType sqlType = fieldType.getSqlType(); + switch (sqlType) { + case INT: + return Integer.parseInt(value.toString()); + case BIGINT: + return Long.parseLong(value.toString()); + case SMALLINT: + return Short.parseShort(value.toString()); + case STRING: + case DATE: + return value.toString(); + case FLOAT_VECTOR: + List<Float> vector = new ArrayList<>(); + for (Object o : (Object[]) value) { + vector.add(Float.parseFloat(o.toString())); + } + return vector; + case FLOAT: + return Float.parseFloat(value.toString()); + case BOOLEAN: + return Boolean.parseBoolean(value.toString()); + case DOUBLE: + return Double.parseDouble(value.toString()); + case ARRAY: + ArrayType<?, ?> arrayType = (ArrayType<?, ?>) fieldType; + switch (arrayType.getElementType().getSqlType()) { + case STRING: + String[] stringArray = (String[]) value; + return Arrays.asList(stringArray); + case INT: + Integer[] intArray = (Integer[]) value; + return Arrays.asList(intArray); + case BIGINT: + Long[] longArray = (Long[]) value; + return Arrays.asList(longArray); + case FLOAT: + Float[] floatArray = (Float[]) value; + return Arrays.asList(floatArray); + case DOUBLE: + Double[] doubleArray = (Double[]) value; + return Arrays.asList(doubleArray); + } + case ROW: + SeaTunnelRow row = (SeaTunnelRow) value; + return JsonUtils.toJsonString(row.getFields()); + case MAP: + return JacksonUtils.toJsonString(value); + default: + throw new MilvusConnectorException( + MilvusConnectionErrorCode.NOT_SUPPORT_TYPE, sqlType.name()); + } + } + + public static DataType convertSqlTypeToDataType(SqlType sqlType) { + switch (sqlType) { + case BOOLEAN: + return DataType.Bool; + case TINYINT: + return DataType.Int8; + case SMALLINT: + return DataType.Int16; + case INT: + return DataType.Int32; + case BIGINT: + return DataType.Int64; + case FLOAT: + return DataType.Float; + case 
DOUBLE: + return DataType.Double; + case STRING: + return DataType.VarChar; + case ARRAY: + return DataType.Array; + case FLOAT_VECTOR: + return DataType.FloatVector; + case BINARY_VECTOR: + return DataType.BinaryVector; + case FLOAT16_VECTOR: + return DataType.Float16Vector; + case BFLOAT16_VECTOR: + return DataType.BFloat16Vector; + case SPARSE_FLOAT_VECTOR: + return DataType.SparseFloatVector; + case DATE: + return DataType.VarChar; + case ROW: + return DataType.VarChar; + } + throw new CatalogException( + String.format("Not support convert to milvus type, sqlType is %s", sqlType)); + } +} diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectionErrorCode.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectionErrorCode.java new file mode 100644 index 0000000000..3acc3de804 --- /dev/null +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectionErrorCode.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.milvus.exception; + +import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; + +public enum MilvusConnectionErrorCode implements SeaTunnelErrorCode { + SERVER_RESPONSE_FAILED("MILVUS-01", "Milvus server response error"), + COLLECTION_NOT_FOUND("MILVUS-02", "Collection not found"), + FIELD_NOT_FOUND("MILVUS-03", "Field not found"), + DESC_COLLECTION_ERROR("MILVUS-04", "Desc collection error"), + SHOW_COLLECTIONS_ERROR("MILVUS-05", "Show collections error"), + COLLECTION_NOT_LOADED("MILVUS-06", "Collection not loaded"), + NOT_SUPPORT_TYPE("MILVUS-07", "Type not support yet"), + DATABASE_NO_COLLECTIONS("MILVUS-08", "Database no any collections"), + SOURCE_TABLE_SCHEMA_IS_NULL("MILVUS-09", "Source table schema is null"), + FIELD_IS_NULL("MILVUS-10", "Field is null"), + CLOSE_CLIENT_ERROR("MILVUS-11", "Close client error"), + DESC_INDEX_ERROR("MILVUS-12", "Desc index error"), + CREATE_DATABASE_ERROR("MILVUS-13", "Create database error"), + CREATE_COLLECTION_ERROR("MILVUS-14", "Create collection error"), + CREATE_INDEX_ERROR("MILVUS-15", "Create index error"), + ; + + private final String code; + private final String description; + + MilvusConnectionErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return code; + } + + @Override + public String getDescription() { + return description; + } +} diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorException.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorException.java new file mode 100644 index 0000000000..df6ea7adca --- /dev/null +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorException.java @@ -0,0 
+1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.milvus.exception; + +import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; + +public class MilvusConnectorException extends SeaTunnelRuntimeException { + public MilvusConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) { + super(seaTunnelErrorCode, errorMessage); + } + + public MilvusConnectorException(SeaTunnelErrorCode seaTunnelErrorCode) { + super(seaTunnelErrorCode, seaTunnelErrorCode.getErrorMessage()); + } + + public MilvusConnectorException( + SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) { + super(seaTunnelErrorCode, errorMessage, cause); + } + + public MilvusConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) { + super(seaTunnelErrorCode, cause); + } +} diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java new file mode 100644 
index 0000000000..c5b1b82bcc --- /dev/null +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.milvus.sink; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.serialization.DefaultSerializer; +import org.apache.seatunnel.api.serialization.Serializer; +import org.apache.seatunnel.api.sink.DataSaveMode; +import org.apache.seatunnel.api.sink.DefaultSaveModeHandler; +import org.apache.seatunnel.api.sink.SaveModeHandler; +import org.apache.seatunnel.api.sink.SchemaSaveMode; +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkCommitter; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.sink.SupportSaveMode; +import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.factory.CatalogFactory; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import 
org.apache.seatunnel.connectors.seatunnel.milvus.catalog.MilvusCatalogFactory;
+import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.milvus.state.MilvusAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.milvus.state.MilvusCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.milvus.state.MilvusSinkState;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Entry point of the Milvus sink connector. It creates {@link MilvusSinkWriter}s, a no-op {@link
+ * MilvusSinkCommitter} and, when a target {@link CatalogTable} is available, a save-mode handler
+ * backed by the Milvus catalog.
+ */
+public class MilvusSink
+        implements SeaTunnelSink<
+                        SeaTunnelRow,
+                        MilvusSinkState,
+                        MilvusCommitInfo,
+                        MilvusAggregatedCommitInfo>,
+                SupportSaveMode {
+
+    /** Sink options taken from the job configuration. */
+    private final ReadonlyConfig config;
+
+    /** Target table; when {@code null}, {@link #getSaveModeHandler()} returns empty. */
+    private final CatalogTable catalogTable;
+
+    public MilvusSink(ReadonlyConfig config, CatalogTable catalogTable) {
+        this.config = config;
+        this.catalogTable = catalogTable;
+    }
+
+    @Override
+    public String getPluginName() {
+        return MilvusSinkConfig.CONNECTOR_IDENTITY;
+    }
+
+    /** Creates a writer that starts without any restored state. */
+    @Override
+    public SinkWriter<SeaTunnelRow, MilvusCommitInfo, MilvusSinkState> createWriter(
+            SinkWriter.Context context) {
+        List<MilvusSinkState> noRestoredState = Collections.emptyList();
+        return new MilvusSinkWriter(context, catalogTable, config, noRestoredState);
+    }
+
+    /** Creates a writer from the states captured at the last checkpoint. */
+    @Override
+    public SinkWriter<SeaTunnelRow, MilvusCommitInfo, MilvusSinkState> restoreWriter(
+            SinkWriter.Context context, List<MilvusSinkState> states) {
+        return new MilvusSinkWriter(context, catalogTable, config, states);
+    }
+
+    @Override
+    public Optional<Serializer<MilvusSinkState>> getWriterStateSerializer() {
+        Serializer<MilvusSinkState> stateSerializer = new DefaultSerializer<>();
+        return Optional.of(stateSerializer);
+    }
+
+    @Override
+    public Optional<SinkCommitter<MilvusCommitInfo>> createCommitter() {
+        SinkCommitter<MilvusCommitInfo> committer = new MilvusSinkCommitter(config);
+        return Optional.of(committer);
+    }
+
+    @Override
+    public Optional<Serializer<MilvusCommitInfo>> getCommitInfoSerializer() {
+        Serializer<MilvusCommitInfo> commitInfoSerializer = new DefaultSerializer<>();
+        return Optional.of(commitInfoSerializer);
+    }
+
+    /**
+     * Builds a {@link DefaultSaveModeHandler} for the target table using the configured schema
+     * and data save modes. The Milvus catalog is created and opened here, then handed to the
+     * handler.
+     *
+     * @return the handler, or empty when no target table is known
+     */
+    @Override
+    public Optional<SaveModeHandler> getSaveModeHandler() {
+        if (catalogTable == null) {
+            return Optional.empty();
+        }
+
+        CatalogFactory factory = new MilvusCatalogFactory();
+        Catalog milvusCatalog = factory.createCatalog(catalogTable.getCatalogName(), config);
+        SchemaSaveMode schemaMode = config.get(MilvusSinkConfig.SCHEMA_SAVE_MODE);
+        DataSaveMode dataMode = config.get(MilvusSinkConfig.DATA_SAVE_MODE);
+        milvusCatalog.open();
+
+        SaveModeHandler handler =
+                new DefaultSaveModeHandler(
+                        schemaMode,
+                        dataMode,
+                        milvusCatalog,
+                        catalogTable.getTablePath(),
+                        catalogTable,
+                        null);
+        return Optional.of(handler);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkCommitter.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkCommitter.java
new file mode 100644
index 0000000000..8c23bc62e6
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkCommitter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.connectors.seatunnel.milvus.state.MilvusCommitInfo;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Committer for the Milvus sink. Both operations are no-ops: {@link #commit(List)} never asks for
+ * a retry and {@link #abort(List)} does nothing.
+ */
+@Slf4j
+public class MilvusSinkCommitter implements SinkCommitter<MilvusCommitInfo> {
+
+    /**
+     * Creates the committer.
+     *
+     * @param pluginConfig sink options; currently not used by this committer
+     */
+    public MilvusSinkCommitter(ReadonlyConfig pluginConfig) {}
+
+    /**
+     * Accepts every commit message without doing any work, which is trivially idempotent.
+     *
+     * @param commitInfos commit messages produced by the writers
+     * @return always an empty list, meaning nothing has to be retried
+     * @throws IOException declared by {@link SinkCommitter}; never thrown here
+     */
+    @Override
+    public List<MilvusCommitInfo> commit(List<MilvusCommitInfo> commitInfos) throws IOException {
+        List<MilvusCommitInfo> nothingToRetry = Collections.emptyList();
+        return nothingToRetry;
+    }
+
+    /**
+     * Called (only on the Spark engine) when a commit failed. This committer keeps no
+     * transactional state, so there is nothing to roll back.
+     *
+     * @param commitInfos commit messages of the failed commit
+     * @throws IOException declared by {@link SinkCommitter}; never thrown here
+     */
+    @Override
+    public void abort(List<MilvusCommitInfo> commitInfos) throws IOException {
+        // Nothing to undo.
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java
new file mode 100644
index 0000000000..6ea5b5a2ff
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.connector.TableSink;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.google.auto.service.AutoService;
+
+/** Registers the {@code Milvus} sink and declares the options it accepts. */
+@AutoService(Factory.class)
+public class MilvusSinkFactory implements TableSinkFactory {
+
+    @Override
+    public String factoryIdentifier() {
+        return "Milvus";
+    }
+
+    /**
+     * Declares the sink options. {@link MilvusSinkConfig#DATABASE} (read by this factory) and
+     * {@link MilvusSinkConfig#BATCH_SIZE} (read by {@link MilvusSinkWriter}) are listed so that
+     * every option the sink actually consumes is part of its declared contract.
+     */
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(MilvusSinkConfig.URL, MilvusSinkConfig.TOKEN)
+                .optional(
+                        MilvusSinkConfig.DATABASE,
+                        MilvusSinkConfig.BATCH_SIZE,
+                        MilvusSinkConfig.ENABLE_UPSERT,
+                        MilvusSinkConfig.ENABLE_DYNAMIC_FIELD,
+                        MilvusSinkConfig.ENABLE_AUTO_ID,
+                        MilvusSinkConfig.SCHEMA_SAVE_MODE,
+                        MilvusSinkConfig.DATA_SAVE_MODE)
+                .build();
+    }
+
+    /** Creates the sink for the upstream table, re-targeted to the configured database. */
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        ReadonlyConfig config = context.getOptions();
+        CatalogTable catalogTable = renameCatalogTable(config, context.getCatalogTable());
+        return () -> new MilvusSink(config, catalogTable);
+    }
+
+    /**
+     * Returns a copy of {@code sourceCatalogTable} whose database is the configured {@link
+     * MilvusSinkConfig#DATABASE} when that option is non-empty, otherwise the upstream database.
+     * Catalog, schema and table names are kept unchanged.
+     */
+    private CatalogTable renameCatalogTable(
+            ReadonlyConfig config, CatalogTable sourceCatalogTable) {
+        TableIdentifier sourceTableId = sourceCatalogTable.getTableId();
+        String configuredDatabase = config.get(MilvusSinkConfig.DATABASE);
+        String databaseName =
+                StringUtils.isNotEmpty(configuredDatabase)
+                        ? configuredDatabase
+                        : sourceTableId.getDatabaseName();
+
+        TableIdentifier newTableId =
+                TableIdentifier.of(
+                        sourceTableId.getCatalogName(),
+                        databaseName,
+                        sourceTableId.getSchemaName(),
+                        sourceTableId.getTableName());
+
+        return CatalogTable.of(newTableId, sourceCatalogTable);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java
new file mode 100644
index 0000000000..7c823838c5
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.milvus.sink.batch.MilvusBatchWriter;
+import org.apache.seatunnel.connectors.seatunnel.milvus.sink.batch.MilvusBufferBatchWriter;
+import org.apache.seatunnel.connectors.seatunnel.milvus.state.MilvusCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.milvus.state.MilvusSinkState;
+
+import io.milvus.v2.client.ConnectConfig;
+import io.milvus.v2.client.MilvusClientV2;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.BATCH_SIZE;
+
+/**
+ * MilvusSinkWriter is a sink writer that will write {@link SeaTunnelRow} to Milvus. Rows are
+ * buffered in a {@link MilvusBatchWriter} and sent when the batch is full, whenever {@link
+ * #prepareCommit()} is called, and on {@link #close()}.
+ */
+@Slf4j
+public class MilvusSinkWriter
+        implements SinkWriter<SeaTunnelRow, MilvusCommitInfo, MilvusSinkState> {
+    private final Context context;
+
+    private final ReadonlyConfig config;
+    private final MilvusBatchWriter batchWriter;
+
+    /**
+     * Connects to Milvus and prepares the batch writer.
+     *
+     * @param context writer context supplied by the engine
+     * @param catalogTable target table; its primary key decides the auto-id behaviour
+     * @param config sink options (URL, token, batch size, upsert, auto-id)
+     * @param milvusSinkStates restored states; not used, this writer keeps no state
+     */
+    public MilvusSinkWriter(
+            Context context,
+            CatalogTable catalogTable,
+            ReadonlyConfig config,
+            List<MilvusSinkState> milvusSinkStates) {
+        this.context = context;
+        // Must be assigned before getAutoId(), which falls back to this config.
+        this.config = config;
+        ConnectConfig connectConfig =
+                ConnectConfig.builder()
+                        .uri(config.get(MilvusSinkConfig.URL))
+                        .token(config.get(MilvusSinkConfig.TOKEN))
+                        .build();
+        this.batchWriter =
+                new MilvusBufferBatchWriter(
+                        catalogTable,
+                        config.get(BATCH_SIZE),
+                        getAutoId(catalogTable.getTableSchema().getPrimaryKey()),
+                        config.get(MilvusSinkConfig.ENABLE_UPSERT),
+                        new MilvusClientV2(connectConfig));
+    }
+
+    /**
+     * Buffers one row and flushes the batch once it reaches the configured size.
+     *
+     * @param element the row to write
+     */
+    @Override
+    public void write(SeaTunnelRow element) {
+        batchWriter.addToBatch(element);
+        if (batchWriter.needFlush()) {
+            batchWriter.flush();
+        }
+    }
+
+    /**
+     * Resolves the auto-id flag: the primary key's own setting wins when present, otherwise the
+     * {@link MilvusSinkConfig#ENABLE_AUTO_ID} option is used.
+     */
+    private Boolean getAutoId(PrimaryKey primaryKey) {
+        if (null != primaryKey && null != primaryKey.getEnableAutoId()) {
+            return primaryKey.getEnableAutoId();
+        } else {
+            return config.get(MilvusSinkConfig.ENABLE_AUTO_ID);
+        }
+    }
+
+    /**
+     * prepare the commit, will be called before {@link #snapshotState(long checkpointId)}. If you
+     * need to use 2pc, you can return the commit info in this method, and receive the commit info
+     * in {@link SinkCommitter#commit(List)}. If this method failed (by throw exception), **Only**
+     * Spark engine will call {@link #abortPrepare()}
+     *
+     * <p>Here all buffered rows are flushed so that they reach Milvus before the checkpoint; no
+     * commit info is produced.
+     *
+     * @return always empty
+     */
+    @Override
+    public Optional<MilvusCommitInfo> prepareCommit() throws IOException {
+        batchWriter.flush();
+        return Optional.empty();
+    }
+
+    /**
+     * Used to abort the {@link #prepareCommit()}, if the prepareCommit failed, there is no
+     * CommitInfoT, so the rollback work cannot be done by {@link SinkCommitter}. But we can use
+     * this method to rollback side effects of {@link #prepareCommit()}. Only use it in Spark engine
+     * at now. Nothing to roll back here.
+     */
+    @Override
+    public void abortPrepare() {}
+
+    /**
+     * Flushes the remaining rows and closes the Milvus client. The client is closed even if the
+     * final flush fails, so the connection is not leaked.
+     *
+     * @throws IOException if close failed
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            batchWriter.flush();
+        } finally {
+            batchWriter.close();
+        }
+    }
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/batch/MilvusBatchWriter.java
similarity index 70%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
copy to seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/batch/MilvusBatchWriter.java
index 838a384809..91e04342dc 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
+++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/batch/MilvusBatchWriter.java
@@ -15,26 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.table.type;
+package org.apache.seatunnel.connectors.seatunnel.milvus.sink.batch;
 
-/** The sql type of {@link SeaTunnelDataType}.
*/
-public enum SqlType {
-    ARRAY,
-    MAP,
-    STRING,
-    BOOLEAN,
-    TINYINT,
-    SMALLINT,
-    INT,
-    BIGINT,
-    FLOAT,
-    DOUBLE,
-    DECIMAL,
-    NULL,
-    BYTES,
-    DATE,
-    TIME,
-    TIMESTAMP,
-    ROW,
-    MULTIPLE_ROW;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+/** Buffers rows destined for a Milvus collection and writes them out in batches. */
+public interface MilvusBatchWriter {
+
+    /** Appends one row to the pending batch. */
+    void addToBatch(SeaTunnelRow element);
+
+    /** Returns whether the pending batch is large enough that it should be written now. */
+    boolean needFlush();
+
+    /** Writes every pending row to Milvus and empties the batch; returns the flush outcome. */
+    boolean flush();
+
+    /** Releases the underlying Milvus connection. */
+    void close();
 }
diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/batch/MilvusBufferBatchWriter.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/batch/MilvusBufferBatchWriter.java
new file mode 100644
index 0000000000..a323095bc2
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/batch/MilvusBufferBatchWriter.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.sink.batch;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+import org.apache.seatunnel.connectors.seatunnel.milvus.convert.MilvusConvertUtils;
+import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import com.alibaba.fastjson.JSONObject;
+import io.milvus.v2.client.MilvusClientV2;
+import io.milvus.v2.service.vector.request.InsertReq;
+import io.milvus.v2.service.vector.request.UpsertReq;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.seatunnel.api.table.catalog.PrimaryKey.isPrimaryKeyField;
+
+/**
+ * {@link MilvusBatchWriter} that keeps rows in memory as {@link JSONObject}s and sends them to the
+ * target collection with one insert (or upsert) request per flush.
+ */
+public class MilvusBufferBatchWriter implements MilvusBatchWriter {
+
+    private final int batchSize;
+    private final CatalogTable catalogTable;
+    private final Boolean autoId;
+    private final Boolean enableUpsert;
+    private final String collectionName;
+    private MilvusClientV2 milvusClient;
+
+    private volatile List<JSONObject> milvusDataCache;
+    private volatile int writeCount = 0;
+
+    /**
+     * Creates a buffering writer for one collection.
+     *
+     * @param catalogTable table describing incoming rows; its table name is the collection name
+     * @param batchSize number of buffered rows at which {@link #needFlush()} becomes true
+     * @param autoId when true, primary-key fields are not sent and insert is always used
+     * @param enableUpsert use upsert instead of insert (only honoured when {@code autoId} is false)
+     * @param milvusClient client used for all writes; closed by {@link #close()}
+     */
+    public MilvusBufferBatchWriter(
+            CatalogTable catalogTable,
+            Integer batchSize,
+            Boolean autoId,
+            Boolean enableUpsert,
+            MilvusClientV2 milvusClient) {
+        this.catalogTable = catalogTable;
+        this.autoId = autoId;
+        this.enableUpsert = enableUpsert;
+        this.milvusClient = milvusClient;
+        this.collectionName = catalogTable.getTablePath().getTableName();
+        this.batchSize = batchSize;
+        this.milvusDataCache = new ArrayList<>(batchSize);
+    }
+
+    /** Converts the row to its Milvus JSON form and appends it to the buffer. */
+    @Override
+    public void addToBatch(SeaTunnelRow element) {
+        JSONObject data = buildMilvusData(element);
+        milvusDataCache.add(data);
+        writeCount++;
+    }
+
+    @Override
+    public boolean needFlush() {
+        return this.writeCount >= this.batchSize;
+    }
+
+    /**
+     * Sends all buffered rows in a single request and resets the buffer. An empty buffer is a
+     * no-op.
+     *
+     * @return always {@code true}; failures surface as exceptions from the Milvus client
+     */
+    @Override
+    public synchronized boolean flush() {
+        if (CollectionUtils.isEmpty(this.milvusDataCache)) {
+            return true;
+        }
+        writeData2Collection();
+        this.milvusDataCache = new ArrayList<>(this.batchSize);
+        this.writeCount = 0;
+        return true;
+    }
+
+    /**
+     * Closes the Milvus client, waiting up to 10 (seconds, per the SDK's close(long) contract —
+     * confirm) for it to shut down.
+     *
+     * @throws SeaTunnelException if interrupted while waiting; the interrupt flag is restored
+     */
+    @Override
+    public void close() {
+        try {
+            this.milvusClient.close(10);
+        } catch (InterruptedException e) {
+            // Preserve the interrupt so code further up the stack can still react to it.
+            Thread.currentThread().interrupt();
+            throw new SeaTunnelException(e);
+        }
+    }
+
+    /**
+     * Builds the JSON document for one row, keyed by field name. Primary-key fields are skipped
+     * when auto-id is on; any other null field is rejected.
+     *
+     * @throws MilvusConnectorException with {@code FIELD_IS_NULL} when a sent field is null
+     */
+    private JSONObject buildMilvusData(SeaTunnelRow element) {
+        SeaTunnelRowType seaTunnelRowType = catalogTable.getSeaTunnelRowType();
+        PrimaryKey primaryKey = catalogTable.getTableSchema().getPrimaryKey();
+        String[] fieldNames = seaTunnelRowType.getFieldNames();
+
+        JSONObject data = new JSONObject();
+        for (int i = 0; i < fieldNames.length; i++) {
+            String fieldName = fieldNames[i];
+
+            if (autoId && isPrimaryKeyField(primaryKey, fieldName)) {
+                // With AutoID enabled on the collection, primary-key values are not sent.
+                continue;
+            }
+
+            SeaTunnelDataType<?> fieldType = seaTunnelRowType.getFieldType(i);
+            Object value = element.getField(i);
+            if (null == value) {
+                throw new MilvusConnectorException(
+                        MilvusConnectionErrorCode.FIELD_IS_NULL, fieldName);
+            }
+            data.put(fieldName, MilvusConvertUtils.convertBySeaTunnelType(fieldType, value));
+        }
+        return data;
+    }
+
+    /** Sends the buffer as an upsert when enabled and auto-id is off, otherwise as an insert. */
+    private void writeData2Collection() {
+        // Upsert needs caller-supplied primary keys, so it is only possible without AutoID.
+        if (enableUpsert && !autoId) {
+            UpsertReq upsertReq =
+                    UpsertReq.builder()
+                            .collectionName(this.collectionName)
+                            .data(this.milvusDataCache)
+                            .build();
+            milvusClient.upsert(upsertReq);
+        } else {
+            InsertReq insertReq =
+                    InsertReq.builder()
+                            .collectionName(this.collectionName)
+                            .data(this.milvusDataCache)
+                            .build();
+            milvusClient.insert(insertReq);
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSource.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSource.java
new file mode 100644
index 0000000000..05e9aed769
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSource.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.source;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportColumnProjection;
+import org.apache.seatunnel.api.source.SupportParallelism;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.milvus.convert.MilvusConvertUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Bounded Milvus source. Table metadata is resolved once, in the constructor, via {@link
+ * MilvusConvertUtils#getSourceTables}; splits are produced by {@link MilvusSourceSplitEnumertor}
+ * and read by {@link MilvusSourceReader}.
+ */
+public class MilvusSource
+        implements SeaTunnelSource<SeaTunnelRow, MilvusSourceSplit, MilvusSourceState>,
+                SupportParallelism,
+                SupportColumnProjection {
+
+    private final ReadonlyConfig config;
+    private final Map<TablePath, CatalogTable> sourceTables;
+
+    public MilvusSource(ReadonlyConfig sourceConfig) {
+        this.config = sourceConfig;
+        this.sourceTables = MilvusConvertUtils.getSourceTables(config);
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    /** Returns a fresh list of the tables this source reads. */
+    @Override
+    public List<CatalogTable> getProducedCatalogTables() {
+        return new ArrayList<>(sourceTables.values());
+    }
+
+    @Override
+    public SourceReader<SeaTunnelRow, MilvusSourceSplit> createReader(
+            SourceReader.Context readerContext) throws Exception {
+        return new MilvusSourceReader(readerContext, config, sourceTables);
+    }
+
+    /** Creates an enumerator with no previous state. */
+    @Override
+    public SourceSplitEnumerator<MilvusSourceSplit, MilvusSourceState> createEnumerator(
+            SourceSplitEnumerator.Context<MilvusSourceSplit> context) throws Exception {
+        return new MilvusSourceSplitEnumertor(context, config, sourceTables, null);
+    }
+
+    /** Recreates the enumerator from the checkpointed {@code checkpointState}. */
+    @Override
+    public SourceSplitEnumerator<MilvusSourceSplit, MilvusSourceState> restoreEnumerator(
+            SourceSplitEnumerator.Context<MilvusSourceSplit> context,
+            MilvusSourceState checkpointState)
+            throws Exception {
+        return new MilvusSourceSplitEnumertor(context, config, sourceTables, checkpointState);
+    }
+
+    @Override
+    public String getPluginName() {
+        return "Milvus";
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceFactory.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceFactory.java
new file mode 100644
index 0000000000..d511026a85
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceConfig;
+
+import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.Serializable;
+
+/** Registers the {@code Milvus} source and declares the options it accepts. */
+@Slf4j
+@AutoService(Factory.class)
+public class MilvusSourceFactory implements TableSourceFactory {
+
+    /**
+     * Creates a {@link MilvusSource} from the job options.
+     *
+     * <p>The cast is unchecked: {@link MilvusSource} is a {@code SeaTunnelSource<SeaTunnelRow,
+     * MilvusSourceSplit, MilvusSourceState>}, and the caller is expected to request exactly those
+     * type arguments.
+     */
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
+        return () -> {
+            @SuppressWarnings("unchecked")
+            SeaTunnelSource<T, SplitT, StateT> source =
+                    (SeaTunnelSource<T, SplitT, StateT>) new MilvusSource(context.getOptions());
+            return source;
+        };
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(MilvusSourceConfig.URL, MilvusSourceConfig.TOKEN)
+                .optional(MilvusSourceConfig.DATABASE, MilvusSourceConfig.COLLECTION)
+                .build();
+    }
+
+    @Override
+    public Class<? extends SeaTunnelSource> getSourceClass() {
+        return MilvusSource.class;
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return "Milvus";
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java
new file mode 100644
index 0000000000..e52f264264
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.source;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException;
+
+import io.milvus.client.MilvusServiceClient;
+import io.milvus.grpc.GetLoadStateResponse;
+import io.milvus.grpc.LoadState;
+import io.milvus.orm.iterator.QueryIterator;
+import io.milvus.param.ConnectParam;
+import io.milvus.param.R;
+import io.milvus.param.collection.GetLoadStateParam;
+import io.milvus.param.dml.QueryIteratorParam;
+import io.milvus.response.QueryResultsWrapper;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+/**
+ * Reads Milvus collections assigned as {@link MilvusSourceSplit}s. Each split is read in full,
+ * through a query iterator over all output fields, while the checkpoint lock is held.
+ */
+@Slf4j
+public class MilvusSourceReader implements SourceReader<SeaTunnelRow, MilvusSourceSplit> {
+
+    /** Back-off applied by {@link #pollNext} when there was no split to read. */
+    private static final long IDLE_POLL_INTERVAL_MS = 1000L;
+
+    private final Deque<MilvusSourceSplit> pendingSplits = new ConcurrentLinkedDeque<>();
+    private final ReadonlyConfig config;
+    private final Context context;
+    private final Map<TablePath, CatalogTable> sourceTables;
+
+    private MilvusServiceClient client;
+
+    private volatile boolean noMoreSplit;
+
+    public MilvusSourceReader(
+            Context readerContext,
+            ReadonlyConfig config,
+            Map<TablePath, CatalogTable> sourceTables) {
+        this.context = readerContext;
+        this.config = config;
+        this.sourceTables = sourceTables;
+    }
+
+    /** Connects to Milvus with the configured URL and token. */
+    @Override
+    public void open() throws Exception {
+        client =
+                new MilvusServiceClient(
+                        ConnectParam.newBuilder()
+                                .withUri(config.get(MilvusSourceConfig.URL))
+                                .withToken(config.get(MilvusSourceConfig.TOKEN))
+                                .build());
+    }
+
+    /** Closes the Milvus client; safe to call even if {@link #open()} did not succeed. */
+    @Override
+    public void close() throws IOException {
+        if (client != null) {
+            client.close();
+        }
+    }
+
+    /**
+     * Reads at most one pending split while holding the checkpoint lock, then signals end of
+     * input once no more splits can arrive for a bounded job. Sleeps only when there was no split
+     * to read, so queued splits are processed without an artificial delay.
+     */
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        MilvusSourceSplit split;
+        synchronized (output.getCheckpointLock()) {
+            split = pendingSplits.poll();
+            if (null != split) {
+                handleEveryRowInternal(split, output);
+            } else if (!noMoreSplit) {
+                log.info("Milvus source wait split!");
+            }
+        }
+        if (noMoreSplit
+                && pendingSplits.isEmpty()
+                && Boundedness.BOUNDED.equals(context.getBoundedness())) {
+            // signal to the source that we have reached the end of the data.
+            log.info("Closed the bounded milvus source");
+            context.signalNoMoreElement();
+        }
+        if (null == split) {
+            Thread.sleep(IDLE_POLL_INTERVAL_MS);
+        }
+    }
+
+    /**
+     * Emits every row of the split's collection. The collection must already be loaded in Milvus.
+     *
+     * @throws MilvusConnectorException if the table schema is unknown, the collection is not
+     *     loaded, or a Milvus request fails
+     */
+    private void handleEveryRowInternal(MilvusSourceSplit split, Collector<SeaTunnelRow> output) {
+        TablePath tablePath = split.getTablePath();
+        CatalogTable catalogTable = sourceTables.get(tablePath);
+        TableSchema tableSchema = catalogTable == null ? null : catalogTable.getTableSchema();
+        if (null == tableSchema) {
+            throw new MilvusConnectorException(
+                    MilvusConnectionErrorCode.SOURCE_TABLE_SCHEMA_IS_NULL);
+        }
+
+        R<GetLoadStateResponse> loadStateResponse =
+                client.getLoadState(
+                        GetLoadStateParam.newBuilder()
+                                .withDatabaseName(tablePath.getDatabaseName())
+                                .withCollectionName(tablePath.getTableName())
+                                .build());
+        if (loadStateResponse.getStatus() != R.Status.Success.getCode()) {
+            throw new MilvusConnectorException(
+                    MilvusConnectionErrorCode.SERVER_RESPONSE_FAILED,
+                    loadStateResponse.getException());
+        }
+
+        if (!LoadState.LoadStateLoaded.equals(loadStateResponse.getData().getState())) {
+            throw new MilvusConnectorException(MilvusConnectionErrorCode.COLLECTION_NOT_LOADED);
+        }
+
+        // "*" requests every field of the collection.
+        List<String> outFields = new ArrayList<>();
+        outFields.add("*");
+        QueryIteratorParam param =
+                QueryIteratorParam.newBuilder()
+                        .withDatabaseName(tablePath.getDatabaseName())
+                        .withCollectionName(tablePath.getTableName())
+                        .withOutFields(outFields)
+                        .build();
+
+        R<QueryIterator> response = client.queryIterator(param);
+        if (response.getStatus() != R.Status.Success.getCode()) {
+            // Report the iterator failure itself, not the (successful) load-state response.
+            throw new MilvusConnectorException(
+                    MilvusConnectionErrorCode.SERVER_RESPONSE_FAILED, response.getException());
+        }
+
+        QueryIterator iterator = response.getData();
+        try {
+            while (true) {
+                List<QueryResultsWrapper.RowRecord> batch = iterator.next();
+                if (batch == null || batch.isEmpty()) {
+                    break;
+                }
+                for (QueryResultsWrapper.RowRecord record : batch) {
+                    output.collect(convertToSeaTunnelRow(record, tableSchema, tablePath));
+                }
+            }
+        } finally {
+            // Release the iterator even if conversion or collection fails midway.
+            iterator.close();
+        }
+    }
+
+    /**
+     * Converts one Milvus record into an INSERT row for {@code tablePath}. The row has exactly one
+     * slot per physical schema field; values are looked up by field name, and a field missing
+     * from the record becomes {@code null}.
+     *
+     * @throws MilvusConnectorException for unsupported field types or malformed vector values
+     */
+    public SeaTunnelRow convertToSeaTunnelRow(
+            QueryResultsWrapper.RowRecord record, TableSchema tableSchema, TablePath tablePath) {
+        SeaTunnelRowType typeInfo = tableSchema.toPhysicalRowDataType();
+        Map<String, Object> fieldValuesMap = record.getFieldValues();
+        String[] fieldNames = typeInfo.getFieldNames();
+        // Size by the schema, not by the record: the record's key set may differ from the schema
+        // and must not change the row arity.
+        Object[] fields = new Object[typeInfo.getTotalFields()];
+        for (int fieldIndex = 0; fieldIndex < fields.length; fieldIndex++) {
+            fields[fieldIndex] =
+                    convertField(
+                            typeInfo.getFieldType(fieldIndex),
+                            fieldValuesMap.get(fieldNames[fieldIndex]));
+        }
+
+        SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
+        seaTunnelRow.setTableId(tablePath.getFullName());
+        seaTunnelRow.setRowKind(RowKind.INSERT);
+        return seaTunnelRow;
+    }
+
+    /**
+     * Converts a single Milvus value to the Java type SeaTunnel uses for {@code dataType}. Values
+     * that arrive in another representation are re-parsed from their string form.
+     */
+    private static Object convertField(SeaTunnelDataType<?> dataType, Object value) {
+        if (value == null) {
+            return null;
+        }
+        switch (dataType.getSqlType()) {
+            case STRING:
+                return value.toString();
+            case BOOLEAN:
+                return value instanceof Boolean ? value : Boolean.valueOf(value.toString());
+            case INT:
+                return value instanceof Integer ? value : Integer.valueOf(value.toString());
+            case BIGINT:
+                return value instanceof Long ? value : Long.valueOf(value.toString());
+            case FLOAT:
+                return value instanceof Float ? value : Float.valueOf(value.toString());
+            case DOUBLE:
+                return value instanceof Double ? value : Double.valueOf(value.toString());
+            case FLOAT_VECTOR:
+                {
+                    if (!(value instanceof List)) {
+                        throw new MilvusConnectorException(
+                                CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                                "Unexpected vector value: " + value);
+                    }
+                    List<?> vector = (List<?>) value;
+                    Float[] arrays = new Float[vector.size()];
+                    for (int i = 0; i < vector.size(); i++) {
+                        arrays[i] = Float.parseFloat(vector.get(i).toString());
+                    }
+                    return arrays;
+                }
+            default:
+                throw new MilvusConnectorException(
+                        CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                        "Unexpected value: " + dataType.getSqlType().name());
+        }
+    }
+
+    @Override
+    public List<MilvusSourceSplit> snapshotState(long checkpointId) throws Exception {
+        return new ArrayList<>(pendingSplits);
+    }
+
+    @Override
+    public void addSplits(List<MilvusSourceSplit> splits) {
+        log.info("Adding milvus splits to reader: {}", splits);
+        pendingSplits.addAll(splits);
+    }
+
+    @Override
+    public void handleNoMoreSplits() {
+        log.info("receive no more splits message, this milvus reader will not add new split.");
+        noMoreSplit = true;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {}
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplit.java
similarity index 65%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
copy to seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplit.java
index 838a384809..e79d74b6dc 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
+++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplit.java
@@ -15,26 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.table.type;
+package org.apache.seatunnel.connectors.seatunnel.milvus.source;
 
-/** The sql type of {@link SeaTunnelDataType}.
*/
-public enum SqlType {
-    ARRAY,
-    MAP,
-    STRING,
-    BOOLEAN,
-    TINYINT,
-    SMALLINT,
-    INT,
-    BIGINT,
-    FLOAT,
-    DOUBLE,
-    DECIMAL,
-    NULL,
-    BYTES,
-    DATE,
-    TIME,
-    TIMESTAMP,
-    ROW,
-    MULTIPLE_ROW;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+
+import lombok.Builder;
+import lombok.Data;
+
+/**
+ * A Milvus source split. It identifies one table (database and collection) to read; the split
+ * enumerator creates one split per table with an id of the form {@code <tablePath>-0}.
+ */
+@Data
+@Builder
+public class MilvusSourceSplit implements SourceSplit {
+
+    private TablePath tablePath;
+    private String splitId;
+
+    @Override
+    public String splitId() {
+        return splitId;
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumertor.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumertor.java
new file mode 100644
index 0000000000..e01e9c8ad5
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumertor.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.source;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Split enumerator for the Milvus source.
+ *
+ * <p>Every configured table becomes a single split. Each split is owned by the reader whose index
+ * equals the split-id hash modulo the current parallelism. Pending splits live in a plain {@link
+ * HashMap}; every access to it is guarded by {@code stateLock}.
+ */
+@Slf4j
+public class MilvusSourceSplitEnumertor
+        implements SourceSplitEnumerator<MilvusSourceSplit, MilvusSourceState> {
+
+    private final Map<TablePath, CatalogTable> tables;
+    private final Context<MilvusSourceSplit> context;
+    /** Tables that have not been split yet. */
+    private final ConcurrentLinkedQueue<TablePath> pendingTables;
+    /** Generated splits not yet handed to a reader, keyed by owning reader subtask id. */
+    private final Map<Integer, List<MilvusSourceSplit>> pendingSplits;
+
+    private final Object stateLock = new Object();
+
+    private final ReadonlyConfig config;
+
+    /**
+     * Creates an enumerator, either fresh (all configured tables pending) or restored from a
+     * checkpoint.
+     *
+     * @param context enumerator context used to talk to readers
+     * @param config connector configuration
+     * @param sourceTables tables to read, keyed by path
+     * @param sourceState checkpointed state to restore from, or {@code null} for a fresh start
+     */
+    public MilvusSourceSplitEnumertor(
+            Context<MilvusSourceSplit> context,
+            ReadonlyConfig config,
+            Map<TablePath, CatalogTable> sourceTables,
+            MilvusSourceState sourceState) {
+        this.context = context;
+        this.tables = sourceTables;
+        this.config = config;
+        if (sourceState == null) {
+            this.pendingTables = new ConcurrentLinkedQueue<>(tables.keySet());
+            this.pendingSplits = new HashMap<>();
+        } else {
+            this.pendingTables = new ConcurrentLinkedQueue<>(sourceState.getPendingTables());
+            this.pendingSplits = new HashMap<>(sourceState.getPendingSplits());
+        }
+    }
+
+    @Override
+    public void open() {}
+
+    /**
+     * Splits every pending table, assigns the splits to the readers registered when this method
+     * started, and finally signals "no more splits" to those readers.
+     */
+    @Override
+    public void run() throws Exception {
+        log.info("Starting milvus split enumerator.");
+        Set<Integer> readers = context.registeredReaders();
+        while (!pendingTables.isEmpty()) {
+            synchronized (stateLock) {
+                TablePath tablePath = pendingTables.poll();
+                log.info("begin to split table path: {}", tablePath);
+                Collection<MilvusSourceSplit> splits = generateSplits(tables.get(tablePath));
+                log.info("end to split table {} into {} splits.", tablePath, splits.size());
+
+                addPendingSplit(splits);
+            }
+
+            synchronized (stateLock) {
+                assignSplit(readers);
+            }
+        }
+
+        log.info("No more splits to assign." + " Sending NoMoreSplitsEvent to reader {}.", readers);
+        readers.forEach(context::signalNoMoreSplits);
+    }
+
+    /** Produces the splits for one table: currently a single split covering the whole table. */
+    private Collection<MilvusSourceSplit> generateSplits(CatalogTable table) {
+        log.info("Start splitting table {} into chunks...", table.getTablePath());
+        MilvusSourceSplit milvusSourceSplit =
+                MilvusSourceSplit.builder()
+                        .splitId(createSplitId(table.getTablePath(), 0))
+                        .tablePath(table.getTablePath())
+                        .build();
+
+        return Collections.singletonList(milvusSourceSplit);
+    }
+
+    /** Builds a split id of the form {@code <tablePath>-<index>}. */
+    protected String createSplitId(TablePath tablePath, int index) {
+        return String.format("%s-%s", tablePath, index);
+    }
+
+    /** Queues {@code splits} for their hash-selected owner readers. Caller must hold stateLock. */
+    private void addPendingSplit(Collection<MilvusSourceSplit> splits) {
+        int readerCount = context.currentParallelism();
+        for (MilvusSourceSplit split : splits) {
+            int ownerReader = getSplitOwner(split.splitId(), readerCount);
+            log.info("Assigning {} to {} reader.", split, ownerReader);
+
+            pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList<>()).add(split);
+        }
+    }
+
+    /** Maps a split id onto a reader index in {@code [0, numReaders)}; the mask keeps it >= 0. */
+    private static int getSplitOwner(String tp, int numReaders) {
+        return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
+    }
+
+    /** Hands each reader its queued splits, if any. Caller must hold stateLock. */
+    private void assignSplit(Collection<Integer> readers) {
+        log.info("Assign pendingSplits to readers {}", readers);
+
+        for (int reader : readers) {
+            List<MilvusSourceSplit> assignmentForReader = pendingSplits.remove(reader);
+            if (assignmentForReader != null && !assignmentForReader.isEmpty()) {
+                log.debug("Assign splits {} to reader {}", assignmentForReader, reader);
+                context.assignSplit(reader, assignmentForReader);
+            }
+        }
+    }
+
+    @Override
+    public void close() throws IOException {}
+
+    /**
+     * Re-queues splits returned by a failed reader and reassigns them immediately if that reader
+     * is registered again.
+     */
+    @Override
+    public void addSplitsBack(List<MilvusSourceSplit> splits, int subtaskId) {
+        if (!splits.isEmpty()) {
+            synchronized (stateLock) {
+                addPendingSplit(splits, subtaskId);
+                if (context.registeredReaders().contains(subtaskId)) {
+                    assignSplit(Collections.singletonList(subtaskId));
+                } else {
+                    log.warn(
+                            "Reader {} is not registered. Pending splits {} are not assigned.",
+                            subtaskId,
+                            splits);
+                }
+            }
+        }
+        log.info("Add back splits {} to MilvusSourceSplitEnumerator.", splits.size());
+    }
+
+    /** Queues {@code splits} for an explicit owner reader. Caller must hold stateLock. */
+    private void addPendingSplit(Collection<MilvusSourceSplit> splits, int ownerReader) {
+        pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList<>()).addAll(splits);
+    }
+
+    /**
+     * Returns the number of splits not yet assigned: queued splits plus one per table that has
+     * not been split yet (each table currently produces exactly one split).
+     */
+    @Override
+    public int currentUnassignedSplitSize() {
+        synchronized (stateLock) {
+            int unassigned = pendingTables.size();
+            for (List<MilvusSourceSplit> splits : pendingSplits.values()) {
+                unassigned += splits.size();
+            }
+            return unassigned;
+        }
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+        throw new MilvusConnectorException(
+                CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+                String.format("Unsupported handleSplitRequest: %d", subtaskId));
+    }
+
+    /** Assigns any splits already queued for a newly registered reader. */
+    @Override
+    public void registerReader(int subtaskId) {
+        log.info("Register reader {} to MilvusSourceSplitEnumerator.", subtaskId);
+        // pendingSplits is a plain HashMap mutated by run()/addSplitsBack() under stateLock, so
+        // even the emptiness check has to happen under the same lock.
+        synchronized (stateLock) {
+            if (!pendingSplits.isEmpty()) {
+                assignSplit(Collections.singletonList(subtaskId));
+            }
+        }
+    }
+
+    /**
+     * Captures pending tables and pending splits. The split lists are copied so that splits
+     * queued after this call cannot leak into an already-taken snapshot.
+     */
+    @Override
+    public MilvusSourceState snapshotState(long checkpointId) throws Exception {
+        synchronized (stateLock) {
+            Map<Integer, List<MilvusSourceSplit>> splitsSnapshot = new HashMap<>();
+            for (Map.Entry<Integer, List<MilvusSourceSplit>> entry : pendingSplits.entrySet()) {
+                splitsSnapshot.put(entry.getKey(), new ArrayList<>(entry.getValue()));
+            }
+            return new MilvusSourceState(new ArrayList<>(pendingTables), splitsSnapshot);
+        }
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {}
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceState.java
similarity index 64%
copy
from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java copy to seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceState.java index 838a384809..7b6c2e0672 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceState.java @@ -15,26 +15,20 @@ * limitations under the License. */ -package org.apache.seatunnel.api.table.type; +package org.apache.seatunnel.connectors.seatunnel.milvus.source; -/** The sql type of {@link SeaTunnelDataType}. */ -public enum SqlType { - ARRAY, - MAP, - STRING, - BOOLEAN, - TINYINT, - SMALLINT, - INT, - BIGINT, - FLOAT, - DOUBLE, - DECIMAL, - NULL, - BYTES, - DATE, - TIME, - TIMESTAMP, - ROW, - MULTIPLE_ROW; +import org.apache.seatunnel.api.table.catalog.TablePath; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +/** Checkpoint state of the Milvus source split enumerator: tables not yet split, and generated splits not yet assigned keyed by owning reader subtask id. */ +@Data +@AllArgsConstructor +public class MilvusSourceState implements Serializable { + private List<TablePath> pendingTables; + private Map<Integer, List<MilvusSourceSplit>> pendingSplits; } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/state/MilvusAggregatedCommitInfo.java similarity index 70% copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java copy to seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/state/MilvusAggregatedCommitInfo.java index 838a384809..d4bc422d9b 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java +++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/state/MilvusAggregatedCommitInfo.java @@ -15,26 +15,16 @@ * limitations under the License. */ -package org.apache.seatunnel.api.table.type; +package org.apache.seatunnel.connectors.seatunnel.milvus.state; -/** The sql type of {@link SeaTunnelDataType}. */ -public enum SqlType { - ARRAY, - MAP, - STRING, - BOOLEAN, - TINYINT, - SMALLINT, - INT, - BIGINT, - FLOAT, - DOUBLE, - DECIMAL, - NULL, - BYTES, - DATE, - TIME, - TIMESTAMP, - ROW, - MULTIPLE_ROW; +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.io.Serializable; +import java.util.List; + +/** Aggregated commit information for the Milvus sink: wraps a list of {@link MilvusCommitInfo} (NOTE(review): presumably one per writer - confirm against MilvusSinkCommitter). */ +@Data +@AllArgsConstructor +public class MilvusAggregatedCommitInfo implements Serializable { + /* NOTE(review): package-private field; Lombok @Data still generates getCommitInfos()/setCommitInfos(). */ + List<MilvusCommitInfo> commitInfos; } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/state/MilvusCommitInfo.java similarity index 70% copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java copy to seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/state/MilvusCommitInfo.java index 838a384809..f6887ffa06 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/state/MilvusCommitInfo.java @@ -15,26 +15,13 @@ * limitations under the License. */ -package org.apache.seatunnel.api.table.type; +package org.apache.seatunnel.connectors.seatunnel.milvus.state; -/** The sql type of {@link SeaTunnelDataType}.
*/ -public enum SqlType { - ARRAY, - MAP, - STRING, - BOOLEAN, - TINYINT, - SMALLINT, - INT, - BIGINT, - FLOAT, - DOUBLE, - DECIMAL, - NULL, - BYTES, - DATE, - TIME, - TIMESTAMP, - ROW, - MULTIPLE_ROW; -} +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.io.Serializable; + +/** Commit information for the Milvus sink. It declares no fields, so instances carry no data. */ +@Data +@AllArgsConstructor +public class MilvusCommitInfo implements Serializable {} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/state/MilvusSinkState.java similarity index 70% copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java copy to seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/state/MilvusSinkState.java index 838a384809..3d8ff62b1d 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/state/MilvusSinkState.java @@ -15,26 +15,15 @@ * limitations under the License. */ -package org.apache.seatunnel.api.table.type; +package org.apache.seatunnel.connectors.seatunnel.milvus.state; -/** The sql type of {@link SeaTunnelDataType}.
*/ -public enum SqlType { - ARRAY, - MAP, - STRING, - BOOLEAN, - TINYINT, - SMALLINT, - INT, - BIGINT, - FLOAT, - DOUBLE, - DECIMAL, - NULL, - BYTES, - DATE, - TIME, - TIMESTAMP, - ROW, - MULTIPLE_ROW; -} +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.experimental.SuperBuilder; + +import java.io.Serializable; + +/** State class for the Milvus sink. It declares no fields, so instances carry no data (NOTE(review): presumably the writer checkpoint state - confirm against MilvusSink). */ +@Data +@SuperBuilder +@AllArgsConstructor +public class MilvusSinkState implements Serializable {} diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml index 0498ff4539..68274736f0 100644 --- a/seatunnel-connectors-v2/pom.xml +++ b/seatunnel-connectors-v2/pom.xml @@ -77,6 +77,7 @@ <module>connector-paimon</module> <module>connector-easysearch</module> <module>connector-web3j</module> + <module>connector-milvus</module> </modules> <dependencyManagement> diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml index 37f1cbebf4..a5dd203f83 100644 --- a/seatunnel-dist/pom.xml +++ b/seatunnel-dist/pom.xml @@ -576,6 +576,13 @@ <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.seatunnel</groupId> + <artifactId>connector-milvus</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <!-- jdbc driver --> <dependency> <groupId>com.aliyun.phoenix</groupId> diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/pom.xml new file mode 100644 index 0000000000..2175811c6c --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/pom.xml @@ -0,0 +1,66 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.seatunnel</groupId> + <artifactId>seatunnel-connector-v2-e2e</artifactId> + <version>${revision}</version> + </parent> + + <artifactId>connector-milvus-e2e</artifactId> + <name>SeaTunnel : E2E : Connector V2 : Milvus</name> + + <properties> + <testcontainer.milvus.version>1.19.8</testcontainer.milvus.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.seatunnel</groupId> + <artifactId>connector-milvus</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + <version>2.8.9</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>milvus</artifactId> + <version>${testcontainer.milvus.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.seatunnel</groupId> + <artifactId>connector-assert</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.seatunnel</groupId> + <artifactId>connector-fake</artifactId> + 
<version>${project.version}</version> + <scope>test</scope> + </dependency> + </dependencies> +</project> diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java new file mode 100644 index 0000000000..5356433057 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.seatunnel.e2e.connector.v2.milvus;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.milvus.MilvusContainer;
+
+import com.alibaba.fastjson.JSONObject;
+import io.milvus.client.MilvusServiceClient;
+import io.milvus.grpc.DataType;
+import io.milvus.grpc.DescribeCollectionResponse;
+import io.milvus.grpc.FieldSchema;
+import io.milvus.grpc.MutationResult;
+import io.milvus.param.ConnectParam;
+import io.milvus.param.IndexType;
+import io.milvus.param.MetricType;
+import io.milvus.param.R;
+import io.milvus.param.RpcStatus;
+import io.milvus.param.collection.CreateCollectionParam;
+import io.milvus.param.collection.DescribeCollectionParam;
+import io.milvus.param.collection.FieldType;
+import io.milvus.param.collection.HasCollectionParam;
+import io.milvus.param.collection.LoadCollectionParam;
+import io.milvus.param.dml.InsertParam;
+import io.milvus.param.index.CreateIndexParam;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * End-to-end test for the Milvus connector. It seeds a collection in a Milvus container, runs
+ * {@code milvus-to-milvus.conf}, and checks that the collection and its fields exist in the
+ * {@code test} database afterwards.
+ */
+@Slf4j
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK, EngineType.FLINK},
+        disabledReason = "Currently SPARK and FLINK not support adapt")
+public class MilvusIT extends TestSuiteBase implements TestResource {
+
+    private static final String HOST = "milvus-e2e";
+    private static final String MILVUS_IMAGE = "milvusdb/milvus:2.4-20240711-7e2a9d6b";
+    private static final String TOKEN = "root:Milvus";
+    private MilvusContainer container;
+    private MilvusServiceClient milvusClient;
+    private static final String COLLECTION_NAME = "simple_example";
+    private static final String ID_FIELD = "book_id";
+    private static final String VECTOR_FIELD = "book_intro";
+    private static final String TITLE_FIELD = "book_title";
+    private static final Integer VECTOR_DIM = 4;
+
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        this.container =
+                new MilvusContainer(MILVUS_IMAGE).withNetwork(NETWORK).withNetworkAliases(HOST);
+        Startables.deepStart(Stream.of(this.container)).join();
+        log.info("Milvus host is {}", container.getHost());
+        log.info("Milvus container started");
+        // Without an until*() terminal an Awaitility chain never waits; retry creating the
+        // client (ignoring connection errors) until it succeeds or the timeout elapses.
+        Awaitility.given()
+                .ignoreExceptions()
+                .await()
+                .atMost(720L, TimeUnit.SECONDS)
+                .untilAsserted(this::initMilvus);
+        this.initSourceData();
+    }
+
+    /** Connects {@link #milvusClient} to the container endpoint using the root token. */
+    private void initMilvus() {
+        milvusClient =
+                new MilvusServiceClient(
+                        ConnectParam.newBuilder()
+                                .withUri(this.container.getEndpoint())
+                                .withToken(TOKEN)
+                                .build());
+    }
+
+    /** Creates, indexes and loads {@link #COLLECTION_NAME}, then inserts 10 rows into it. */
+    private void initSourceData() {
+        // Define fields
+        List<FieldType> fieldsSchema =
+                Arrays.asList(
+                        FieldType.newBuilder()
+                                .withName(ID_FIELD)
+                                .withDataType(DataType.Int64)
+                                .withPrimaryKey(true)
+                                .withAutoID(false)
+                                .build(),
+                        FieldType.newBuilder()
+                                .withName(VECTOR_FIELD)
+                                .withDataType(DataType.FloatVector)
+                                .withDimension(VECTOR_DIM)
+                                .build(),
+                        FieldType.newBuilder()
+                                .withName(TITLE_FIELD)
+                                .withDataType(DataType.VarChar)
+                                .withMaxLength(64)
+                                .build());
+
+        // Create the collection with 3 fields
+        R<RpcStatus> ret =
+                milvusClient.createCollection(
+                        CreateCollectionParam.newBuilder()
+                                .withCollectionName(COLLECTION_NAME)
+                                .withFieldTypes(fieldsSchema)
+                                .build());
+        if (ret.getStatus() != R.Status.Success.getCode()) {
+            throw new RuntimeException("Failed to create collection! Error: " + ret.getMessage());
+        }
+
+        // Specify an index type on the vector field.
+        ret =
+                milvusClient.createIndex(
+                        CreateIndexParam.newBuilder()
+                                .withCollectionName(COLLECTION_NAME)
+                                .withFieldName(VECTOR_FIELD)
+                                .withIndexType(IndexType.FLAT)
+                                .withMetricType(MetricType.L2)
+                                .build());
+        if (ret.getStatus() != R.Status.Success.getCode()) {
+            throw new RuntimeException(
+                    "Failed to create index on vector field! Error: " + ret.getMessage());
+        }
+
+        // The source refuses to read collections that are not loaded, so a failed load must
+        // fail set-up here instead of surfacing later as an opaque job failure.
+        ret =
+                milvusClient.loadCollection(
+                        LoadCollectionParam.newBuilder()
+                                .withCollectionName(COLLECTION_NAME)
+                                .build());
+        if (ret.getStatus() != R.Status.Success.getCode()) {
+            throw new RuntimeException("Failed to load collection! Error: " + ret.getMessage());
+        }
+
+        log.info("Collection created");
+
+        // Insert 10 records into the collection
+        List<JSONObject> rows = new ArrayList<>();
+        for (long i = 1L; i <= 10; ++i) {
+            JSONObject row = new JSONObject();
+            row.put(ID_FIELD, i);
+            List<Float> vector = Arrays.asList((float) i, (float) i, (float) i, (float) i);
+            row.put(VECTOR_FIELD, vector);
+            row.put(TITLE_FIELD, "Tom and Jerry " + i);
+            rows.add(row);
+        }
+
+        R<MutationResult> insertRet =
+                milvusClient.insert(
+                        InsertParam.newBuilder()
+                                .withCollectionName(COLLECTION_NAME)
+                                .withRows(rows)
+                                .build());
+        if (insertRet.getStatus() != R.Status.Success.getCode()) {
+            throw new RuntimeException("Failed to insert! Error: " + insertRet.getMessage());
+        }
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        // Either resource may be null if startUp() failed part-way; do not mask that failure.
+        if (this.milvusClient != null) {
+            this.milvusClient.close();
+        }
+        if (this.container != null) {
+            this.container.close();
+        }
+    }
+
+    @TestTemplate
+    public void testMilvus(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/milvus-to-milvus.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+
+        // assert table exist
+        R<Boolean> hasCollectionResponse =
+                this.milvusClient.hasCollection(
+                        HasCollectionParam.newBuilder()
+                                .withDatabaseName("test")
+                                .withCollectionName(COLLECTION_NAME)
+                                .build());
+        assertSuccess(hasCollectionResponse, "hasCollection");
+        Assertions.assertTrue(hasCollectionResponse.getData());
+
+        // check table fields
+        R<DescribeCollectionResponse> describeCollectionResponseR =
+                this.milvusClient.describeCollection(
+                        DescribeCollectionParam.newBuilder()
+                                .withDatabaseName("test")
+                                .withCollectionName(COLLECTION_NAME)
+                                .build());
+        assertSuccess(describeCollectionResponseR, "describeCollection");
+
+        DescribeCollectionResponse data = describeCollectionResponseR.getData();
+        List<String> fieldNames =
+                data.getSchema().getFieldsList().stream()
+                        .map(FieldSchema::getName)
+                        .collect(Collectors.toList());
+        Assertions.assertTrue(fieldNames.contains(ID_FIELD));
+        Assertions.assertTrue(fieldNames.contains(VECTOR_FIELD));
+        Assertions.assertTrue(fieldNames.contains(TITLE_FIELD));
+    }
+
+    /** Fails the test with the server's message when {@code response} is not a success. */
+    private static void assertSuccess(R<?> response, String operation) {
+        Assertions.assertTrue(
+                response.getStatus() == R.Status.Success.getCode(),
+                operation + " failed: " + response.getMessage());
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/milvus-to-milvus.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/milvus-to-milvus.conf
new file mode 100644
index 0000000000..5b5b9aec78
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/milvus-to-milvus.conf
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + Milvus { + url = "http://milvus-e2e:19530" + token = "root:Milvus" + } +} + +sink { + Milvus { + url = "http://milvus-e2e:19530" + token = "root:Milvus" + database="test" + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml index 47864f21c6..0a0f909e19 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml @@ -74,6 +74,7 @@ <module>connector-cdc-oracle-e2e</module> <module>connector-hive-e2e</module> <module>connector-hudi-e2e</module> + <module>connector-milvus-e2e</module> </modules> <dependencies>