This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 0c69b9166e [Feature][Connector-V2][Milvus] Support Milvus source & sink (#7158)
0c69b9166e is described below

commit 0c69b9166efb9637f83f0f8f817250f2cce15522
Author: Thomas-HuWei <99788018+thomas-hu...@users.noreply.github.com>
AuthorDate: Fri Jul 12 17:50:26 2024 +0800

    [Feature][Connector-V2][Milvus] Support Milvus source & sink (#7158)
---
 config/plugin_config                               |   1 +
 docs/en/connector-v2/sink/Mivlus.md                |  59 +++
 docs/en/connector-v2/source/Mivlus.md              |  55 +++
 plugin-mapping.properties                          |   2 +
 .../apache/seatunnel/api/table/catalog/Column.java |   3 +-
 .../seatunnel/api/table/catalog/ConstraintKey.java |   3 +-
 .../seatunnel/api/table/catalog/PrimaryKey.java    |  19 +
 .../seatunnel/api/table/catalog/VectorIndex.java   | 110 ++++++
 .../seatunnel/api/table/type/SeaTunnelRow.java     |   2 +
 .../apache/seatunnel/api/table/type/SqlType.java   |   5 +
 .../seatunnel/api/table/type/VectorType.java       |  85 +++++
 seatunnel-connectors-v2/connector-milvus/pom.xml   |  60 ++++
 .../seatunnel/milvus/catalog/MilvusCatalog.java    | 380 ++++++++++++++++++++
 .../milvus/catalog/MilvusCatalogFactory.java       |  45 +++
 .../seatunnel/milvus/catalog/MilvusOptions.java    |  24 +-
 .../seatunnel/milvus/config/MilvusSinkConfig.java  |  87 +++++
 .../milvus/config/MilvusSourceConfig.java          |  48 +++
 .../milvus/convert/MilvusConvertUtils.java         | 397 +++++++++++++++++++++
 .../exception/MilvusConnectionErrorCode.java       |  57 +++
 .../milvus/exception/MilvusConnectorException.java |  41 +++
 .../seatunnel/milvus/sink/MilvusSink.java          | 116 ++++++
 .../seatunnel/milvus/sink/MilvusSinkCommitter.java |  56 +++
 .../seatunnel/milvus/sink/MilvusSinkFactory.java   |  80 +++++
 .../seatunnel/milvus/sink/MilvusSinkWriter.java    | 129 +++++++
 .../milvus/sink/batch/MilvusBatchWriter.java       |  33 +-
 .../milvus/sink/batch/MilvusBufferBatchWriter.java | 143 ++++++++
 .../seatunnel/milvus/source/MilvusSource.java      |  82 +++++
 .../milvus/source/MilvusSourceFactory.java         |  61 ++++
 .../milvus/source/MilvusSourceReader.java          | 261 ++++++++++++++
 .../seatunnel/milvus/source/MilvusSourceSplit.java |  39 +-
 .../milvus/source/MilvusSourceSplitEnumertor.java  | 192 ++++++++++
 .../seatunnel/milvus/source/MilvusSourceState.java |  36 +-
 .../milvus/state/MilvusAggregatedCommitInfo.java   |  32 +-
 .../seatunnel/milvus/state/MilvusCommitInfo.java   |  31 +-
 .../seatunnel/milvus/state/MilvusSinkState.java    |  33 +-
 seatunnel-connectors-v2/pom.xml                    |   1 +
 seatunnel-dist/pom.xml                             |   7 +
 .../connector-milvus-e2e/pom.xml                   |  66 ++++
 .../e2e/connector/v2/milvus/MilvusIT.java          | 218 +++++++++++
 .../src/test/resources/milvus-to-milvus.conf       |  36 ++
 seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml   |   1 +
 41 files changed, 2985 insertions(+), 151 deletions(-)

diff --git a/config/plugin_config b/config/plugin_config
index e642a30021..d80d2e6ab0 100644
--- a/config/plugin_config
+++ b/config/plugin_config
@@ -85,4 +85,5 @@ connector-paimon
 connector-rocketmq
 connector-tdengine
 connector-web3j
+connector-milvus
 --end--
\ No newline at end of file
diff --git a/docs/en/connector-v2/sink/Mivlus.md 
b/docs/en/connector-v2/sink/Mivlus.md
new file mode 100644
index 0000000000..081f427a5d
--- /dev/null
+++ b/docs/en/connector-v2/sink/Mivlus.md
@@ -0,0 +1,59 @@
+# Milvus
+
+> Milvus sink connector
+
+## Description
+
+Write data to Milvus or Zilliz Cloud
+
+## Key Features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
+
+## Data Type Mapping
+
+|  Milvus Data Type   | SeaTunnel Data Type |
+|---------------------|---------------------|
+| INT8                | TINYINT             |
+| INT16               | SMALLINT            |
+| INT32               | INT                 |
+| INT64               | BIGINT              |
+| FLOAT               | FLOAT               |
+| DOUBLE              | DOUBLE              |
+| BOOL                | BOOLEAN             |
+| JSON                | STRING              |
+| ARRAY               | ARRAY               |
+| VARCHAR             | STRING              |
+| FLOAT_VECTOR        | FLOAT_VECTOR        |
+| BINARY_VECTOR       | BINARY_VECTOR       |
+| FLOAT16_VECTOR      | FLOAT16_VECTOR      |
+| BFLOAT16_VECTOR     | BFLOAT16_VECTOR     |
+| SPARSE_FLOAT_VECTOR | SPARSE_FLOAT_VECTOR |
+
+## Sink Options
+
+|         Name         |  Type   | Required |           Default            |                        Description                        |
+|----------------------|---------|----------|------------------------------|-----------------------------------------------------------|
+| url                  | String  | Yes      | -                            | The URL to connect to Milvus or Zilliz Cloud.             |
+| token                | String  | Yes      | -                            | User:password                                             |
+| database             | String  | No       | -                            | Write data to which database, default is source database. |
+| schema_save_mode     | enum    | No       | CREATE_SCHEMA_WHEN_NOT_EXIST | Auto create table when table not exist.                   |
+| enable_auto_id       | boolean | No       | false                        | Primary key column enable autoId.                         |
+| enable_upsert        | boolean | No       | false                        | Upsert data not insert.                                   |
+| enable_dynamic_field | boolean | No       | true                         | Enable create table with dynamic field.                   |
+| batch_size           | int     | No       | 1000                         | Write batch size.                                         |
+
+## Task Example
+
+```bash
+sink {
+  Milvus {
+    url = "http://127.0.0.1:19530"
+    token = "username:password"
+    batch_size = 1000
+  }
+}
+```
+
diff --git a/docs/en/connector-v2/source/Mivlus.md 
b/docs/en/connector-v2/source/Mivlus.md
new file mode 100644
index 0000000000..a56df4c5fe
--- /dev/null
+++ b/docs/en/connector-v2/source/Mivlus.md
@@ -0,0 +1,55 @@
+# Milvus
+
+> Milvus source connector
+
+## Description
+
+Read data from Milvus or Zilliz Cloud
+
+## Key Features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
+
+## Data Type Mapping
+
+|  Milvus Data Type   | SeaTunnel Data Type |
+|---------------------|---------------------|
+| INT8                | TINYINT             |
+| INT16               | SMALLINT            |
+| INT32               | INT                 |
+| INT64               | BIGINT              |
+| FLOAT               | FLOAT               |
+| DOUBLE              | DOUBLE              |
+| BOOL                | BOOLEAN             |
+| JSON                | STRING              |
+| ARRAY               | ARRAY               |
+| VARCHAR             | STRING              |
+| FLOAT_VECTOR        | FLOAT_VECTOR        |
+| BINARY_VECTOR       | BINARY_VECTOR       |
+| FLOAT16_VECTOR      | FLOAT16_VECTOR      |
+| BFLOAT16_VECTOR     | BFLOAT16_VECTOR     |
+| SPARSE_FLOAT_VECTOR | SPARSE_FLOAT_VECTOR |
+
+## Source Options
+
+|    Name    |  Type  | Required | Default |                                        Description                                         |
+|------------|--------|----------|---------|--------------------------------------------------------------------------------------------|
+| url        | String | Yes      | -       | The URL to connect to Milvus or Zilliz Cloud.                                              |
+| token      | String | Yes      | -       | User:password                                                                              |
+| database   | String | Yes      | default | Read data from which database.                                                             |
+| collection | String | No       | -       | If set, will only read one collection, otherwise will read all collections under database. |
+
+## Task Example
+
+```bash
+source {
+  Milvus {
+    url = "http://127.0.0.1:19530"
+    token = "username:password"
+    database = "default"
+  }
+}
+```
+
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 6304236ec3..9936afcbaa 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -127,3 +127,5 @@ seatunnel.source.Oracle-CDC = connector-cdc-oracle
 seatunnel.sink.Pulsar = connector-pulsar
 seatunnel.source.ObsFile = connector-file-obs
 seatunnel.sink.ObsFile = connector-file-obs
+seatunnel.source.Milvus = connector-milvus
+seatunnel.sink.Milvus = connector-milvus
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
index d7e236d309..9c3ed338c9 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
@@ -60,7 +60,8 @@ public abstract class Column implements Serializable {
      * Number of digits to right of the decimal point.
      *
      * <p>For decimal data, this is the maximum scale. For time/timestamp 
data, this is the maximum
-     * allowed precision of the fractional seconds component.
+     * allowed precision of the fractional seconds component. For vector data, 
this is the vector
+     * dimension.
      *
      * <p>Null is returned for data types where the scale is not applicable.
      */
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/ConstraintKey.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/ConstraintKey.java
index 2d39641a42..f2d62852a0 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/ConstraintKey.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/ConstraintKey.java
@@ -72,7 +72,8 @@ public class ConstraintKey implements Serializable {
     public enum ConstraintType {
         INDEX_KEY,
         UNIQUE_KEY,
-        FOREIGN_KEY
+        FOREIGN_KEY,
+        VECTOR_INDEX_KEY
     }
 
     public enum ColumnSortType {
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PrimaryKey.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PrimaryKey.java
index e8a3a74025..ad88539c2f 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PrimaryKey.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PrimaryKey.java
@@ -34,6 +34,25 @@ public class PrimaryKey implements Serializable {
 
     private final List<String> columnNames;
 
+    private Boolean enableAutoId;
+
+    public PrimaryKey(String primaryKey, List<String> columnNames) {
+        this.primaryKey = primaryKey;
+        this.columnNames = columnNames;
+        this.enableAutoId = null;
+    }
+
+    public static boolean isPrimaryKeyField(PrimaryKey primaryKey, String 
fieldName) {
+        if (primaryKey == null || primaryKey.getColumnNames() == null) {
+            return false;
+        }
+        return primaryKey.getColumnNames().contains(fieldName);
+    }
+
+    public static PrimaryKey of(String primaryKey, List<String> columnNames, 
Boolean autoId) {
+        return new PrimaryKey(primaryKey, columnNames, autoId);
+    }
+
     public static PrimaryKey of(String primaryKey, List<String> columnNames) {
         return new PrimaryKey(primaryKey, columnNames);
     }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/VectorIndex.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/VectorIndex.java
new file mode 100644
index 0000000000..5d6dd1beaa
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/VectorIndex.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
+import java.io.Serializable;
+
+/** Vector Database need special Index on its vector field. */
+@EqualsAndHashCode(callSuper = true)
+@Getter
+public class VectorIndex extends ConstraintKey.ConstraintKeyColumn implements 
Serializable {
+
+    /** Vector index name */
+    private final String indexName;
+
+    /** Vector indexType, such as IVF_FLAT, HNSW, DISKANN */
+    private final IndexType indexType;
+
+    /** Vector index metricType, such as L2, IP, COSINE */
+    private final MetricType metricType;
+
+    public VectorIndex(String indexName, String columnName, String indexType, 
String metricType) {
+        super(columnName, null);
+        this.indexName = indexName;
+        this.indexType = IndexType.of(indexType);
+        this.metricType = MetricType.of(metricType);
+    }
+
+    public VectorIndex(
+            String indexName, String columnName, IndexType indexType, 
MetricType metricType) {
+        super(columnName, null);
+        this.indexName = indexName;
+        this.indexType = indexType;
+        this.metricType = metricType;
+    }
+
+    @Override
+    public ConstraintKey.ConstraintKeyColumn copy() {
+        return new VectorIndex(indexName, getColumnName(), indexType, 
metricType);
+    }
+
+    public enum IndexType {
+        FLAT,
+        IVF_FLAT,
+        IVF_SQ8,
+        IVF_PQ,
+        HNSW,
+        DISKANN,
+        AUTOINDEX,
+        SCANN,
+
+        // GPU indexes only for float vectors
+        GPU_IVF_FLAT,
+        GPU_IVF_PQ,
+        GPU_BRUTE_FORCE,
+        GPU_CAGRA,
+
+        // Only supported for binary vectors
+        BIN_FLAT,
+        BIN_IVF_FLAT,
+
+        // Only for varchar type field
+        TRIE,
+        // Only for scalar type field
+        STL_SORT, // only for numeric type field
+        INVERTED, // works for all scalar fields except JSON type field
+
+        // Only for sparse vectors
+        SPARSE_INVERTED_INDEX,
+        SPARSE_WAND,
+        ;
+
+        public static IndexType of(String name) {
+            return valueOf(name.toUpperCase());
+        }
+    }
+
+    public enum MetricType {
+        // Only for float vectors
+        L2,
+        IP,
+        COSINE,
+
+        // Only for binary vectors
+        HAMMING,
+        JACCARD,
+        ;
+
+        public static MetricType of(String name) {
+            return valueOf(name.toUpperCase());
+        }
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
index 1e507cb1fa..95a36b796c 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
@@ -141,6 +141,8 @@ public final class SeaTunnelRow implements Serializable {
                 return 12;
             case TIMESTAMP:
                 return 48;
+            case FLOAT_VECTOR:
+                return getArrayNotNullSize((Object[]) v) * 4;
             case ARRAY:
                 SeaTunnelDataType elementType = ((ArrayType) 
dataType).getElementType();
                 if (elementType instanceof DecimalType) {
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
index 838a384809..e33ceb8d3c 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
@@ -35,6 +35,11 @@ public enum SqlType {
     DATE,
     TIME,
     TIMESTAMP,
+    BINARY_VECTOR,
+    FLOAT_VECTOR,
+    FLOAT16_VECTOR,
+    BFLOAT16_VECTOR,
+    SPARSE_FLOAT_VECTOR,
     ROW,
     MULTIPLE_ROW;
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/VectorType.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/VectorType.java
new file mode 100644
index 0000000000..39d2849f1a
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/VectorType.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.type;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+
+public class VectorType<T> implements SeaTunnelDataType<T> {
+    private static final long serialVersionUID = 2L;
+
+    public static final VectorType<Float> VECTOR_FLOAT_TYPE =
+            new VectorType<>(Float.class, SqlType.FLOAT_VECTOR);
+
+    public static final VectorType<Map> VECTOR_SPARSE_FLOAT_TYPE =
+            new VectorType<>(Map.class, SqlType.SPARSE_FLOAT_VECTOR);
+
+    public static final VectorType<Byte> VECTOR_BINARY_TYPE =
+            new VectorType<>(Byte.class, SqlType.BINARY_VECTOR);
+
+    public static final VectorType<ByteBuffer> VECTOR_FLOAT16_TYPE =
+            new VectorType<>(ByteBuffer.class, SqlType.FLOAT16_VECTOR);
+
+    public static final VectorType<ByteBuffer> VECTOR_BFLOAT16_TYPE =
+            new VectorType<>(ByteBuffer.class, SqlType.BFLOAT16_VECTOR);
+
+    // 
--------------------------------------------------------------------------------------------
+
+    /** The physical type class. */
+    private final Class<T> typeClass;
+
+    private final SqlType sqlType;
+
+    protected VectorType(Class<T> typeClass, SqlType sqlType) {
+        this.typeClass = typeClass;
+        this.sqlType = sqlType;
+    }
+
+    @Override
+    public Class<T> getTypeClass() {
+        return this.typeClass;
+    }
+
+    @Override
+    public SqlType getSqlType() {
+        return this.sqlType;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof VectorType)) {
+            return false;
+        }
+        VectorType<?> that = (VectorType<?>) obj;
+        return Objects.equals(typeClass, that.typeClass) && 
Objects.equals(sqlType, that.sqlType);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(typeClass, sqlType);
+    }
+
+    @Override
+    public String toString() {
+        return sqlType.toString();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-milvus/pom.xml 
b/seatunnel-connectors-v2/connector-milvus/pom.xml
new file mode 100644
index 0000000000..50d69d4f5b
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-milvus/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connectors-v2</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-milvus</artifactId>
+    <name>SeaTunnel : Connectors V2 : Milvus</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.milvus</groupId>
+            <artifactId>milvus-sdk-java</artifactId>
+            <version>2.4.1</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-reload4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>4.11.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-inline</artifactId>
+            <version>4.11.0</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+</project>
diff --git 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalog.java
 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalog.java
new file mode 100644
index 0000000000..dcca41320c
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalog.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.InfoPreviewResult;
+import org.apache.seatunnel.api.table.catalog.PreviewResult;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.VectorIndex;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import 
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import 
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.milvus.convert.MilvusConvertUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import io.milvus.client.MilvusServiceClient;
+import io.milvus.common.clientenum.ConsistencyLevelEnum;
+import io.milvus.grpc.DataType;
+import io.milvus.grpc.ListDatabasesResponse;
+import io.milvus.grpc.ShowCollectionsResponse;
+import io.milvus.grpc.ShowType;
+import io.milvus.param.ConnectParam;
+import io.milvus.param.IndexType;
+import io.milvus.param.MetricType;
+import io.milvus.param.R;
+import io.milvus.param.RpcStatus;
+import io.milvus.param.collection.CreateCollectionParam;
+import io.milvus.param.collection.CreateDatabaseParam;
+import io.milvus.param.collection.DropCollectionParam;
+import io.milvus.param.collection.DropDatabaseParam;
+import io.milvus.param.collection.FieldType;
+import io.milvus.param.collection.HasCollectionParam;
+import io.milvus.param.collection.ShowCollectionsParam;
+import io.milvus.param.index.CreateIndexParam;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+@Slf4j
+public class MilvusCatalog implements Catalog {
+
+    private final String catalogName;
+    private final ReadonlyConfig config;
+
+    private MilvusServiceClient client;
+
+    public MilvusCatalog(String catalogName, ReadonlyConfig config) {
+        this.catalogName = catalogName;
+        this.config = config;
+    }
+
+    @Override
+    public void open() throws CatalogException {
+        ConnectParam connectParam =
+                ConnectParam.newBuilder()
+                        .withUri(config.get(MilvusSinkConfig.URL))
+                        .withToken(config.get(MilvusSinkConfig.TOKEN))
+                        .build();
+        try {
+            this.client = new MilvusServiceClient(connectParam);
+        } catch (Exception e) {
+            throw new CatalogException(String.format("Failed to open catalog 
%s", catalogName), e);
+        }
+    }
+
+    @Override
+    public void close() throws CatalogException {
+        this.client.close();
+    }
+
+    @Override
+    public String name() {
+        return catalogName;
+    }
+
+    @Override
+    public PreviewResult previewAction(
+            ActionType actionType, TablePath tablePath, Optional<CatalogTable> 
catalogTable) {
+        if (actionType == ActionType.CREATE_TABLE) {
+            return new InfoPreviewResult("create collection " + 
tablePath.getTableName());
+        } else if (actionType == ActionType.DROP_TABLE) {
+            return new InfoPreviewResult("drop collection " + 
tablePath.getTableName());
+        } else if (actionType == ActionType.CREATE_DATABASE) {
+            return new InfoPreviewResult("create database " + 
tablePath.getDatabaseName());
+        } else if (actionType == ActionType.DROP_DATABASE) {
+            return new InfoPreviewResult("drop database " + 
tablePath.getDatabaseName());
+        } else {
+            throw new UnsupportedOperationException("Unsupported action type: 
" + actionType);
+        }
+    }
+
+    @Override
+    public String getDefaultDatabase() throws CatalogException {
+        return "default";
+    }
+
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException 
{
+        List<String> databases = this.listDatabases();
+        return databases.contains(databaseName);
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        R<ListDatabasesResponse> response = this.client.listDatabases();
+        return response.getData().getDbNamesList();
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+        R<ShowCollectionsResponse> response =
+                this.client.showCollections(
+                        ShowCollectionsParam.newBuilder()
+                                .withDatabaseName(databaseName)
+                                .withShowType(ShowType.All)
+                                .build());
+
+        return response.getData().getCollectionNamesList();
+    }
+
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        R<Boolean> response =
+                this.client.hasCollection(
+                        HasCollectionParam.newBuilder()
+                                .withDatabaseName(tablePath.getDatabaseName())
+                                .withCollectionName(tablePath.getTableName())
+                                .build());
+        if (response.getData() != null) {
+            return response.getData();
+        }
+        throw new MilvusConnectorException(
+                MilvusConnectionErrorCode.SERVER_RESPONSE_FAILED,
+                response.getMessage(),
+                response.getException());
+    }
+
+    @Override
+    public CatalogTable getTable(TablePath tablePath)
+            throws CatalogException, TableNotExistException {
+        throw new RuntimeException("not implemented");
+    }
+
+    /**
+     * Creates the collection described by {@code catalogTable} and, afterwards, one index for every
+     * column listed under a VECTOR_INDEX_KEY constraint of its schema.
+     *
+     * @param tablePath database and collection name of the table to create; must not be null
+     * @param catalogTable table definition; only validated once the table is known not to exist
+     * @param ignoreIfExists when true, an already existing table makes this call a no-op
+     * @throws DatabaseNotExistException if the target database does not exist
+     * @throws TableAlreadyExistException if the table exists and {@code ignoreIfExists} is false
+     */
+    @Override
+    public void createTable(TablePath tablePath, CatalogTable catalogTable, boolean ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+        checkNotNull(tablePath, "Table path cannot be null");
+
+        boolean databaseMissing = !databaseExists(tablePath.getDatabaseName());
+        if (databaseMissing) {
+            throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName());
+        }
+
+        if (tableExists(tablePath)) {
+            if (!ignoreIfExists) {
+                throw new TableAlreadyExistException(catalogName, tablePath);
+            }
+            return;
+        }
+
+        checkNotNull(catalogTable, "catalogTable must not be null");
+        TableSchema schema = catalogTable.getTableSchema();
+        checkNotNull(schema, "tableSchema must not be null");
+
+        // The collection has to exist before indexes can be attached to its fields.
+        createTableInternal(tablePath, catalogTable);
+
+        if (CollectionUtils.isNotEmpty(schema.getConstraintKeys())) {
+            for (ConstraintKey key : schema.getConstraintKeys()) {
+                boolean isVectorIndex =
+                        key.getConstraintType()
+                                .equals(ConstraintKey.ConstraintType.VECTOR_INDEX_KEY);
+                if (isVectorIndex) {
+                    createIndexInternal(tablePath, key.getColumnNames());
+                }
+            }
+        }
+    }
+
+    /**
+     * Creates one Milvus index per entry of {@code vectorIndexes} on the given collection.
+     *
+     * <p>Every entry is cast to {@link VectorIndex}; its index type and metric type names are
+     * mapped onto the Milvus enums of the same name via {@code valueOf}.
+     *
+     * @param tablePath database and collection that own the indexed fields
+     * @param vectorIndexes index definitions, each expected to be a {@link VectorIndex}
+     * @throws MilvusConnectorException (CREATE_INDEX_ERROR) as soon as one index cannot be created;
+     *     the server message and the server-side exception are attached
+     */
+    private void createIndexInternal(
+            TablePath tablePath, List<ConstraintKey.ConstraintKeyColumn> vectorIndexes) {
+        for (ConstraintKey.ConstraintKeyColumn column : vectorIndexes) {
+            VectorIndex index = (VectorIndex) column;
+            CreateIndexParam createIndexParam =
+                    CreateIndexParam.newBuilder()
+                            .withDatabaseName(tablePath.getDatabaseName())
+                            .withCollectionName(tablePath.getTableName())
+                            .withFieldName(index.getColumnName())
+                            .withIndexName(index.getIndexName())
+                            .withIndexType(IndexType.valueOf(index.getIndexType().name()))
+                            .withMetricType(MetricType.valueOf(index.getMetricType().name()))
+                            .build();
+
+            R<RpcStatus> response = client.createIndex(createIndexParam);
+            if (!Objects.equals(response.getStatus(), R.success().getStatus())) {
+                // Keep the underlying exception as the cause so the failure stays diagnosable.
+                throw new MilvusConnectorException(
+                        MilvusConnectionErrorCode.CREATE_INDEX_ERROR,
+                        response.getMessage(),
+                        response.getException());
+            }
+        }
+    }
+
+    /**
+     * Sends the create-collection request for {@code catalogTable} to Milvus.
+     *
+     * <p>Each column becomes a Milvus field via {@code convertToFieldType}. The dynamic-field flag
+     * comes from the table option {@code MilvusOptions.ENABLE_DYNAMIC_FIELD} when present, otherwise
+     * from the {@code MilvusSinkConfig.ENABLE_DYNAMIC_FIELD} config value. The collection is
+     * created with BOUNDED consistency, and the table comment (if any) becomes its description.
+     *
+     * @param tablePath database and collection name to create
+     * @param catalogTable schema, options and comment of the new collection
+     * @throws MilvusConnectorException (CREATE_COLLECTION_ERROR) if the server rejects the request
+     *     or any other exception occurs while building or sending it
+     */
+    public void createTableInternal(TablePath tablePath, CatalogTable catalogTable) {
+        try {
+            TableSchema tableSchema = catalogTable.getTableSchema();
+            List<FieldType> fieldTypes = new ArrayList<>();
+            for (Column column : tableSchema.getColumns()) {
+                fieldTypes.add(convertToFieldType(column, tableSchema.getPrimaryKey()));
+            }
+
+            // A table without an options map simply falls back to the configured default.
+            Map<String, String> options = catalogTable.getOptions();
+            Boolean enableDynamicField;
+            if (options != null && options.containsKey(MilvusOptions.ENABLE_DYNAMIC_FIELD)) {
+                enableDynamicField =
+                        Boolean.valueOf(options.get(MilvusOptions.ENABLE_DYNAMIC_FIELD));
+            } else {
+                enableDynamicField = config.get(MilvusSinkConfig.ENABLE_DYNAMIC_FIELD);
+            }
+
+            CreateCollectionParam.Builder builder =
+                    CreateCollectionParam.newBuilder()
+                            .withDatabaseName(tablePath.getDatabaseName())
+                            .withCollectionName(tablePath.getTableName())
+                            .withFieldTypes(fieldTypes)
+                            .withEnableDynamicField(enableDynamicField)
+                            .withConsistencyLevel(ConsistencyLevelEnum.BOUNDED);
+            if (null != catalogTable.getComment()) {
+                builder.withDescription(catalogTable.getComment());
+            }
+
+            CreateCollectionParam createCollectionParam = builder.build();
+            R<RpcStatus> response = this.client.createCollection(createCollectionParam);
+            if (!Objects.equals(response.getStatus(), R.success().getStatus())) {
+                throw new MilvusConnectorException(
+                        MilvusConnectionErrorCode.CREATE_COLLECTION_ERROR,
+                        response.getMessage(),
+                        response.getException());
+            }
+        } catch (MilvusConnectorException e) {
+            // Already carries the right error code and server message; do not wrap it again.
+            throw e;
+        } catch (Exception e) {
+            throw new MilvusConnectorException(
+                    MilvusConnectionErrorCode.CREATE_COLLECTION_ERROR, e);
+        }
+    }
+
+    /**
+     * Translates a SeaTunnel column into the Milvus field definition used at collection creation.
+     *
+     * <p>The base data type comes from {@code MilvusConvertUtils.convertSqlTypeToDataType}; the
+     * switch below then adds the per-type settings (max length, element type and capacity,
+     * vector dimension) and, for some SQL types, sets the Milvus data type explicitly.
+     *
+     * @param column column to translate
+     * @param primaryKey primary key of the owning table, or null when the table has none
+     * @return the field definition; flagged as primary key when {@code primaryKey} names the
+     *     column, with auto-id taken from the key or, if the key leaves it unset, from
+     *     {@code MilvusSinkConfig.ENABLE_AUTO_ID}
+     */
+    private FieldType convertToFieldType(Column column, PrimaryKey primaryKey) {
+        SeaTunnelDataType<?> seaTunnelDataType = column.getDataType();
+        FieldType.Builder build =
+                FieldType.newBuilder()
+                        .withName(column.getName())
+                        .withDataType(
+                                MilvusConvertUtils.convertSqlTypeToDataType(
+                                        seaTunnelDataType.getSqlType()));
+        switch (seaTunnelDataType.getSqlType()) {
+            case ROW:
+                build.withMaxLength(65535);
+                break;
+            case DATE:
+                build.withMaxLength(20);
+                break;
+            case INT:
+                build.withDataType(DataType.Int32);
+                break;
+            case SMALLINT:
+                build.withDataType(DataType.Int16);
+                break;
+            case TINYINT:
+                build.withDataType(DataType.Int8);
+                break;
+            case FLOAT:
+                build.withDataType(DataType.Float);
+                break;
+            case DOUBLE:
+                build.withDataType(DataType.Double);
+                break;
+            case MAP:
+                build.withDataType(DataType.JSON);
+                break;
+            case BOOLEAN:
+                build.withDataType(DataType.Bool);
+                break;
+            case STRING:
+                build.withMaxLength(resolveVarCharMaxLength(column));
+                break;
+            case ARRAY:
+                ArrayType<?, ?> arrayType = (ArrayType<?, ?>) column.getDataType();
+                SeaTunnelDataType<?> elementType = arrayType.getElementType();
+                build.withElementType(
+                        MilvusConvertUtils.convertSqlTypeToDataType(elementType.getSqlType()));
+                build.withMaxCapacity(4095);
+                switch (elementType.getSqlType()) {
+                    case STRING:
+                        // String elements need a max length, just like a plain STRING column.
+                        build.withMaxLength(resolveVarCharMaxLength(column));
+                        break;
+                    default:
+                        break;
+                }
+                break;
+            case BINARY_VECTOR:
+            case FLOAT_VECTOR:
+            case FLOAT16_VECTOR:
+            case BFLOAT16_VECTOR:
+                // The vector dimension travels in the column's scale.
+                build.withDimension(column.getScale());
+                break;
+            default:
+                break;
+        }
+
+        if (null != primaryKey && primaryKey.getColumnNames().contains(column.getName())) {
+            build.withPrimaryKey(true);
+            if (null != primaryKey.getEnableAutoId()) {
+                build.withAutoID(primaryKey.getEnableAutoId());
+            } else {
+                build.withAutoID(config.get(MilvusSinkConfig.ENABLE_AUTO_ID));
+            }
+        }
+
+        return build.build();
+    }
+
+    /**
+     * Derives the Milvus max length for a string column (or for the string elements of an array).
+     *
+     * <p>A column without a length (null or 0) gets 512. Otherwise the length is divided by 4,
+     * mirroring the multiplication by 4 applied when a Milvus VarChar is read as a column, and
+     * is never allowed to drop below 1 so lengths 1..3 do not produce a max length of 0.
+     * NOTE(review): Milvus is understood to require a positive max length; confirm the bound.
+     *
+     * @param column column whose length is inspected
+     * @return the max length to configure on the Milvus field
+     */
+    private static int resolveVarCharMaxLength(Column column) {
+        Long columnLength = column.getColumnLength();
+        if (columnLength == null || columnLength == 0) {
+            return 512;
+        }
+        return (int) Math.max(1L, columnLength / 4);
+    }
+
+    /**
+     * Drops the collection addressed by {@code tablePath}.
+     *
+     * @param tablePath database and collection name to drop
+     * @param ignoreIfNotExists when true, a missing collection makes this call a no-op
+     * @throws TableNotExistException if the collection is missing and {@code ignoreIfNotExists}
+     *     is false
+     * @throws MilvusConnectorException (SERVER_RESPONSE_FAILED) if the server reports a failure
+     *     for the drop request
+     */
+    @Override
+    public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        if (!tableExists(tablePath)) {
+            if (ignoreIfNotExists) {
+                return;
+            }
+            throw new TableNotExistException(catalogName, tablePath);
+        }
+
+        R<RpcStatus> response =
+                this.client.dropCollection(
+                        DropCollectionParam.newBuilder()
+                                .withDatabaseName(tablePath.getDatabaseName())
+                                .withCollectionName(tablePath.getTableName())
+                                .build());
+        // Surface server-side failures instead of silently discarding the response.
+        if (!Objects.equals(response.getStatus(), R.success().getStatus())) {
+            throw new MilvusConnectorException(
+                    MilvusConnectionErrorCode.SERVER_RESPONSE_FAILED,
+                    response.getMessage(),
+                    response.getException());
+        }
+    }
+
+    /**
+     * Creates the database named by {@code tablePath.getDatabaseName()}.
+     *
+     * @param tablePath path whose database name is used; the table name is not read
+     * @param ignoreIfExists when true, an already existing database makes this call a no-op
+     * @throws DatabaseAlreadyExistException if the database exists and {@code ignoreIfExists} is
+     *     false
+     * @throws MilvusConnectorException (CREATE_DATABASE_ERROR) if the server rejects the request
+     */
+    @Override
+    public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        if (databaseExists(tablePath.getDatabaseName())) {
+            if (ignoreIfExists) {
+                return;
+            }
+            throw new DatabaseAlreadyExistException(catalogName, tablePath.getDatabaseName());
+        }
+
+        R<RpcStatus> response =
+                this.client.createDatabase(
+                        CreateDatabaseParam.newBuilder()
+                                .withDatabaseName(tablePath.getDatabaseName())
+                                .build());
+        if (!R.success().getStatus().equals(response.getStatus())) {
+            throw new MilvusConnectorException(
+                    MilvusConnectionErrorCode.CREATE_DATABASE_ERROR, response.getMessage());
+        }
+    }
+
+    /**
+     * Drops the database named by {@code tablePath.getDatabaseName()}.
+     *
+     * @param tablePath path whose database name is used; the table name is not read
+     * @param ignoreIfNotExists when true, a missing database makes this call a no-op
+     * @throws DatabaseNotExistException if the database is missing and {@code ignoreIfNotExists}
+     *     is false
+     * @throws MilvusConnectorException (SERVER_RESPONSE_FAILED) if the server reports a failure
+     *     for the drop request
+     */
+    @Override
+    public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
+            throws DatabaseNotExistException, CatalogException {
+        if (!databaseExists(tablePath.getDatabaseName())) {
+            if (ignoreIfNotExists) {
+                return;
+            }
+            throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName());
+        }
+
+        R<RpcStatus> response =
+                this.client.dropDatabase(
+                        DropDatabaseParam.newBuilder()
+                                .withDatabaseName(tablePath.getDatabaseName())
+                                .build());
+        // Surface server-side failures instead of silently discarding the response.
+        if (!Objects.equals(response.getStatus(), R.success().getStatus())) {
+            throw new MilvusConnectorException(
+                    MilvusConnectionErrorCode.SERVER_RESPONSE_FAILED,
+                    response.getMessage(),
+                    response.getException());
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalogFactory.java
new file mode 100644
index 0000000000..292c0464f2
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalogFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
+
+import com.google.auto.service.AutoService;
+
+/** Catalog factory registered under the identifier {@code "Milvus"}. */
+@AutoService(Factory.class)
+public class MilvusCatalogFactory implements CatalogFactory {
+
+    /** @return the identifier this factory is registered under */
+    @Override
+    public String factoryIdentifier() {
+        return "Milvus";
+    }
+
+    /** @return an option rule that declares no options */
+    @Override
+    public OptionRule optionRule() {
+        OptionRule.Builder rule = OptionRule.builder();
+        return rule.build();
+    }
+
+    /**
+     * Builds a {@link MilvusCatalog}.
+     *
+     * @param catalogName name handed to the new catalog
+     * @param options configuration handed to the new catalog
+     * @return a new catalog instance
+     */
+    @Override
+    public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+        Catalog catalog = new MilvusCatalog(catalogName, options);
+        return catalog;
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusOptions.java
similarity index 70%
copy from 
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
copy to 
seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusOptions.java
index 838a384809..b589b21d3d 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusOptions.java
@@ -14,27 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.seatunnel.connectors.seatunnel.milvus.catalog;
 
-package org.apache.seatunnel.api.table.type;
+/** Keys of the Milvus-specific entries stored in a catalog table's options map. */
+public final class MilvusOptions {
 
-/** The sql type of {@link SeaTunnelDataType}. */
-public enum SqlType {
-    ARRAY,
-    MAP,
-    STRING,
-    BOOLEAN,
-    TINYINT,
-    SMALLINT,
-    INT,
-    BIGINT,
-    FLOAT,
-    DOUBLE,
-    DECIMAL,
-    NULL,
-    BYTES,
-    DATE,
-    TIME,
-    TIMESTAMP,
-    ROW,
-    MULTIPLE_ROW;
+    /** Option key under which the collection's dynamic-field flag is stored as a string. */
+    public static final String ENABLE_DYNAMIC_FIELD = "enableDynamicField";
+
+    /** Constants holder; not meant to be instantiated. */
+    private MilvusOptions() {}
 }
diff --git 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkConfig.java
 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkConfig.java
new file mode 100644
index 0000000000..d2357e559c
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkConfig.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
+
+import java.util.Arrays;
+
+import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA;
+import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
+import static 
org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;
+
+/** Option definitions read by the Milvus sink. */
+public class MilvusSinkConfig {
+
+    /** Identifier of the connector. */
+    public static final String CONNECTOR_IDENTITY = "Milvus";
+
+    /** Endpoint of the Milvus server; no default. */
+    public static final Option<String> URL =
+            Options.key("url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Milvus public endpoint");
+
+    /** Authentication token; no default. */
+    public static final Option<String> TOKEN =
+            Options.key("token")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Milvus token for authentication");
+
+    /** Target database; no default. */
+    public static final Option<String> DATABASE =
+            Options.key("database")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("database");
+
+    /** Schema save mode; defaults to CREATE_SCHEMA_WHEN_NOT_EXIST. */
+    public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
+            Options.key("schema_save_mode")
+                    .enumType(SchemaSaveMode.class)
+                    .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
+                    .withDescription("schema_save_mode");
+
+    /**
+     * Data save mode, restricted to DROP_DATA, APPEND_DATA and ERROR_WHEN_DATA_EXISTS; defaults to
+     * APPEND_DATA.
+     */
+    public static final Option<DataSaveMode> DATA_SAVE_MODE =
+            Options.key("data_save_mode")
+                    .singleChoice(
+                            DataSaveMode.class,
+                            Arrays.asList(DROP_DATA, APPEND_DATA, ERROR_WHEN_DATA_EXISTS))
+                    .defaultValue(APPEND_DATA)
+                    .withDescription("data_save_mode");
+
+    /** Auto-id flag; defaults to false. */
+    public static final Option<Boolean> ENABLE_AUTO_ID =
+            Options.key("enable_auto_id")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Enable Auto Id");
+
+    /** Upsert-mode flag; defaults to true. */
+    public static final Option<Boolean> ENABLE_UPSERT =
+            Options.key("enable_upsert")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Enable upsert mode");
+
+    /** Dynamic-field flag; defaults to true. */
+    public static final Option<Boolean> ENABLE_DYNAMIC_FIELD =
+            Options.key("enable_dynamic_field")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Enable dynamic field");
+
+    /** Writer batch size; defaults to 1000. */
+    public static final Option<Integer> BATCH_SIZE =
+            Options.key("batch_size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription("writer batch size");
+}
diff --git 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSourceConfig.java
 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSourceConfig.java
new file mode 100644
index 0000000000..aa92286ac0
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSourceConfig.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+/** Option definitions read by the Milvus source. */
+public class MilvusSourceConfig {
+
+    /** Endpoint of the Milvus server; no default. */
+    public static final Option<String> URL =
+            Options.key("url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Milvus public endpoint");
+
+    /** Authentication token; no default. */
+    public static final Option<String> TOKEN =
+            Options.key("token")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Milvus token for authentication");
+
+    /** Database to read from; defaults to {@code "default"}. */
+    public static final Option<String> DATABASE =
+            Options.key("database")
+                    .stringType()
+                    .defaultValue("default")
+                    .withDescription("database");
+
+    /** Collection to read; no default. */
+    public static final Option<String> COLLECTION =
+            Options.key("collection")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Milvus collection to read");
+}
diff --git 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/convert/MilvusConvertUtils.java
 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/convert/MilvusConvertUtils.java
new file mode 100644
index 0000000000..6b2661680b
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/convert/MilvusConvertUtils.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.convert;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.VectorIndex;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.api.table.type.VectorType;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.connectors.seatunnel.milvus.catalog.MilvusOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.util.Lists;
+
+import com.google.protobuf.ProtocolStringList;
+import io.milvus.client.MilvusServiceClient;
+import io.milvus.common.utils.JacksonUtils;
+import io.milvus.grpc.CollectionSchema;
+import io.milvus.grpc.DataType;
+import io.milvus.grpc.DescribeCollectionResponse;
+import io.milvus.grpc.DescribeIndexResponse;
+import io.milvus.grpc.FieldSchema;
+import io.milvus.grpc.IndexDescription;
+import io.milvus.grpc.KeyValuePair;
+import io.milvus.grpc.ShowCollectionsResponse;
+import io.milvus.grpc.ShowType;
+import io.milvus.param.ConnectParam;
+import io.milvus.param.R;
+import io.milvus.param.collection.DescribeCollectionParam;
+import io.milvus.param.collection.ShowCollectionsParam;
+import io.milvus.param.index.DescribeIndexParam;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class MilvusConvertUtils {
+
+    private static final String CATALOG_NAME = "Milvus";
+
+    /**
+     * Connects to Milvus and builds the catalog tables the source should read.
+     *
+     * <p>When {@code MilvusSourceConfig.COLLECTION} is set, only that collection is described;
+     * otherwise every collection of the configured database is listed and described. The client
+     * opened here is closed before the method returns, on success and on failure.
+     *
+     * @param config source configuration providing url, token, database and optional collection
+     * @return catalog tables keyed by their {@code database.collection} path
+     * @throws MilvusConnectorException if listing the collections fails (SHOW_COLLECTIONS_ERROR)
+     *     or the database holds no collections (DATABASE_NO_COLLECTIONS)
+     */
+    public static Map<TablePath, CatalogTable> getSourceTables(ReadonlyConfig config) {
+        MilvusServiceClient client =
+                new MilvusServiceClient(
+                        ConnectParam.newBuilder()
+                                .withUri(config.get(MilvusSourceConfig.URL))
+                                .withToken(config.get(MilvusSourceConfig.TOKEN))
+                                .build());
+        try {
+            String database = config.get(MilvusSourceConfig.DATABASE);
+            List<String> collectionList = new ArrayList<>();
+            if (StringUtils.isNotEmpty(config.get(MilvusSourceConfig.COLLECTION))) {
+                collectionList.add(config.get(MilvusSourceConfig.COLLECTION));
+            } else {
+                R<ShowCollectionsResponse> response =
+                        client.showCollections(
+                                ShowCollectionsParam.newBuilder()
+                                        .withDatabaseName(database)
+                                        .withShowType(ShowType.All)
+                                        .build());
+                if (response.getStatus() != R.Status.Success.getCode()) {
+                    // Pass the server message along so the failure can be diagnosed.
+                    throw new MilvusConnectorException(
+                            MilvusConnectionErrorCode.SHOW_COLLECTIONS_ERROR,
+                            response.getMessage());
+                }
+
+                ProtocolStringList collections = response.getData().getCollectionNamesList();
+                if (CollectionUtils.isEmpty(collections)) {
+                    throw new MilvusConnectorException(
+                            MilvusConnectionErrorCode.DATABASE_NO_COLLECTIONS, database);
+                }
+                collectionList.addAll(collections);
+            }
+
+            Map<TablePath, CatalogTable> map = new HashMap<>();
+            for (String collection : collectionList) {
+                CatalogTable catalogTable = getCatalogTable(client, database, collection);
+                map.put(TablePath.of(database, collection), catalogTable);
+            }
+            return map;
+        } finally {
+            // The client is only needed for the metadata calls above; release its connection.
+            client.close();
+        }
+    }
+
+    /**
+     * Describes one Milvus collection and converts the result into a {@link CatalogTable}.
+     *
+     * <p>Columns and the primary key come from the collection schema, the vector indexes from a
+     * describe-index call; the indexes are attached as a single VECTOR_INDEX_KEY constraint named
+     * {@code "vector_index"}. The schema's dynamic-field flag is stored in the table options under
+     * {@code MilvusOptions.ENABLE_DYNAMIC_FIELD}, and the schema description becomes the comment.
+     *
+     * @param client connected Milvus client; not closed by this method
+     * @param database database that owns the collection
+     * @param collection name of the collection to describe
+     * @return the catalog table for the collection
+     * @throws MilvusConnectorException if describing the collection (DESC_COLLECTION_ERROR) or
+     *     its indexes (DESC_INDEX_ERROR) does not succeed
+     */
+    public static CatalogTable getCatalogTable(
+            MilvusServiceClient client, String database, String collection) {
+        DescribeCollectionParam describeCollection =
+                DescribeCollectionParam.newBuilder()
+                        .withDatabaseName(database)
+                        .withCollectionName(collection)
+                        .build();
+        R<DescribeCollectionResponse> collectionReply =
+                client.describeCollection(describeCollection);
+        if (collectionReply.getStatus() != R.Status.Success.getCode()) {
+            throw new MilvusConnectorException(MilvusConnectionErrorCode.DESC_COLLECTION_ERROR);
+        }
+
+        // Columns and primary key are both derived from the schema's field list.
+        CollectionSchema schema = collectionReply.getData().getSchema();
+        List<Column> columns = new ArrayList<>();
+        for (FieldSchema field : schema.getFieldsList()) {
+            columns.add(MilvusConvertUtils.convertColumn(field));
+        }
+        PrimaryKey primaryKey = buildPrimaryKey(schema.getFieldsList());
+
+        // Indexes live outside the schema and need their own request.
+        DescribeIndexParam describeIndex =
+                DescribeIndexParam.newBuilder()
+                        .withDatabaseName(database)
+                        .withCollectionName(collection)
+                        .build();
+        R<DescribeIndexResponse> indexReply = client.describeIndex(describeIndex);
+        if (indexReply.getStatus() != R.Status.Success.getCode()) {
+            throw new MilvusConnectorException(MilvusConnectionErrorCode.DESC_INDEX_ERROR);
+        }
+        List<ConstraintKey.ConstraintKeyColumn> vectorIndexes =
+                buildVectorIndexes(indexReply.getData());
+
+        ConstraintKey vectorIndexKey =
+                ConstraintKey.of(
+                        ConstraintKey.ConstraintType.VECTOR_INDEX_KEY,
+                        "vector_index",
+                        vectorIndexes);
+        TableSchema tableSchema =
+                TableSchema.builder()
+                        .columns(columns)
+                        .primaryKey(primaryKey)
+                        .constraintKey(vectorIndexKey)
+                        .build();
+
+        TableIdentifier tableId = TableIdentifier.of(CATALOG_NAME, database, collection);
+
+        Map<String, String> options = new HashMap<>();
+        String dynamicFieldFlag = String.valueOf(schema.getEnableDynamicField());
+        options.put(MilvusOptions.ENABLE_DYNAMIC_FIELD, dynamicFieldFlag);
+
+        return CatalogTable.of(
+                tableId, tableSchema, options, new ArrayList<>(), schema.getDescription());
+    }
+
+    /**
+     * Converts the index descriptions of a describe-index response into {@link VectorIndex}
+     * entries.
+     *
+     * <p>Index type and metric type are read from each description's parameter list under the
+     * keys {@code "index_type"} and {@code "metric_type"}.
+     *
+     * @param indexResponse describe-index response of one collection
+     * @return one entry per index description; an empty list (never null) when there are none
+     */
+    private static List<ConstraintKey.ConstraintKeyColumn> buildVectorIndexes(
+            DescribeIndexResponse indexResponse) {
+        List<ConstraintKey.ConstraintKeyColumn> list = new ArrayList<>();
+        if (CollectionUtils.isEmpty(indexResponse.getIndexDescriptionsList())) {
+            // Return an empty list rather than null so callers can iterate unconditionally.
+            return list;
+        }
+
+        for (IndexDescription per : indexResponse.getIndexDescriptionsList()) {
+            Map<String, String> paramsMap =
+                    per.getParamsList().stream()
+                            .collect(
+                                    Collectors.toMap(KeyValuePair::getKey, KeyValuePair::getValue));
+
+            VectorIndex index =
+                    new VectorIndex(
+                            per.getIndexName(),
+                            per.getFieldName(),
+                            paramsMap.get("index_type"),
+                            paramsMap.get("metric_type"));
+
+            list.add(index);
+        }
+
+        return list;
+    }
+
+    /**
+     * Builds the primary key from the first field flagged as primary key.
+     *
+     * <p>The key is named after the field, covers just that field, and carries the field's
+     * auto-id flag.
+     *
+     * @param fields fields of a collection schema
+     * @return the primary key, or null when no field is flagged as primary key
+     */
+    public static PrimaryKey buildPrimaryKey(List<FieldSchema> fields) {
+        for (FieldSchema field : fields) {
+            if (field.getIsPrimaryKey()) {
+                // Plain java.util list: no need for Hadoop's Lists helper for a single element.
+                List<String> columnNames = new ArrayList<>();
+                columnNames.add(field.getName());
+                return PrimaryKey.of(field.getName(), columnNames, field.getAutoID());
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Converts a Milvus field schema into a SeaTunnel physical column.
+     *
+     * <p>The column takes the field's name and description, and records the Milvus type name as
+     * its source type. VarChar fields additionally get a column length of four times their
+     * {@code max_length} parameter; dense vector fields get their {@code dim} parameter as scale.
+     * Array fields are always mapped to a string array.
+     *
+     * @param fieldSchema Milvus field to convert
+     * @return the equivalent physical column
+     * @throws UnsupportedOperationException for Milvus data types without a mapping
+     */
+    public static PhysicalColumn convertColumn(FieldSchema fieldSchema) {
+        DataType milvusType = fieldSchema.getDataType();
+        PhysicalColumn.PhysicalColumnBuilder column = PhysicalColumn.builder();
+        column.name(fieldSchema.getName());
+        column.sourceType(milvusType.name());
+        column.comment(fieldSchema.getDescription());
+
+        switch (milvusType) {
+            case Bool:
+                column.dataType(BasicType.BOOLEAN_TYPE);
+                break;
+            case Int8:
+                column.dataType(BasicType.BYTE_TYPE);
+                break;
+            case Int16:
+                column.dataType(BasicType.SHORT_TYPE);
+                break;
+            case Int32:
+                column.dataType(BasicType.INT_TYPE);
+                break;
+            case Int64:
+                column.dataType(BasicType.LONG_TYPE);
+                break;
+            case Float:
+                column.dataType(BasicType.FLOAT_TYPE);
+                break;
+            case Double:
+                column.dataType(BasicType.DOUBLE_TYPE);
+                break;
+            case VarChar:
+                column.dataType(BasicType.STRING_TYPE);
+                KeyValuePair maxLength = firstTypeParam(fieldSchema, "max_length");
+                if (maxLength != null) {
+                    column.columnLength(Long.parseLong(maxLength.getValue()) * 4);
+                }
+                break;
+            case String:
+            case JSON:
+                column.dataType(BasicType.STRING_TYPE);
+                break;
+            case Array:
+                column.dataType(ArrayType.STRING_ARRAY_TYPE);
+                break;
+            case FloatVector:
+                column.dataType(VectorType.VECTOR_FLOAT_TYPE);
+                applyDimension(column, fieldSchema);
+                break;
+            case BinaryVector:
+                column.dataType(VectorType.VECTOR_BINARY_TYPE);
+                applyDimension(column, fieldSchema);
+                break;
+            case SparseFloatVector:
+                // No dim parameter is read for sparse vectors.
+                column.dataType(VectorType.VECTOR_SPARSE_FLOAT_TYPE);
+                break;
+            case Float16Vector:
+                column.dataType(VectorType.VECTOR_FLOAT16_TYPE);
+                applyDimension(column, fieldSchema);
+                break;
+            case BFloat16Vector:
+                column.dataType(VectorType.VECTOR_BFLOAT16_TYPE);
+                applyDimension(column, fieldSchema);
+                break;
+            default:
+                throw new UnsupportedOperationException("Unsupported data type: " + milvusType);
+        }
+
+        return column.build();
+    }
+
+    /** Copies the field's {@code dim} type parameter, when present, into the column's scale. */
+    private static void applyDimension(
+            PhysicalColumn.PhysicalColumnBuilder column, FieldSchema fieldSchema) {
+        KeyValuePair dim = firstTypeParam(fieldSchema, "dim");
+        if (dim != null) {
+            column.scale(Integer.valueOf(dim.getValue()));
+        }
+    }
+
+    /** Returns the first type parameter of the field whose key equals {@code key}, else null. */
+    private static KeyValuePair firstTypeParam(FieldSchema fieldSchema, String key) {
+        for (KeyValuePair param : fieldSchema.getTypeParamsList()) {
+            if (param.getKey().equals(key)) {
+                return param;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Converts a field value according to its SeaTunnel type.
+     *
+     * <p>Scalar values are re-parsed from their {@code toString()} form, float vectors and arrays
+     * are returned as {@link List}s, and ROW / MAP values are serialized to JSON strings.
+     *
+     * @param fieldType SeaTunnel type of the field; selects the conversion that is applied
+     * @param value field value to convert; the scalar branches call {@code value.toString()}, so
+     *     a null value is not handled here
+     * @return the converted value
+     * @throws MilvusConnectorException if the SQL type, or the element type of an ARRAY, is not
+     *     supported
+     */
+    public static Object convertBySeaTunnelType(SeaTunnelDataType<?> fieldType, Object value) {
+        SqlType sqlType = fieldType.getSqlType();
+        switch (sqlType) {
+            case INT:
+                return Integer.parseInt(value.toString());
+            case BIGINT:
+                return Long.parseLong(value.toString());
+            case SMALLINT:
+                return Short.parseShort(value.toString());
+            case STRING:
+            case DATE:
+                return value.toString();
+            case FLOAT_VECTOR:
+                List<Float> vector = new ArrayList<>();
+                for (Object o : (Object[]) value) {
+                    vector.add(Float.parseFloat(o.toString()));
+                }
+                return vector;
+            case FLOAT:
+                return Float.parseFloat(value.toString());
+            case BOOLEAN:
+                return Boolean.parseBoolean(value.toString());
+            case DOUBLE:
+                return Double.parseDouble(value.toString());
+            case ARRAY:
+                ArrayType<?, ?> arrayType = (ArrayType<?, ?>) fieldType;
+                SqlType elementSqlType = arrayType.getElementType().getSqlType();
+                switch (elementSqlType) {
+                    case STRING:
+                        String[] stringArray = (String[]) value;
+                        return Arrays.asList(stringArray);
+                    case INT:
+                        Integer[] intArray = (Integer[]) value;
+                        return Arrays.asList(intArray);
+                    case BIGINT:
+                        Long[] longArray = (Long[]) value;
+                        return Arrays.asList(longArray);
+                    case FLOAT:
+                        Float[] floatArray = (Float[]) value;
+                        return Arrays.asList(floatArray);
+                    case DOUBLE:
+                        Double[] doubleArray = (Double[]) value;
+                        return Arrays.asList(doubleArray);
+                    default:
+                        // Without this branch an unsupported element type fell through into
+                        // the ROW case below and failed with a ClassCastException.
+                        throw new MilvusConnectorException(
+                                MilvusConnectionErrorCode.NOT_SUPPORT_TYPE,
+                                "ARRAY<" + elementSqlType.name() + ">");
+                }
+            case ROW:
+                SeaTunnelRow row = (SeaTunnelRow) value;
+                return JsonUtils.toJsonString(row.getFields());
+            case MAP:
+                return JacksonUtils.toJsonString(value);
+            default:
+                throw new MilvusConnectorException(
+                        MilvusConnectionErrorCode.NOT_SUPPORT_TYPE, sqlType.name());
+        }
+    }
+
+    /**
+     * Maps a SeaTunnel {@link SqlType} to the corresponding Milvus {@link DataType}.
+     *
+     * <p>STRING, DATE and ROW all map to {@code VarChar}.
+     *
+     * @param sqlType SeaTunnel SQL type to translate
+     * @return the matching Milvus data type
+     * @throws CatalogException if no mapping exists for {@code sqlType}
+     */
+    public static DataType convertSqlTypeToDataType(SqlType sqlType) {
+        final DataType milvusType;
+        switch (sqlType) {
+            case BOOLEAN:
+                milvusType = DataType.Bool;
+                break;
+            case TINYINT:
+                milvusType = DataType.Int8;
+                break;
+            case SMALLINT:
+                milvusType = DataType.Int16;
+                break;
+            case INT:
+                milvusType = DataType.Int32;
+                break;
+            case BIGINT:
+                milvusType = DataType.Int64;
+                break;
+            case FLOAT:
+                milvusType = DataType.Float;
+                break;
+            case DOUBLE:
+                milvusType = DataType.Double;
+                break;
+            case STRING:
+            case DATE:
+            case ROW:
+                milvusType = DataType.VarChar;
+                break;
+            case ARRAY:
+                milvusType = DataType.Array;
+                break;
+            case FLOAT_VECTOR:
+                milvusType = DataType.FloatVector;
+                break;
+            case BINARY_VECTOR:
+                milvusType = DataType.BinaryVector;
+                break;
+            case FLOAT16_VECTOR:
+                milvusType = DataType.Float16Vector;
+                break;
+            case BFLOAT16_VECTOR:
+                milvusType = DataType.BFloat16Vector;
+                break;
+            case SPARSE_FLOAT_VECTOR:
+                milvusType = DataType.SparseFloatVector;
+                break;
+            default:
+                throw new CatalogException(
+                        String.format("Not support convert to milvus type, sqlType is %s", sqlType));
+        }
+        return milvusType;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectionErrorCode.java
 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectionErrorCode.java
new file mode 100644
index 0000000000..3acc3de804
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectionErrorCode.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+/**
+ * Error codes used by the Milvus connector. Each constant pairs a code string of the form
+ * {@code MILVUS-NN} with a short human-readable description.
+ */
+public enum MilvusConnectionErrorCode implements SeaTunnelErrorCode {
+    SERVER_RESPONSE_FAILED("MILVUS-01", "Milvus server response error"),
+    COLLECTION_NOT_FOUND("MILVUS-02", "Collection not found"),
+    FIELD_NOT_FOUND("MILVUS-03", "Field not found"),
+    DESC_COLLECTION_ERROR("MILVUS-04", "Desc collection error"),
+    SHOW_COLLECTIONS_ERROR("MILVUS-05", "Show collections error"),
+    COLLECTION_NOT_LOADED("MILVUS-06", "Collection not loaded"),
+    NOT_SUPPORT_TYPE("MILVUS-07", "Type not support yet"),
+    DATABASE_NO_COLLECTIONS("MILVUS-08", "Database no any collections"),
+    SOURCE_TABLE_SCHEMA_IS_NULL("MILVUS-09", "Source table schema is null"),
+    FIELD_IS_NULL("MILVUS-10", "Field is null"),
+    CLOSE_CLIENT_ERROR("MILVUS-11", "Close client error"),
+    DESC_INDEX_ERROR("MILVUS-12", "Desc index error"),
+    CREATE_DATABASE_ERROR("MILVUS-13", "Create database error"),
+    CREATE_COLLECTION_ERROR("MILVUS-14", "Create collection error"),
+    CREATE_INDEX_ERROR("MILVUS-15", "Create index error"),
+    ;
+
+    /** Code string of this error, e.g. {@code MILVUS-01}. */
+    private final String code;
+    /** Short description of this error. */
+    private final String description;
+
+    /**
+     * @param code code string of the error
+     * @param description short description of the error
+     */
+    MilvusConnectionErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+
+    /** @return the code string given at construction */
+    @Override
+    public String getCode() {
+        return code;
+    }
+
+    /** @return the description given at construction */
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorException.java
 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorException.java
new file mode 100644
index 0000000000..df6ea7adca
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+/**
+ * Runtime exception thrown by the Milvus connector. Every constructor delegates to the matching
+ * {@link SeaTunnelRuntimeException} constructor.
+ */
+public class MilvusConnectorException extends SeaTunnelRuntimeException {
+    /**
+     * @param seaTunnelErrorCode error code identifying the failure
+     * @param errorMessage detail message
+     */
+    public MilvusConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
String errorMessage) {
+        super(seaTunnelErrorCode, errorMessage);
+    }
+
+    /**
+     * Uses the error code's own {@code getErrorMessage()} as the detail message.
+     *
+     * @param seaTunnelErrorCode error code identifying the failure
+     */
+    public MilvusConnectorException(SeaTunnelErrorCode seaTunnelErrorCode) {
+        super(seaTunnelErrorCode, seaTunnelErrorCode.getErrorMessage());
+    }
+
+    /**
+     * @param seaTunnelErrorCode error code identifying the failure
+     * @param errorMessage detail message
+     * @param cause underlying exception
+     */
+    public MilvusConnectorException(
+            SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, 
Throwable cause) {
+        super(seaTunnelErrorCode, errorMessage, cause);
+    }
+
+    /**
+     * @param seaTunnelErrorCode error code identifying the failure
+     * @param cause underlying exception
+     */
+    public MilvusConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
Throwable cause) {
+        super(seaTunnelErrorCode, cause);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java
 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java
new file mode 100644
index 0000000000..c5b1b82bcc
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportSaveMode;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.connectors.seatunnel.milvus.catalog.MilvusCatalogFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.milvus.state.MilvusAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.milvus.state.MilvusCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.milvus.state.MilvusSinkState;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * SeaTunnel sink for Milvus. It hands the connector options and the target {@link CatalogTable}
+ * to {@link MilvusSinkWriter} instances and exposes a save-mode handler backed by the catalog
+ * created through {@link MilvusCatalogFactory}.
+ */
+public class MilvusSink
+        implements SeaTunnelSink<
+                        SeaTunnelRow,
+                        MilvusSinkState,
+                        MilvusCommitInfo,
+                        MilvusAggregatedCommitInfo>,
+                SupportSaveMode {
+
+    /** Connector options supplied by the factory. */
+    private final ReadonlyConfig config;
+    /** Table the sink writes to; {@link #getSaveModeHandler()} tolerates null here. */
+    private final CatalogTable catalogTable;
+
+    /**
+     * @param config connector options
+     * @param catalogTable table the sink writes to
+     */
+    public MilvusSink(ReadonlyConfig config, CatalogTable catalogTable) {
+        this.config = config;
+        this.catalogTable = catalogTable;
+    }
+
+    /** Creates a writer with an empty list of restored states. */
+    @Override
+    public SinkWriter<SeaTunnelRow, MilvusCommitInfo, MilvusSinkState> 
createWriter(
+            SinkWriter.Context context) {
+
+        return new MilvusSinkWriter(context, catalogTable, config, 
Collections.emptyList());
+    }
+
+    /** Creates a writer and passes the given states to it. */
+    @Override
+    public SinkWriter<SeaTunnelRow, MilvusCommitInfo, MilvusSinkState> 
restoreWriter(
+            SinkWriter.Context context, List<MilvusSinkState> states) {
+        return new MilvusSinkWriter(context, catalogTable, config, states);
+    }
+
+    /** @return a {@link DefaultSerializer} for the writer state */
+    @Override
+    public Optional<Serializer<MilvusSinkState>> getWriterStateSerializer() {
+        return Optional.of(new DefaultSerializer<>());
+    }
+
+    /** @return a new {@link MilvusSinkCommitter} built from the connector options */
+    @Override
+    public Optional<SinkCommitter<MilvusCommitInfo>> createCommitter() {
+        return Optional.of(new MilvusSinkCommitter(config));
+    }
+
+    /** @return a {@link DefaultSerializer} for the commit info */
+    @Override
+    public Optional<Serializer<MilvusCommitInfo>> getCommitInfoSerializer() {
+        return Optional.of(new DefaultSerializer<>());
+    }
+
+    /** @return the connector identity declared in {@link MilvusSinkConfig} */
+    @Override
+    public String getPluginName() {
+        return MilvusSinkConfig.CONNECTOR_IDENTITY;
+    }
+
+    /**
+     * Builds the save-mode handler for the target table.
+     *
+     * @return empty when no catalog table is set; otherwise a {@link DefaultSaveModeHandler}
+     *     configured with the schema and data save modes read from the connector options
+     */
+    @Override
+    public Optional<SaveModeHandler> getSaveModeHandler() {
+        if (catalogTable == null) {
+            return Optional.empty();
+        }
+
+        CatalogFactory catalogFactory = new MilvusCatalogFactory();
+        Catalog catalog = 
catalogFactory.createCatalog(catalogTable.getCatalogName(), config);
+
+        SchemaSaveMode schemaSaveMode = 
config.get(MilvusSinkConfig.SCHEMA_SAVE_MODE);
+        DataSaveMode dataSaveMode = 
config.get(MilvusSinkConfig.DATA_SAVE_MODE);
+
+        // The catalog is opened here and not closed in this method.
+        // NOTE(review): presumably the handler owns and closes it - confirm.
+        catalog.open();
+        // NOTE(review): the last constructor argument is passed as null; confirm what
+        // DefaultSaveModeHandler expects in that position.
+        return Optional.of(
+                new DefaultSaveModeHandler(
+                        schemaSaveMode,
+                        dataSaveMode,
+                        catalog,
+                        catalogTable.getTablePath(),
+                        catalogTable,
+                        null));
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkCommitter.java
 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkCommitter.java
new file mode 100644
index 0000000000..8c23bc62e6
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkCommitter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.connectors.seatunnel.milvus.state.MilvusCommitInfo;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Committer for the Milvus sink. As written it performs no work: {@link #commit(List)} returns
+ * an empty list and {@link #abort(List)} does nothing.
+ */
+@Slf4j
+public class MilvusSinkCommitter implements SinkCommitter<MilvusCommitInfo> {
+
+    /**
+     * @param pluginConfig connector options; currently not read or stored
+     */
+    public MilvusSinkCommitter(ReadonlyConfig pluginConfig) {}
+
+    /**
+     * Commit message to third party data receiver, The method need to achieve 
idempotency.
+     *
+     * @param commitInfos The list of commit message
+     * @return The commit message need retry.
+     * @throws IOException throw IOException when commit failed.
+     */
+    @Override
+    public List<MilvusCommitInfo> commit(List<MilvusCommitInfo> commitInfos) 
throws IOException {
+        // Nothing is committed here; an empty list means no commit info needs a retry.
+        return Collections.emptyList();
+    }
+
+    /**
+     * Abort the transaction, this method will be called (**Only** on Spark 
engine) when the commit
+     * is failed.
+     *
+     * @param commitInfos The list of commit message, used to abort the commit.
+     * @throws IOException throw IOException when close failed.
+     */
+    @Override
+    public void abort(List<MilvusCommitInfo> commitInfos) throws IOException {}
+}
diff --git 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java
 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java
new file mode 100644
index 0000000000..6ea5b5a2ff
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.connector.TableSink;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import 
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * Factory for the Milvus sink, registered as a {@link Factory} service through
+ * {@link AutoService}.
+ */
+@AutoService(Factory.class)
+public class MilvusSinkFactory implements TableSinkFactory {
+
+    /** @return the identifier under which this factory is registered */
+    @Override
+    public String factoryIdentifier() {
+        return "Milvus";
+    }
+
+    /**
+     * Declares the options understood by the sink. {@code DATABASE} and {@code BATCH_SIZE} are
+     * listed as optional because the sink reads both of them from the configuration.
+     *
+     * @return the option rule of the Milvus sink
+     */
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(MilvusSinkConfig.URL, MilvusSinkConfig.TOKEN)
+                .optional(
+                        MilvusSinkConfig.DATABASE,
+                        MilvusSinkConfig.BATCH_SIZE,
+                        MilvusSinkConfig.ENABLE_UPSERT,
+                        MilvusSinkConfig.ENABLE_DYNAMIC_FIELD,
+                        MilvusSinkConfig.ENABLE_AUTO_ID,
+                        MilvusSinkConfig.SCHEMA_SAVE_MODE,
+                        MilvusSinkConfig.DATA_SAVE_MODE)
+                .build();
+    }
+
+    /**
+     * Creates the sink for the table carried by the context, after applying the optional
+     * database override.
+     *
+     * @param context factory context providing the options and the upstream catalog table
+     * @return a {@link TableSink} that builds a {@link MilvusSink}
+     */
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        ReadonlyConfig config = context.getOptions();
+        CatalogTable catalogTable = renameCatalogTable(config, context.getCatalogTable());
+        return () -> new MilvusSink(config, catalogTable);
+    }
+
+    /**
+     * Returns a copy of the source table whose database name is replaced by the configured
+     * {@code DATABASE} option when that option is non-empty; all other identifier parts are
+     * kept.
+     */
+    private CatalogTable renameCatalogTable(
+            ReadonlyConfig config, CatalogTable sourceCatalogTable) {
+        TableIdentifier sourceTableId = sourceCatalogTable.getTableId();
+        // Read the option once instead of once for the check and once for the value.
+        String configuredDatabase = config.get(MilvusSinkConfig.DATABASE);
+        String databaseName =
+                StringUtils.isNotEmpty(configuredDatabase)
+                        ? configuredDatabase
+                        : sourceTableId.getDatabaseName();
+
+        TableIdentifier newTableId =
+                TableIdentifier.of(
+                        sourceTableId.getCatalogName(),
+                        databaseName,
+                        sourceTableId.getSchemaName(),
+                        sourceTableId.getTableName());
+
+        return CatalogTable.of(newTableId, sourceCatalogTable);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java
 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java
new file mode 100644
index 0000000000..7c823838c5
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.milvus.sink.batch.MilvusBatchWriter;
+import 
org.apache.seatunnel.connectors.seatunnel.milvus.sink.batch.MilvusBufferBatchWriter;
+import org.apache.seatunnel.connectors.seatunnel.milvus.state.MilvusCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.milvus.state.MilvusSinkState;
+
+import io.milvus.v2.client.ConnectConfig;
+import io.milvus.v2.client.MilvusClientV2;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.BATCH_SIZE;
+
+/**
+ * MilvusSinkWriter is a sink writer that will write {@link SeaTunnelRow} to Milvus.
+ *
+ * <p>Rows are buffered in a {@link MilvusBatchWriter} and flushed when the batch writer asks for
+ * it, on {@link #prepareCommit()} and on {@link #close()}.
+ */
+@Slf4j
+public class MilvusSinkWriter
+        implements SinkWriter<SeaTunnelRow, MilvusCommitInfo, MilvusSinkState> {
+    private final Context context;
+
+    private final ReadonlyConfig config;
+    private final MilvusBatchWriter batchWriter;
+
+    /**
+     * Connects to Milvus with the configured URL and token and prepares the batch writer.
+     *
+     * @param context writer context supplied by the engine
+     * @param catalogTable table the rows are written to
+     * @param config connector options
+     * @param milvusSinkStates restored writer states; currently not used
+     */
+    public MilvusSinkWriter(
+            Context context,
+            CatalogTable catalogTable,
+            ReadonlyConfig config,
+            List<MilvusSinkState> milvusSinkStates) {
+        this.context = context;
+        this.config = config;
+        ConnectConfig connectConfig =
+                ConnectConfig.builder()
+                        .uri(config.get(MilvusSinkConfig.URL))
+                        .token(config.get(MilvusSinkConfig.TOKEN))
+                        .build();
+        this.batchWriter =
+                new MilvusBufferBatchWriter(
+                        catalogTable,
+                        config.get(BATCH_SIZE),
+                        getAutoId(catalogTable.getTableSchema().getPrimaryKey()),
+                        config.get(MilvusSinkConfig.ENABLE_UPSERT),
+                        new MilvusClientV2(connectConfig));
+    }
+
+    /**
+     * Adds the row to the current batch and flushes the batch when the batch writer reports
+     * that a flush is needed.
+     *
+     * @param element the data need be written.
+     */
+    @Override
+    public void write(SeaTunnelRow element) {
+        batchWriter.addToBatch(element);
+        if (batchWriter.needFlush()) {
+            batchWriter.flush();
+        }
+    }
+
+    /**
+     * Resolves the auto-id flag: the value carried by the primary key wins when it is set,
+     * otherwise the {@code ENABLE_AUTO_ID} option is used.
+     */
+    private Boolean getAutoId(PrimaryKey primaryKey) {
+        if (null != primaryKey && null != primaryKey.getEnableAutoId()) {
+            return primaryKey.getEnableAutoId();
+        } else {
+            return config.get(MilvusSinkConfig.ENABLE_AUTO_ID);
+        }
+    }
+
+    /**
+     * Flushes the buffered rows. No commit info is produced, so there is nothing for
+     * {@link SinkCommitter#commit(List)} to receive from this writer.
+     *
+     * @return always empty
+     */
+    @Override
+    public Optional<MilvusCommitInfo> prepareCommit() throws IOException {
+        batchWriter.flush();
+        return Optional.empty();
+    }
+
+    /** No-op: {@link #prepareCommit()} leaves nothing behind that this writer rolls back. */
+    @Override
+    public void abortPrepare() {}
+
+    /**
+     * Flushes the remaining rows and closes the batch writer.
+     *
+     * @throws IOException if close failed
+     */
+    @Override
+    public void close() throws IOException {
+        if (batchWriter != null) {
+            try {
+                batchWriter.flush();
+            } finally {
+                // Close even when the final flush fails, so the batch writer is not leaked.
+                batchWriter.close();
+            }
+        }
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/batch/MilvusBatchWriter.java
similarity index 70%
copy from 
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
copy to 
seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/batch/MilvusBatchWriter.java
index 838a384809..91e04342dc 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/batch/MilvusBatchWriter.java
@@ -15,26 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.table.type;
+package org.apache.seatunnel.connectors.seatunnel.milvus.sink.batch;
 
-/** The sql type of {@link SeaTunnelDataType}. */
-public enum SqlType {
-    ARRAY,
-    MAP,
-    STRING,
-    BOOLEAN,
-    TINYINT,
-    SMALLINT,
-    INT,
-    BIGINT,
-    FLOAT,
-    DOUBLE,
-    DECIMAL,
-    NULL,
-    BYTES,
-    DATE,
-    TIME,
-    TIMESTAMP,
-    ROW,
-    MULTIPLE_ROW;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+/**
+ * Contract for a writer that collects {@link SeaTunnelRow}s into batches. The intended call
+ * sequence is presumably add, check {@link #needFlush()}, then {@link #flush()} - confirm
+ * against the implementations.
+ */
+public interface MilvusBatchWriter {
+
+    /**
+     * Adds one row to the current batch.
+     *
+     * @param element row to add
+     */
+    void addToBatch(SeaTunnelRow element);
+
+    /** @return whether the caller should call {@link #flush()} now */
+    boolean needFlush();
+
+    /**
+     * Flushes the current batch.
+     *
+     * @return NOTE(review): the meaning of the flag is not defined here - confirm with the
+     *     implementation
+     */
+    boolean flush();
+
+    /** Closes the writer. */
+    void close();
 }
diff --git 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/batch/MilvusBufferBatchWriter.java
 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/batch/MilvusBufferBatchWriter.java
new file mode 100644
index 0000000000..a323095bc2
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/batch/MilvusBufferBatchWriter.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.sink.batch;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+import 
org.apache.seatunnel.connectors.seatunnel.milvus.convert.MilvusConvertUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import com.alibaba.fastjson.JSONObject;
+import io.milvus.v2.client.MilvusClientV2;
+import io.milvus.v2.service.vector.request.InsertReq;
+import io.milvus.v2.service.vector.request.UpsertReq;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static 
org.apache.seatunnel.api.table.catalog.PrimaryKey.isPrimaryKeyField;
+
+/**
+ * {@link MilvusBatchWriter} that keeps converted rows in an in-memory list and sends the whole
+ * list to a single Milvus collection on {@link #flush()}.
+ *
+ * <p>NOTE(review): only {@link #flush()} is synchronized while {@link #addToBatch} touches the
+ * same buffer unlocked, so this presumably expects one writer thread - confirm with the caller.
+ */
+public class MilvusBufferBatchWriter implements MilvusBatchWriter {
+
+    /** Buffered-row count at which {@link #needFlush()} starts returning {@code true}. */
+    private final int batchSize;
+    /** Source of the row type and primary key used to build each record. */
+    private final CatalogTable catalogTable;
+    /** When set, primary-key fields are left out of the records sent to Milvus. */
+    private final boolean autoId;
+    /** When set and {@code autoId} is off, flushes use upsert requests instead of inserts. */
+    private final boolean enableUpsert;
+    /** Target collection: the table name of the catalog table. */
+    private final String collectionName;
+    private final MilvusClientV2 milvusClient;
+
+    /** Records waiting for the next flush; replaced by a fresh list after each write. */
+    private volatile List<JSONObject> milvusDataCache;
+    /** Rows added since the last flush that actually wrote data. */
+    private volatile int writeCount = 0;
+
+    /**
+     * @param catalogTable table whose row type, primary key and table name drive the writes
+     * @param batchSize buffered-row threshold reported by {@link #needFlush()}; also the initial
+     *     capacity of the buffer
+     * @param autoId whether primary-key fields are omitted from the written records
+     * @param enableUpsert whether to upsert (only honoured when {@code autoId} is false)
+     * @param milvusClient client used for the requests; closed by {@link #close()}
+     * @throws NullPointerException if {@code batchSize}, {@code autoId} or {@code enableUpsert}
+     *     is null
+     */
+    public MilvusBufferBatchWriter(
+            CatalogTable catalogTable,
+            Integer batchSize,
+            Boolean autoId,
+            Boolean enableUpsert,
+            MilvusClientV2 milvusClient) {
+        this.catalogTable = catalogTable;
+        // Unbox once here so a missing flag fails at construction instead of on the first row.
+        this.autoId = autoId;
+        this.enableUpsert = enableUpsert;
+        this.milvusClient = milvusClient;
+        this.collectionName = catalogTable.getTablePath().getTableName();
+        this.batchSize = batchSize;
+        this.milvusDataCache = new ArrayList<>(batchSize);
+    }
+
+    /** Converts the row into a Milvus JSON record, buffers it and counts it. */
+    @Override
+    public void addToBatch(SeaTunnelRow element) {
+        JSONObject data = buildMilvusData(element);
+        milvusDataCache.add(data);
+        writeCount++;
+    }
+
+    /** Returns {@code true} once at least {@code batchSize} rows have been buffered. */
+    @Override
+    public boolean needFlush() {
+        return this.writeCount >= this.batchSize;
+    }
+
+    /**
+     * Sends the buffered records to Milvus and starts a new empty buffer.
+     *
+     * @return always {@code true}; an empty buffer is a no-op
+     */
+    @Override
+    public synchronized boolean flush() {
+        if (CollectionUtils.isEmpty(this.milvusDataCache)) {
+            return true;
+        }
+        writeData2Collection();
+        this.milvusDataCache = new ArrayList<>(this.batchSize);
+        this.writeCount = 0;
+        return true;
+    }
+
+    /**
+     * Closes the Milvus client. Rows that are still buffered are not written by this method.
+     *
+     * @throws SeaTunnelException if the thread is interrupted while the client is closing
+     */
+    @Override
+    public void close() {
+        try {
+            // NOTE(review): 10 is presumably the maximum wait in seconds - confirm with the SDK.
+            this.milvusClient.close(10);
+        } catch (InterruptedException e) {
+            // Restore the flag so callers further up can still observe the interruption.
+            Thread.currentThread().interrupt();
+            throw new SeaTunnelException(e);
+        }
+    }
+
+    /**
+     * Builds the JSON record for one row, keyed by field name.
+     *
+     * @throws MilvusConnectorException with {@code FIELD_IS_NULL} if a written field is null
+     */
+    private JSONObject buildMilvusData(SeaTunnelRow element) {
+        SeaTunnelRowType seaTunnelRowType = catalogTable.getSeaTunnelRowType();
+        PrimaryKey primaryKey = catalogTable.getTableSchema().getPrimaryKey();
+        String[] fieldNames = seaTunnelRowType.getFieldNames();
+
+        JSONObject data = new JSONObject();
+        for (int i = 0; i < fieldNames.length; i++) {
+            String fieldName = fieldNames[i];
+
+            if (autoId && isPrimaryKeyField(primaryKey, fieldName)) {
+                // With autoId the primary key is not supplied by the row, so skip the field.
+                continue;
+            }
+
+            SeaTunnelDataType<?> fieldType = seaTunnelRowType.getFieldType(i);
+            Object value = element.getField(i);
+            if (null == value) {
+                throw new MilvusConnectorException(
+                        MilvusConnectionErrorCode.FIELD_IS_NULL, fieldName);
+            }
+            data.put(fieldName, MilvusConvertUtils.convertBySeaTunnelType(fieldType, value));
+        }
+        return data;
+    }
+
+    /** Sends the current buffer as one upsert or one insert request. */
+    private void writeData2Collection() {
+        // Upsert is used only when it is enabled and autoId is off; otherwise fall back to insert.
+        if (enableUpsert && !autoId) {
+            UpsertReq upsertReq =
+                    UpsertReq.builder()
+                            .collectionName(this.collectionName)
+                            .data(this.milvusDataCache)
+                            .build();
+            milvusClient.upsert(upsertReq);
+        } else {
+            InsertReq insertReq =
+                    InsertReq.builder()
+                            .collectionName(this.collectionName)
+                            .data(this.milvusDataCache)
+                            .build();
+            milvusClient.insert(insertReq);
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSource.java
 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSource.java
new file mode 100644
index 0000000000..05e9aed769
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSource.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.source;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportColumnProjection;
+import org.apache.seatunnel.api.source.SupportParallelism;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.connectors.seatunnel.milvus.convert.MilvusConvertUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Bounded SeaTunnel source for Milvus. The tables to read are resolved once, at construction,
+ * through {@link MilvusConvertUtils#getSourceTables(ReadonlyConfig)} and handed to every reader
+ * and enumerator this source creates.
+ */
+public class MilvusSource
+        implements SeaTunnelSource<SeaTunnelRow, MilvusSourceSplit, MilvusSourceState>,
+                SupportParallelism,
+                SupportColumnProjection {
+
+    private final ReadonlyConfig config;
+    private final Map<TablePath, CatalogTable> sourceTables;
+
+    /**
+     * @param sourceConfig connector options; also used to look up the source tables
+     */
+    public MilvusSource(ReadonlyConfig sourceConfig) {
+        this.config = sourceConfig;
+        this.sourceTables = MilvusConvertUtils.getSourceTables(config);
+    }
+
+    /** This source always reports {@link Boundedness#BOUNDED}. */
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    /** Returns a new list holding the catalog tables resolved at construction. */
+    @Override
+    public List<CatalogTable> getProducedCatalogTables() {
+        return new ArrayList<>(sourceTables.values());
+    }
+
+    @Override
+    public SourceReader<SeaTunnelRow, MilvusSourceSplit> createReader(
+            SourceReader.Context readerContext) throws Exception {
+        return new MilvusSourceReader(readerContext, config, sourceTables);
+    }
+
+    /** Creates an enumerator with no restored state, so every table is still pending. */
+    @Override
+    public SourceSplitEnumerator<MilvusSourceSplit, MilvusSourceState> createEnumerator(
+            SourceSplitEnumerator.Context<MilvusSourceSplit> context) throws Exception {
+        return new MilvusSourceSplitEnumertor(context, config, sourceTables, null);
+    }
+
+    /** Creates an enumerator that resumes from the given checkpoint state. */
+    @Override
+    public SourceSplitEnumerator<MilvusSourceSplit, MilvusSourceState> restoreEnumerator(
+            SourceSplitEnumerator.Context<MilvusSourceSplit> context,
+            MilvusSourceState checkpointState)
+            throws Exception {
+        return new MilvusSourceSplitEnumertor(context, config, sourceTables, checkpointState);
+    }
+
+    @Override
+    public String getPluginName() {
+        return "Milvus";
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceFactory.java
 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceFactory.java
new file mode 100644
index 0000000000..d511026a85
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import 
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceConfig;
+
+import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.Serializable;
+
+/**
+ * Factory registered through {@code @AutoService} under the identifier {@code "Milvus"}; it
+ * builds {@link MilvusSource} instances from the options of the factory context.
+ */
+@Slf4j
+@AutoService(Factory.class)
+public class MilvusSourceFactory implements TableSourceFactory {
+
+    /**
+     * Returns a {@link TableSource} that creates a new {@link MilvusSource} from the context
+     * options each time it is invoked. The cast to the generic source type is unchecked.
+     */
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
+        return () -> (SeaTunnelSource<T, SplitT, StateT>) new 
MilvusSource(context.getOptions());
+    }
+
+    /** Declares URL and TOKEN as required options and DATABASE and COLLECTION as optional. */
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(MilvusSourceConfig.URL, MilvusSourceConfig.TOKEN)
+                .optional(MilvusSourceConfig.DATABASE, 
MilvusSourceConfig.COLLECTION)
+                .build();
+    }
+
+    @Override
+    public Class<? extends SeaTunnelSource> getSourceClass() {
+        return MilvusSource.class;
+    }
+
+    /** Identifier of this factory: {@code "Milvus"}. */
+    @Override
+    public String factoryIdentifier() {
+        return "Milvus";
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java
 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java
new file mode 100644
index 0000000000..e52f264264
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.source;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException;
+
+import org.apache.curator.shaded.com.google.common.collect.Lists;
+
+import io.milvus.client.MilvusServiceClient;
+import io.milvus.grpc.GetLoadStateResponse;
+import io.milvus.grpc.LoadState;
+import io.milvus.orm.iterator.QueryIterator;
+import io.milvus.param.ConnectParam;
+import io.milvus.param.R;
+import io.milvus.param.collection.GetLoadStateParam;
+import io.milvus.param.dml.QueryIteratorParam;
+import io.milvus.response.QueryResultsWrapper;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+/**
+ * Reads the Milvus collections named by the assigned {@link MilvusSourceSplit}s and emits one
+ * {@link SeaTunnelRow} per returned record. Each split is read completely while holding the
+ * collector's checkpoint lock.
+ */
+@Slf4j
+public class MilvusSourceReader implements SourceReader<SeaTunnelRow, MilvusSourceSplit> {
+
+    /** Splits assigned to this reader that have not been read yet. */
+    private final Deque<MilvusSourceSplit> pendingSplits = new ConcurrentLinkedDeque<>();
+    private final ReadonlyConfig config;
+    private final Context context;
+    private final Map<TablePath, CatalogTable> sourceTables;
+
+    /** Created in {@link #open()}; stays null until then. */
+    private MilvusServiceClient client;
+
+    /** Set by {@link #handleNoMoreSplits()} once the enumerator has no further splits. */
+    private volatile boolean noMoreSplit;
+
+    /**
+     * @param readerContext reader context used for boundedness and end-of-data signalling
+     * @param config connector options; URL and TOKEN are read when the client is opened
+     * @param sourceTables catalog tables by path, used to resolve the schema of each split
+     */
+    public MilvusSourceReader(
+            Context readerContext,
+            ReadonlyConfig config,
+            Map<TablePath, CatalogTable> sourceTables) {
+        this.context = readerContext;
+        this.config = config;
+        this.sourceTables = sourceTables;
+    }
+
+    /** Creates the Milvus client from the configured URL and token. */
+    @Override
+    public void open() throws Exception {
+        client =
+                new MilvusServiceClient(
+                        ConnectParam.newBuilder()
+                                .withUri(config.get(MilvusSourceConfig.URL))
+                                .withToken(config.get(MilvusSourceConfig.TOKEN))
+                                .build());
+    }
+
+    /** Closes the Milvus client if one was opened. */
+    @Override
+    public void close() throws IOException {
+        // open() may never have run (or may have failed), leaving no client to release.
+        if (client != null) {
+            client.close();
+        }
+    }
+
+    /**
+     * Reads at most one pending split, then signals end of data once no split is left, no more
+     * will arrive and the context is bounded. Sleeps one second only when no split was available,
+     * so consecutive splits are read without an artificial delay.
+     */
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        boolean idle;
+        synchronized (output.getCheckpointLock()) {
+            MilvusSourceSplit split = pendingSplits.poll();
+            idle = split == null;
+            if (!idle) {
+                handleEveryRowInternal(split, output);
+            } else if (!noMoreSplit) {
+                log.info("Milvus source wait split!");
+            }
+        }
+        if (noMoreSplit
+                && pendingSplits.isEmpty()
+                && Boundedness.BOUNDED.equals(context.getBoundedness())) {
+            // signal to the source that we have reached the end of the data.
+            log.info("Closed the bounded milvus source");
+            context.signalNoMoreElement();
+        }
+        if (idle) {
+            Thread.sleep(1000L);
+        }
+    }
+
+    /**
+     * Reads every record of the split's collection and forwards it to the collector.
+     *
+     * @throws MilvusConnectorException if the table or its schema is unknown, a server call does
+     *     not report success, or the collection is not loaded
+     */
+    private void handleEveryRowInternal(MilvusSourceSplit split, Collector<SeaTunnelRow> output) {
+        TablePath tablePath = split.getTablePath();
+        // Guard the lookup too: an unknown table path must not surface as a bare NPE.
+        CatalogTable catalogTable = sourceTables.get(tablePath);
+        TableSchema tableSchema = catalogTable == null ? null : catalogTable.getTableSchema();
+        if (null == tableSchema) {
+            throw new MilvusConnectorException(
+                    MilvusConnectionErrorCode.SOURCE_TABLE_SCHEMA_IS_NULL);
+        }
+
+        R<GetLoadStateResponse> loadStateResponse =
+                client.getLoadState(
+                        GetLoadStateParam.newBuilder()
+                                .withDatabaseName(tablePath.getDatabaseName())
+                                .withCollectionName(tablePath.getTableName())
+                                .build());
+        if (loadStateResponse.getStatus() != R.Status.Success.getCode()) {
+            throw new MilvusConnectorException(
+                    MilvusConnectionErrorCode.SERVER_RESPONSE_FAILED,
+                    loadStateResponse.getException());
+        }
+
+        if (!LoadState.LoadStateLoaded.equals(loadStateResponse.getData().getState())) {
+            throw new MilvusConnectorException(MilvusConnectionErrorCode.COLLECTION_NOT_LOADED);
+        }
+
+        QueryIteratorParam param =
+                QueryIteratorParam.newBuilder()
+                        .withDatabaseName(tablePath.getDatabaseName())
+                        .withCollectionName(tablePath.getTableName())
+                        .withOutFields(Lists.newArrayList("*"))
+                        .build();
+
+        R<QueryIterator> response = client.queryIterator(param);
+        if (response.getStatus() != R.Status.Success.getCode()) {
+            // Report the failure of this call, not the earlier (successful) load-state call.
+            throw new MilvusConnectorException(
+                    MilvusConnectionErrorCode.SERVER_RESPONSE_FAILED, response.getException());
+        }
+
+        QueryIterator iterator = response.getData();
+        while (true) {
+            List<QueryResultsWrapper.RowRecord> next = iterator.next();
+            if (next == null || next.isEmpty()) {
+                break;
+            }
+            for (QueryResultsWrapper.RowRecord record : next) {
+                output.collect(convertToSeaTunnelRow(record, tableSchema, tablePath));
+            }
+        }
+    }
+
+    /**
+     * Converts one Milvus record into a row laid out by the physical row type of the schema.
+     *
+     * <p>The row always has one slot per schema field. A field with no value in the record is
+     * emitted as {@code null}.
+     *
+     * @param record record returned by the query iterator
+     * @param tableSchema schema that fixes field order and types
+     * @param tablePath path whose full name becomes the row's table id
+     * @return the converted row, with row kind {@link RowKind#INSERT}
+     * @throws MilvusConnectorException if a field has an unsupported type, or a vector field does
+     *     not hold a list
+     */
+    public SeaTunnelRow convertToSeaTunnelRow(
+            QueryResultsWrapper.RowRecord record, TableSchema tableSchema, TablePath tablePath) {
+        SeaTunnelRowType typeInfo = tableSchema.toPhysicalRowDataType();
+        Map<String, Object> fieldValuesMap = record.getFieldValues();
+        String[] fieldNames = typeInfo.getFieldNames();
+        // Size by the schema: the record may carry more or fewer entries than the row type.
+        Object[] fields = new Object[typeInfo.getTotalFields()];
+        for (int fieldIndex = 0; fieldIndex < fields.length; fieldIndex++) {
+            Object fieldValue = fieldValuesMap.get(fieldNames[fieldIndex]);
+            fields[fieldIndex] = convertFieldValue(typeInfo.getFieldType(fieldIndex), fieldValue);
+        }
+
+        SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
+        seaTunnelRow.setTableId(tablePath.getFullName());
+        seaTunnelRow.setRowKind(RowKind.INSERT);
+        return seaTunnelRow;
+    }
+
+    /** Converts a single value to the Java type of its SeaTunnel type; null stays null. */
+    private static Object convertFieldValue(SeaTunnelDataType<?> dataType, Object value) {
+        switch (dataType.getSqlType()) {
+            case STRING:
+                return value == null ? null : value.toString();
+            case BOOLEAN:
+                if (value == null || value instanceof Boolean) {
+                    return value;
+                }
+                return Boolean.valueOf(value.toString());
+            case INT:
+                if (value == null || value instanceof Integer) {
+                    return value;
+                }
+                return Integer.valueOf(value.toString());
+            case BIGINT:
+                if (value == null || value instanceof Long) {
+                    return value;
+                }
+                return Long.valueOf(value.toString());
+            case FLOAT:
+                if (value == null || value instanceof Float) {
+                    return value;
+                }
+                return Float.valueOf(value.toString());
+            case DOUBLE:
+                if (value == null || value instanceof Double) {
+                    return value;
+                }
+                return Double.valueOf(value.toString());
+            case FLOAT_VECTOR:
+                if (value == null) {
+                    return null;
+                }
+                if (!(value instanceof List)) {
+                    throw new MilvusConnectorException(
+                            CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                            "Unexpected vector value: " + value);
+                }
+                List<?> elements = (List<?>) value;
+                Float[] vector = new Float[elements.size()];
+                for (int i = 0; i < vector.length; i++) {
+                    vector[i] = Float.valueOf(elements.get(i).toString());
+                }
+                return vector;
+            default:
+                throw new MilvusConnectorException(
+                        CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                        "Unexpected value: " + dataType.getSqlType().name());
+        }
+    }
+
+    /** Returns a copy of the splits that have not been read yet. */
+    @Override
+    public List<MilvusSourceSplit> snapshotState(long checkpointId) throws Exception {
+        return new ArrayList<>(pendingSplits);
+    }
+
+    /** Queues the given splits for reading. */
+    @Override
+    public void addSplits(List<MilvusSourceSplit> splits) {
+        log.info("Adding milvus splits to reader: {}", splits);
+        pendingSplits.addAll(splits);
+    }
+
+    /** Records that no further splits will be assigned to this reader. */
+    @Override
+    public void handleNoMoreSplits() {
+        log.info("receive no more splits message, this milvus reader will not add new split.");
+        noMoreSplit = true;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {}
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplit.java
similarity index 65%
copy from 
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
copy to 
seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplit.java
index 838a384809..e79d74b6dc 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplit.java
@@ -15,26 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.table.type;
+package org.apache.seatunnel.connectors.seatunnel.milvus.source;
 
-/** The sql type of {@link SeaTunnelDataType}. */
-public enum SqlType {
-    ARRAY,
-    MAP,
-    STRING,
-    BOOLEAN,
-    TINYINT,
-    SMALLINT,
-    INT,
-    BIGINT,
-    FLOAT,
-    DOUBLE,
-    DECIMAL,
-    NULL,
-    BYTES,
-    DATE,
-    TIME,
-    TIMESTAMP,
-    ROW,
-    MULTIPLE_ROW;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+
+import lombok.Builder;
+import lombok.Data;
+
+/**
+ * Source split identifying one table path to read. Accessors and the builder are generated by
+ * Lombok ({@code @Data}, {@code @Builder}).
+ */
+@Data
+@Builder
+public class MilvusSourceSplit implements SourceSplit {
+
+    // Table path this split refers to.
+    private TablePath tablePath;
+    // Identifier returned by splitId().
+    private String splitId;
+
+    /** Returns the identifier stored in this split. */
+    @Override
+    public String splitId() {
+        return splitId;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumertor.java
 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumertor.java
new file mode 100644
index 0000000000..e01e9c8ad5
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumertor.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.source;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import 
org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Split enumerator for the Milvus source. Every pending table is turned into exactly one split,
+ * which is assigned to the reader chosen by hashing the split id. Access to the pending splits
+ * is guarded by {@code stateLock}.
+ */
+@Slf4j
+public class MilvusSourceSplitEnumertor
+        implements SourceSplitEnumerator<MilvusSourceSplit, MilvusSourceState> {
+
+    private final Map<TablePath, CatalogTable> tables;
+    private final Context<MilvusSourceSplit> context;
+    /** Tables that have not been turned into splits yet. */
+    private final ConcurrentLinkedQueue<TablePath> pendingTables;
+    /** Splits waiting for assignment, keyed by the owning reader's subtask id. */
+    private final Map<Integer, List<MilvusSourceSplit>> pendingSplits;
+    private final Object stateLock = new Object();
+
+    private final ReadonlyConfig config;
+
+    /**
+     * @param context enumerator context used for reader lookup and split assignment
+     * @param config connector options
+     * @param sourceTables catalog tables by path
+     * @param sourceState restored state, or {@code null} to start with every table pending
+     */
+    public MilvusSourceSplitEnumertor(
+            Context<MilvusSourceSplit> context,
+            ReadonlyConfig config,
+            Map<TablePath, CatalogTable> sourceTables,
+            MilvusSourceState sourceState) {
+        this.context = context;
+        this.tables = sourceTables;
+        this.config = config;
+        if (sourceState == null) {
+            this.pendingTables = new ConcurrentLinkedQueue<>(tables.keySet());
+            this.pendingSplits = new HashMap<>();
+        } else {
+            this.pendingTables = new ConcurrentLinkedQueue<>(sourceState.getPendingTables());
+            this.pendingSplits = new HashMap<>(sourceState.getPendingSplits());
+        }
+    }
+
+    @Override
+    public void open() {}
+
+    /**
+     * Splits every pending table, assigns the resulting splits to the readers registered when the
+     * run started, and finally signals those readers that no more splits will follow.
+     */
+    @Override
+    public void run() throws Exception {
+        log.info("Starting milvus split enumerator.");
+        Set<Integer> readers = context.registeredReaders();
+        while (!pendingTables.isEmpty()) {
+            synchronized (stateLock) {
+                TablePath tablePath = pendingTables.poll();
+                log.info("begin to split table path: {}", tablePath);
+                Collection<MilvusSourceSplit> splits = generateSplits(tables.get(tablePath));
+                log.info("end to split table {} into {} splits.", tablePath, splits.size());
+
+                addPendingSplit(splits);
+                assignSplit(readers);
+            }
+        }
+
+        log.info("No more splits to assign. Sending NoMoreSplitsEvent to reader {}.", readers);
+        readers.forEach(context::signalNoMoreSplits);
+    }
+
+    /** Creates the single split (index 0) that covers the whole table. */
+    private Collection<MilvusSourceSplit> generateSplits(CatalogTable table) {
+        log.info("Start splitting table {} into chunks...", table.getTablePath());
+        MilvusSourceSplit milvusSourceSplit =
+                MilvusSourceSplit.builder()
+                        .splitId(createSplitId(table.getTablePath(), 0))
+                        .tablePath(table.getTablePath())
+                        .build();
+
+        return Collections.singletonList(milvusSourceSplit);
+    }
+
+    /** Builds a split id of the form {@code <tablePath>-<index>}. */
+    protected String createSplitId(TablePath tablePath, int index) {
+        return String.format("%s-%s", tablePath, index);
+    }
+
+    /** Queues each split under the reader chosen by {@link #getSplitOwner(String, int)}. */
+    private void addPendingSplit(Collection<MilvusSourceSplit> splits) {
+        int readerCount = context.currentParallelism();
+        for (MilvusSourceSplit split : splits) {
+            int ownerReader = getSplitOwner(split.splitId(), readerCount);
+            log.info("Assigning {} to {} reader.", split, ownerReader);
+
+            pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList<>()).add(split);
+        }
+    }
+
+    /** Maps a split id to a reader index: its hash, made non-negative, modulo the reader count. */
+    private static int getSplitOwner(String tp, int numReaders) {
+        return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
+    }
+
+    /** Hands each listed reader the splits queued for it and removes them from the queue. */
+    private void assignSplit(Collection<Integer> readers) {
+        log.info("Assign pendingSplits to readers {}", readers);
+
+        for (int reader : readers) {
+            List<MilvusSourceSplit> assignmentForReader = pendingSplits.remove(reader);
+            if (assignmentForReader != null && !assignmentForReader.isEmpty()) {
+                log.debug("Assign splits {} to reader {}", assignmentForReader, reader);
+                context.assignSplit(reader, assignmentForReader);
+            }
+        }
+    }
+
+    @Override
+    public void close() throws IOException {}
+
+    /**
+     * Re-queues splits returned by a reader and reassigns them at once if that reader is
+     * registered; otherwise they stay pending.
+     */
+    @Override
+    public void addSplitsBack(List<MilvusSourceSplit> splits, int subtaskId) {
+        if (!splits.isEmpty()) {
+            synchronized (stateLock) {
+                addPendingSplit(splits, subtaskId);
+                if (context.registeredReaders().contains(subtaskId)) {
+                    assignSplit(Collections.singletonList(subtaskId));
+                } else {
+                    log.warn(
+                            "Reader {} is not registered. Pending splits {} are not assigned.",
+                            subtaskId,
+                            splits);
+                }
+            }
+        }
+        log.info("Add back splits {} to MilvusSourceSplitEnumerator.", splits.size());
+    }
+
+    /** Queues the splits under the given reader without re-hashing them. */
+    private void addPendingSplit(Collection<MilvusSourceSplit> splits, int ownerReader) {
+        pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList<>()).addAll(splits);
+    }
+
+    /** Returns 0 when no table and no split is pending, otherwise 1. */
+    @Override
+    public int currentUnassignedSplitSize() {
+        return pendingTables.isEmpty() && pendingSplits.isEmpty() ? 0 : 1;
+    }
+
+    /** Split requests are not supported; this always throws. */
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+        throw new MilvusConnectorException(
+                CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+                String.format("Unsupported handleSplitRequest: %d", subtaskId));
+    }
+
+    /** Assigns any splits already queued for the newly registered reader. */
+    @Override
+    public void registerReader(int subtaskId) {
+        log.info("Register reader {} to MilvusSourceSplitEnumerator.", subtaskId);
+        // pendingSplits is a plain HashMap, so even the emptiness check needs the lock.
+        synchronized (stateLock) {
+            if (!pendingSplits.isEmpty()) {
+                assignSplit(Collections.singletonList(subtaskId));
+            }
+        }
+    }
+
+    /**
+     * Captures the pending tables and pending splits. The split lists are copied as well, so
+     * later changes to the enumerator cannot alter a snapshot that was already handed out.
+     */
+    @Override
+    public MilvusSourceState snapshotState(long checkpointId) throws Exception {
+        synchronized (stateLock) {
+            Map<Integer, List<MilvusSourceSplit>> splitsCopy = new HashMap<>();
+            for (Map.Entry<Integer, List<MilvusSourceSplit>> entry : pendingSplits.entrySet()) {
+                splitsCopy.put(entry.getKey(), new ArrayList<>(entry.getValue()));
+            }
+            return new MilvusSourceState(new ArrayList<>(pendingTables), splitsCopy);
+        }
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {}
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceState.java
similarity index 64%
copy from 
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
copy to 
seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceState.java
index 838a384809..7b6c2e0672 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceState.java
@@ -15,26 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.table.type;
+package org.apache.seatunnel.connectors.seatunnel.milvus.source;
 
-/** The sql type of {@link SeaTunnelDataType}. */
-public enum SqlType {
-    ARRAY,
-    MAP,
-    STRING,
-    BOOLEAN,
-    TINYINT,
-    SMALLINT,
-    INT,
-    BIGINT,
-    FLOAT,
-    DOUBLE,
-    DECIMAL,
-    NULL,
-    BYTES,
-    DATE,
-    TIME,
-    TIMESTAMP,
-    ROW,
-    MULTIPLE_ROW;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Serializable state of the Milvus source: the table paths still pending and the pending splits
+ * grouped under an integer key.
+ *
+ * <p>Accessors, {@code equals}/{@code hashCode}/{@code toString} and the all-arguments
+ * constructor are generated by Lombok.
+ */
+@Data
+@AllArgsConstructor
+public class MilvusSourceState implements Serializable {
+    // Table paths that are still pending.
+    private List<TablePath> pendingTables;
+    // Pending splits per integer key; presumably the key is the owning reader id (confirm
+    // against the enumerator that builds this state).
+    private Map<Integer, List<MilvusSourceSplit>> pendingSplits;
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/state/MilvusAggregatedCommitInfo.java
similarity index 70%
copy from 
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
copy to 
seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/state/MilvusAggregatedCommitInfo.java
index 838a384809..d4bc422d9b 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/state/MilvusAggregatedCommitInfo.java
@@ -15,26 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.table.type;
+package org.apache.seatunnel.connectors.seatunnel.milvus.state;
 
-/** The sql type of {@link SeaTunnelDataType}. */
-public enum SqlType {
-    ARRAY,
-    MAP,
-    STRING,
-    BOOLEAN,
-    TINYINT,
-    SMALLINT,
-    INT,
-    BIGINT,
-    FLOAT,
-    DOUBLE,
-    DECIMAL,
-    NULL,
-    BYTES,
-    DATE,
-    TIME,
-    TIMESTAMP,
-    ROW,
-    MULTIPLE_ROW;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Serializable holder for a list of {@link MilvusCommitInfo} entries.
+ *
+ * <p>Accessors and the all-arguments constructor are generated by Lombok.
+ */
+@Data
+@AllArgsConstructor
+public class MilvusAggregatedCommitInfo implements Serializable {
+    // Private rather than package-private; access goes through the Lombok-generated accessors.
+    private List<MilvusCommitInfo> commitInfos;
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/state/MilvusCommitInfo.java
similarity index 70%
copy from 
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
copy to 
seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/state/MilvusCommitInfo.java
index 838a384809..f6887ffa06 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/state/MilvusCommitInfo.java
@@ -15,26 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.table.type;
+package org.apache.seatunnel.connectors.seatunnel.milvus.state;
 
-/** The sql type of {@link SeaTunnelDataType}. */
-public enum SqlType {
-    ARRAY,
-    MAP,
-    STRING,
-    BOOLEAN,
-    TINYINT,
-    SMALLINT,
-    INT,
-    BIGINT,
-    FLOAT,
-    DOUBLE,
-    DECIMAL,
-    NULL,
-    BYTES,
-    DATE,
-    TIME,
-    TIMESTAMP,
-    ROW,
-    MULTIPLE_ROW;
-}
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+
+/** Serializable commit-info type for the Milvus connector; it currently declares no fields. */
+@Data
+@AllArgsConstructor
+public class MilvusCommitInfo implements Serializable {}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/state/MilvusSinkState.java
similarity index 70%
copy from 
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
copy to 
seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/state/MilvusSinkState.java
index 838a384809..3d8ff62b1d 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/state/MilvusSinkState.java
@@ -15,26 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.table.type;
+package org.apache.seatunnel.connectors.seatunnel.milvus.state;
 
-/** The sql type of {@link SeaTunnelDataType}. */
-public enum SqlType {
-    ARRAY,
-    MAP,
-    STRING,
-    BOOLEAN,
-    TINYINT,
-    SMALLINT,
-    INT,
-    BIGINT,
-    FLOAT,
-    DOUBLE,
-    DECIMAL,
-    NULL,
-    BYTES,
-    DATE,
-    TIME,
-    TIMESTAMP,
-    ROW,
-    MULTIPLE_ROW;
-}
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.experimental.SuperBuilder;
+
+import java.io.Serializable;
+
+/** Serializable sink-state type for the Milvus connector; it currently declares no fields. */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+public class MilvusSinkState implements Serializable {}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index 0498ff4539..68274736f0 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -77,6 +77,7 @@
         <module>connector-paimon</module>
         <module>connector-easysearch</module>
         <module>connector-web3j</module>
+        <module>connector-milvus</module>
     </modules>
 
     <dependencyManagement>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index 37f1cbebf4..a5dd203f83 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -576,6 +576,13 @@
                     <scope>provided</scope>
                 </dependency>
 
+                <dependency>
+                    <groupId>org.apache.seatunnel</groupId>
+                    <artifactId>connector-milvus</artifactId>
+                    <version>${project.version}</version>
+                    <scope>provided</scope>
+                </dependency>
+
                 <!-- jdbc driver -->
                 <dependency>
                     <groupId>com.aliyun.phoenix</groupId>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/pom.xml
new file mode 100644
index 0000000000..2175811c6c
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/pom.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-milvus-e2e</artifactId>
+    <name>SeaTunnel : E2E : Connector V2 : Milvus</name>
+
+    <properties>
+        <testcontainer.milvus.version>1.19.8</testcontainer.milvus.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-milvus</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+            <version>2.8.9</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>milvus</artifactId>
+            <version>${testcontainer.milvus.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-fake</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
new file mode 100644
index 0000000000..5356433057
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.v2.milvus;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.milvus.MilvusContainer;
+
+import com.alibaba.fastjson.JSONObject;
+import io.milvus.client.MilvusServiceClient;
+import io.milvus.grpc.DataType;
+import io.milvus.grpc.DescribeCollectionResponse;
+import io.milvus.grpc.FieldSchema;
+import io.milvus.grpc.MutationResult;
+import io.milvus.param.ConnectParam;
+import io.milvus.param.IndexType;
+import io.milvus.param.MetricType;
+import io.milvus.param.R;
+import io.milvus.param.RpcStatus;
+import io.milvus.param.collection.CreateCollectionParam;
+import io.milvus.param.collection.DescribeCollectionParam;
+import io.milvus.param.collection.FieldType;
+import io.milvus.param.collection.HasCollectionParam;
+import io.milvus.param.collection.LoadCollectionParam;
+import io.milvus.param.dml.InsertParam;
+import io.milvus.param.index.CreateIndexParam;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * End-to-end test for the Milvus connector.
+ *
+ * <p>{@link #startUp()} starts a Milvus container, creates the {@code simple_example} collection
+ * and inserts ten rows. {@link #testMilvus(TestContainer)} then runs the {@code
+ * milvus-to-milvus.conf} job and verifies that the collection, with its three fields, exists in
+ * the {@code test} database.
+ */
+@Slf4j
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK, EngineType.FLINK},
+        disabledReason = "Currently SPARK and FLINK not support adapt")
+public class MilvusIT extends TestSuiteBase implements TestResource {
+
+    private static final String HOST = "milvus-e2e";
+    private static final String MILVUS_IMAGE = "milvusdb/milvus:2.4-20240711-7e2a9d6b";
+    private static final String TOKEN = "root:Milvus";
+    private MilvusContainer container;
+    private MilvusServiceClient milvusClient;
+    private static final String COLLECTION_NAME = "simple_example";
+    private static final String ID_FIELD = "book_id";
+    private static final String VECTOR_FIELD = "book_intro";
+    private static final String TITLE_FIELD = "book_title";
+    private static final Integer VECTOR_DIM = 4;
+
+    /**
+     * Starts the Milvus container, connects the client and seeds the source collection.
+     *
+     * @throws Exception if the container cannot be started or the seed data cannot be written
+     */
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        this.container =
+                new MilvusContainer(MILVUS_IMAGE).withNetwork(NETWORK).withNetworkAliases(HOST);
+        Startables.deepStart(Stream.of(this.container)).join();
+        log.info("Milvus host is {}", container.getHost());
+        log.info("Milvus container started");
+        // A terminal until*() call is required: without it the Awaitility chain only configures
+        // a condition factory and never polls. Exceptions thrown while connecting are ignored
+        // and the attempt is retried until the timeout elapses.
+        Awaitility.given()
+                .ignoreExceptions()
+                .await()
+                .atMost(720L, TimeUnit.SECONDS)
+                .untilAsserted(this::initMilvus);
+        this.initSourceData();
+    }
+
+    /** Creates the Milvus client against the container endpoint using the test token. */
+    private void initMilvus()
+            throws SQLException, ClassNotFoundException, InstantiationException,
+                    IllegalAccessException {
+        milvusClient =
+                new MilvusServiceClient(
+                        ConnectParam.newBuilder()
+                                .withUri(this.container.getEndpoint())
+                                .withToken(TOKEN)
+                                .build());
+    }
+
+    /**
+     * Fails fast when a Milvus call did not succeed.
+     *
+     * @param response response returned by the Milvus client
+     * @param failurePrefix text placed in front of the server message in the exception
+     */
+    private static void requireSuccess(R<?> response, String failurePrefix) {
+        if (response.getStatus() != R.Status.Success.getCode()) {
+            throw new RuntimeException(failurePrefix + response.getMessage());
+        }
+    }
+
+    /** Creates the source collection, indexes and loads it, then inserts ten rows. */
+    private void initSourceData() {
+        // Define fields
+        List<FieldType> fieldsSchema =
+                Arrays.asList(
+                        FieldType.newBuilder()
+                                .withName(ID_FIELD)
+                                .withDataType(DataType.Int64)
+                                .withPrimaryKey(true)
+                                .withAutoID(false)
+                                .build(),
+                        FieldType.newBuilder()
+                                .withName(VECTOR_FIELD)
+                                .withDataType(DataType.FloatVector)
+                                .withDimension(VECTOR_DIM)
+                                .build(),
+                        FieldType.newBuilder()
+                                .withName(TITLE_FIELD)
+                                .withDataType(DataType.VarChar)
+                                .withMaxLength(64)
+                                .build());
+
+        // Create the collection with 3 fields
+        R<RpcStatus> ret =
+                milvusClient.createCollection(
+                        CreateCollectionParam.newBuilder()
+                                .withCollectionName(COLLECTION_NAME)
+                                .withFieldTypes(fieldsSchema)
+                                .build());
+        requireSuccess(ret, "Failed to create collection! Error: ");
+
+        // Specify an index type on the vector field.
+        ret =
+                milvusClient.createIndex(
+                        CreateIndexParam.newBuilder()
+                                .withCollectionName(COLLECTION_NAME)
+                                .withFieldName(VECTOR_FIELD)
+                                .withIndexType(IndexType.FLAT)
+                                .withMetricType(MetricType.L2)
+                                .build());
+        requireSuccess(ret, "Failed to create index on vector field! Error: ");
+
+        // Call loadCollection() to enable automatically loading data into memory for searching
+        ret =
+                milvusClient.loadCollection(
+                        LoadCollectionParam.newBuilder()
+                                .withCollectionName(COLLECTION_NAME)
+                                .build());
+        requireSuccess(ret, "Failed to load collection! Error: ");
+
+        log.info("Collection created");
+
+        // Insert 10 records into the collection
+        List<JSONObject> rows = new ArrayList<>();
+        for (long i = 1L; i <= 10; ++i) {
+            JSONObject row = new JSONObject();
+            row.put(ID_FIELD, i);
+            // Build the vector from VECTOR_DIM so it always matches the declared dimension.
+            List<Float> vector = new ArrayList<>(VECTOR_DIM);
+            for (int d = 0; d < VECTOR_DIM; d++) {
+                vector.add((float) i);
+            }
+            row.put(VECTOR_FIELD, vector);
+            row.put(TITLE_FIELD, "Tom and Jerry " + i);
+            rows.add(row);
+        }
+
+        R<MutationResult> insertRet =
+                milvusClient.insert(
+                        InsertParam.newBuilder()
+                                .withCollectionName(COLLECTION_NAME)
+                                .withRows(rows)
+                                .build());
+        requireSuccess(insertRet, "Failed to insert! Error: ");
+    }
+
+    /**
+     * Closes the client and the container. Either may be {@code null} when {@link #startUp()}
+     * failed part-way, and the container is closed even if closing the client throws.
+     */
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            if (this.milvusClient != null) {
+                this.milvusClient.close();
+            }
+        } finally {
+            if (this.container != null) {
+                this.container.close();
+            }
+        }
+    }
+
+    /**
+     * Runs the Milvus-to-Milvus job and checks the collection written to the {@code test}
+     * database.
+     *
+     * @param container engine container the job is executed in
+     */
+    @TestTemplate
+    public void testMilvus(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/milvus-to-milvus.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        // assert that the collection exists in the sink database
+        R<Boolean> hasCollectionResponse =
+                this.milvusClient.hasCollection(
+                        HasCollectionParam.newBuilder()
+                                .withDatabaseName("test")
+                                .withCollectionName(COLLECTION_NAME)
+                                .build());
+        // Check the status first so a failed call reports the server message instead of
+        // surfacing as a NullPointerException when the missing data is unboxed.
+        requireSuccess(hasCollectionResponse, "Failed to check collection! Error: ");
+        Assertions.assertTrue(hasCollectionResponse.getData());
+
+        // check the collection fields
+        R<DescribeCollectionResponse> describeCollectionResponseR =
+                this.milvusClient.describeCollection(
+                        DescribeCollectionParam.newBuilder()
+                                .withDatabaseName("test")
+                                .withCollectionName(COLLECTION_NAME)
+                                .build());
+        requireSuccess(describeCollectionResponseR, "Failed to describe collection! Error: ");
+
+        DescribeCollectionResponse data = describeCollectionResponseR.getData();
+        List<String> fieldNames =
+                data.getSchema().getFieldsList().stream()
+                        .map(FieldSchema::getName)
+                        .collect(Collectors.toList());
+        Assertions.assertTrue(fieldNames.contains(ID_FIELD));
+        Assertions.assertTrue(fieldNames.contains(VECTOR_FIELD));
+        Assertions.assertTrue(fieldNames.contains(TITLE_FIELD));
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/milvus-to-milvus.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/milvus-to-milvus.conf
new file mode 100644
index 0000000000..5b5b9aec78
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/milvus-to-milvus.conf
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Milvus {
+    url = "http://milvus-e2e:19530"
+    token = "root:Milvus"
+  }
+}
+
+sink {
+   Milvus {
+     url = "http://milvus-e2e:19530"
+     token = "root:Milvus"
+     database="test"
+   }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 47864f21c6..0a0f909e19 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -74,6 +74,7 @@
         <module>connector-cdc-oracle-e2e</module>
         <module>connector-hive-e2e</module>
         <module>connector-hudi-e2e</module>
+        <module>connector-milvus-e2e</module>
     </modules>
 
     <dependencies>

Reply via email to