This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 582f9c7184 [Fix][Connect-V2][Milvus] Fix partition creation (#10589)
582f9c7184 is described below

commit 582f9c7184123644a54710b0f1e4a4f1cbef06a5
Author: 许晓峰 <[email protected]>
AuthorDate: Tue Mar 17 21:41:45 2026 +0800

    [Fix][Connect-V2][Milvus] Fix partition creation (#10589)
---
 .../seatunnel/milvus/catalog/MilvusCatalog.java    |  49 ++++++-
 .../seatunnel/milvus/utils/MilvusConvertUtils.java |   2 +-
 .../milvus/catalog/MilvusCatalogTest.java          | 150 +++++++++++++++++++++
 .../milvus/utils/MilvusConvertUtilsTest.java       | 119 ++++++++++++++++
 .../e2e/connector/v2/milvus/MilvusIT.java          | 128 +++++++++++++++++-
 .../milvus-to-milvus-with-partitions.conf          |  38 ++++++
 6 files changed, 481 insertions(+), 5 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalog.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalog.java
index 68a15e5a5e..6343bf253b 100644
--- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalog.java
+++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalog.java
@@ -198,6 +198,10 @@ public class MilvusCatalog implements Catalog {
         checkNotNull(catalogTable, "catalogTable must not be null");
         TableSchema tableSchema = catalogTable.getTableSchema();
         checkNotNull(tableSchema, "tableSchema must not be null");
+        log.info(
+                "Start creating Milvus collection. database={}, collection={}",
+                tablePath.getDatabaseName(),
+                tablePath.getTableName());
         createTableInternal(tablePath, catalogTable);
 
         if (CollectionUtils.isNotEmpty(tableSchema.getConstraintKeys())
@@ -206,10 +210,19 @@ public class MilvusCatalog implements Catalog {
                 if (constraintKey
                         .getConstraintType()
                         .equals(ConstraintKey.ConstraintType.VECTOR_INDEX_KEY)) {
+                    log.info(
+                            "Creating Milvus vector indexes. database={}, collection={}, constraintName={}",
+                            tablePath.getDatabaseName(),
+                            tablePath.getTableName(),
+                            constraintKey.getConstraintName());
                     createIndexInternal(tablePath, constraintKey.getColumnNames());
                 }
             }
         }
+        log.info(
+                "Finished creating Milvus collection. database={}, collection={}",
+                tablePath.getDatabaseName(),
+                tablePath.getTableName());
     }
 
     private void createIndexInternal(
@@ -294,15 +307,26 @@ public class MilvusCatalog implements Catalog {
             }
 
             CreateCollectionParam createCollectionParam = builder.build();
+            log.info(
+                    "Creating Milvus collection metadata. database={}, collection={}",
+                    tablePath.getDatabaseName(),
+                    tablePath.getTableName());
             R<RpcStatus> response = this.client.createCollection(createCollectionParam);
             if (!Objects.equals(response.getStatus(), R.success().getStatus())) {
                 throw new MilvusConnectorException(
                         MilvusConnectionErrorCode.CREATE_COLLECTION_ERROR, response.getMessage());
             }
 
-            // not exist partition key field, will read show partitions to create
-            if (!existPartitionKeyField && options.containsKey(MilvusOptions.PARTITION_KEY_FIELD)) {
-                createPartitionInternal(options.get(MilvusOptions.PARTITION_KEY_FIELD), tablePath);
+            // When collection does not have a partition key field,
+            // create partitions from the 'partitionNames' option
+            String partitionNames = options.get(MilvusOptions.PARTITION_NAMES);
+            if (!existPartitionKeyField && StringUtils.isNotBlank(partitionNames)) {
+                log.info(
+                        "Creating Milvus partitions. database={}, collection={}, partitionNames={}",
+                        tablePath.getDatabaseName(),
+                        tablePath.getTableName(),
+                        partitionNames);
+                createPartitionInternal(partitionNames, tablePath);
             }
 
         } catch (Exception e) {
@@ -329,9 +353,28 @@ public class MilvusCatalog implements Catalog {
         // start to loop create partition
         String[] partitionNameArray = partitionNames.split(",");
         for (String partitionName : partitionNameArray) {
+            partitionName = partitionName.trim();
+            if (StringUtils.isBlank(partitionName) || "_default".equals(partitionName)) {
+                log.info(
+                        "Skip Milvus partition creation. database={}, collection={}, partitionName={}",
+                        tablePath.getDatabaseName(),
+                        tablePath.getTableName(),
+                        partitionName);
+                continue;
+            }
             if (existPartitionNames.contains(partitionName)) {
+                log.info(
+                        "Milvus partition already exists. database={}, collection={}, partitionName={}",
+                        tablePath.getDatabaseName(),
+                        tablePath.getTableName(),
+                        partitionName);
                 continue;
             }
+            log.info(
+                    "Creating Milvus partition. database={}, collection={}, partitionName={}",
+                    tablePath.getDatabaseName(),
+                    tablePath.getTableName(),
+                    partitionName);
             R<RpcStatus> response =
                     this.client.createPartition(
                             CreatePartitionParam.newBuilder()
diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/MilvusConvertUtils.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/MilvusConvertUtils.java
index 84ba5168e0..9e985e1b66 100644
--- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/MilvusConvertUtils.java
+++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/MilvusConvertUtils.java
@@ -252,7 +252,7 @@ public class MilvusConvertUtils {
             }
             list.add(partition);
         }
-        if (CollectionUtils.isEmpty(partitionNamesList)) {
+        if (CollectionUtils.isEmpty(list)) {
             return;
         }
 
diff --git a/seatunnel-connectors-v2/connector-milvus/src/test/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalogTest.java b/seatunnel-connectors-v2/connector-milvus/src/test/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalogTest.java
new file mode 100644
index 0000000000..01eb93746f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-milvus/src/test/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalogTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.milvus.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.milvus.client.MilvusServiceClient;
+import io.milvus.grpc.ShowPartitionsResponse;
+import io.milvus.param.R;
+import io.milvus.param.RpcStatus;
+import io.milvus.param.partition.CreatePartitionParam;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.Collections;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class MilvusCatalogTest {
+
+    @Test
+    void createPartitionInternalSkipsEmptyString() throws Exception {
+        MilvusCatalog catalog = 
createCatalogWithClient(mockClientWithDefaultPartitions());
+        invokeCreatePartitionInternal(catalog, "", TablePath.of("db", null, 
"coll"));
+        verify(getClient(catalog), never()).createPartition(any());
+    }
+
+    @Test
+    void createPartitionInternalSkipsOnlyCommas() throws Exception {
+        MilvusCatalog catalog = 
createCatalogWithClient(mockClientWithDefaultPartitions());
+        invokeCreatePartitionInternal(catalog, ",,,", TablePath.of("db", null, 
"coll"));
+        verify(getClient(catalog), never()).createPartition(any());
+    }
+
+    @Test
+    void createPartitionInternalSkipsSpaces() throws Exception {
+        MilvusCatalog catalog = 
createCatalogWithClient(mockClientWithDefaultPartitions());
+        invokeCreatePartitionInternal(catalog, "   ", TablePath.of("db", null, 
"coll"));
+        verify(getClient(catalog), never()).createPartition(any());
+    }
+
+    @Test
+    void createPartitionInternalSkipsDefaultPartitionName() throws Exception {
+        MilvusServiceClient client = mockClientWithDefaultPartitions();
+        R<RpcStatus> successRpcStatusR = mock(R.class);
+        
when(successRpcStatusR.getStatus()).thenReturn(R.Status.Success.getCode());
+        when(successRpcStatusR.getMessage()).thenReturn("OK");
+        when(client.createPartition(any()))
+                .thenAnswer(
+                        invocation -> {
+                            CreatePartitionParam param = 
invocation.getArgument(0);
+                            String partitionName = extractPartitionName(param);
+                            if (partitionName == null
+                                    || partitionName.trim().isEmpty()
+                                    || "_default".equals(partitionName)) {
+                                throw new RuntimeException(
+                                        "invalid partitionName: " + 
partitionName);
+                            }
+                            return successRpcStatusR;
+                        });
+
+        MilvusCatalog catalog = createCatalogWithClient(client);
+        invokeCreatePartitionInternal(catalog, "_default, p1", 
TablePath.of("db", null, "coll"));
+
+        verify(client, times(1)).createPartition(any());
+    }
+
+    private MilvusCatalog createCatalogWithClient(MilvusServiceClient client) 
throws Exception {
+        MilvusCatalog catalog =
+                new MilvusCatalog("milvus", 
ReadonlyConfig.fromMap(Collections.emptyMap()));
+        Field clientField = MilvusCatalog.class.getDeclaredField("client");
+        clientField.setAccessible(true);
+        clientField.set(catalog, client);
+        return catalog;
+    }
+
+    private MilvusServiceClient mockClientWithDefaultPartitions() {
+        MilvusServiceClient client = mock(MilvusServiceClient.class);
+        @SuppressWarnings("unchecked")
+        R<ShowPartitionsResponse> showPartitionsR = mock(R.class);
+        
when(showPartitionsR.getStatus()).thenReturn(R.Status.Success.getCode());
+        when(showPartitionsR.getData())
+                .thenReturn(
+                        
ShowPartitionsResponse.newBuilder().addPartitionNames("_default").build());
+        when(showPartitionsR.getMessage()).thenReturn("OK");
+        when(client.showPartitions(any())).thenReturn(showPartitionsR);
+
+        @SuppressWarnings("unchecked")
+        R<RpcStatus> createPartitionR = mock(R.class);
+        
when(createPartitionR.getStatus()).thenReturn(R.Status.Success.getCode());
+        when(createPartitionR.getMessage()).thenReturn("OK");
+        when(client.createPartition(any())).thenReturn(createPartitionR);
+        return client;
+    }
+
+    private void invokeCreatePartitionInternal(
+            MilvusCatalog catalog, String partitionNames, TablePath tablePath) 
throws Exception {
+        Method method =
+                MilvusCatalog.class.getDeclaredMethod(
+                        "createPartitionInternal", String.class, 
TablePath.class);
+        method.setAccessible(true);
+        Assertions.assertDoesNotThrow(() -> method.invoke(catalog, 
partitionNames, tablePath));
+    }
+
+    private MilvusServiceClient getClient(MilvusCatalog catalog) throws 
Exception {
+        Field clientField = MilvusCatalog.class.getDeclaredField("client");
+        clientField.setAccessible(true);
+        return (MilvusServiceClient) clientField.get(catalog);
+    }
+
+    private String extractPartitionName(CreatePartitionParam param) {
+        try {
+            Method getter = param.getClass().getMethod("getPartitionName");
+            Object v = getter.invoke(param);
+            return v == null ? null : v.toString();
+        } catch (Exception ignored) {
+        }
+        try {
+            Field f = param.getClass().getDeclaredField("partitionName");
+            f.setAccessible(true);
+            Object v = f.get(param);
+            return v == null ? null : v.toString();
+        } catch (Exception ignored) {
+        }
+        return null;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-milvus/src/test/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/MilvusConvertUtilsTest.java b/seatunnel-connectors-v2/connector-milvus/src/test/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/MilvusConvertUtilsTest.java
new file mode 100644
index 0000000000..ede5cd5349
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-milvus/src/test/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/MilvusConvertUtilsTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.milvus.utils;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.connectors.seatunnel.milvus.catalog.MilvusOptions;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.milvus.client.MilvusServiceClient;
+import io.milvus.grpc.CollectionSchema;
+import io.milvus.grpc.DataType;
+import io.milvus.grpc.DescribeCollectionResponse;
+import io.milvus.grpc.DescribeIndexResponse;
+import io.milvus.grpc.FieldSchema;
+import io.milvus.grpc.ShowPartitionsResponse;
+import io.milvus.param.R;
+
+import java.util.Collections;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class MilvusConvertUtilsTest {
+
+    @Test
+    void getCatalogTableDoesNotSetPartitionNamesWhenOnlyDefaultPartition() {
+        ReadonlyConfig config = ReadonlyConfig.fromMap(Collections.emptyMap());
+        MilvusConvertUtils utils = new MilvusConvertUtils(config);
+        MilvusServiceClient client = mock(MilvusServiceClient.class);
+
+        mockDescribeCollection(client);
+        mockDescribeIndex(client);
+        mockShowPartitions(
+                client, 
ShowPartitionsResponse.newBuilder().addPartitionNames("_default").build());
+
+        CatalogTable table = utils.getCatalogTable(client, "db", "coll");
+        
Assertions.assertFalse(table.getOptions().containsKey(MilvusOptions.PARTITION_NAMES));
+    }
+
+    @Test
+    void getCatalogTableSetsPartitionNamesExcludingDefaultPartition() {
+        ReadonlyConfig config = ReadonlyConfig.fromMap(Collections.emptyMap());
+        MilvusConvertUtils utils = new MilvusConvertUtils(config);
+        MilvusServiceClient client = mock(MilvusServiceClient.class);
+
+        mockDescribeCollection(client);
+        mockDescribeIndex(client);
+        mockShowPartitions(
+                client,
+                ShowPartitionsResponse.newBuilder()
+                        .addPartitionNames("_default")
+                        .addPartitionNames("p1")
+                        .addPartitionNames("p2")
+                        .build());
+
+        CatalogTable table = utils.getCatalogTable(client, "db", "coll");
+        Assertions.assertEquals("p1,p2", 
table.getOptions().get(MilvusOptions.PARTITION_NAMES));
+    }
+
+    private void mockDescribeCollection(MilvusServiceClient client) {
+        FieldSchema idField =
+                FieldSchema.newBuilder()
+                        .setName("id")
+                        .setDataType(DataType.Int64)
+                        .setIsPrimaryKey(true)
+                        .build();
+        CollectionSchema schema =
+                CollectionSchema.newBuilder()
+                        .addFields(idField)
+                        .setEnableDynamicField(false)
+                        .setDescription("desc")
+                        .build();
+        DescribeCollectionResponse describeCollectionResponse =
+                
DescribeCollectionResponse.newBuilder().setSchema(schema).setShardsNum(1).build();
+
+        @SuppressWarnings("unchecked")
+        R<DescribeCollectionResponse> response = mock(R.class);
+        when(response.getStatus()).thenReturn(R.Status.Success.getCode());
+        when(response.getData()).thenReturn(describeCollectionResponse);
+        when(client.describeCollection(any())).thenReturn(response);
+    }
+
+    private void mockDescribeIndex(MilvusServiceClient client) {
+        DescribeIndexResponse describeIndexResponse = 
DescribeIndexResponse.newBuilder().build();
+
+        @SuppressWarnings("unchecked")
+        R<DescribeIndexResponse> response = mock(R.class);
+        when(response.getStatus()).thenReturn(R.Status.Success.getCode());
+        when(response.getData()).thenReturn(describeIndexResponse);
+        when(client.describeIndex(any())).thenReturn(response);
+    }
+
+    private void mockShowPartitions(
+            MilvusServiceClient client, ShowPartitionsResponse 
showPartitionsResponse) {
+        @SuppressWarnings("unchecked")
+        R<ShowPartitionsResponse> response = mock(R.class);
+        when(response.getStatus()).thenReturn(R.Status.Success.getCode());
+        when(response.getData()).thenReturn(showPartitionsResponse);
+        when(client.showPartitions(any())).thenReturn(response);
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
index 1ad1fc9882..fe78508d7b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
@@ -73,6 +73,8 @@ import io.milvus.param.dml.InsertParam;
 import io.milvus.param.dml.QueryParam;
 import io.milvus.param.index.CreateIndexParam;
 import io.milvus.param.index.DescribeIndexParam;
+import io.milvus.param.partition.CreatePartitionParam;
+import io.milvus.param.partition.ShowPartitionsParam;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
@@ -107,6 +109,9 @@ public class MilvusIT extends TestSuiteBase implements TestResource {
     private static final String COLLECTION_NAME_2 = "simple_example_2";
     private static final String COLLECTION_NAME_WITH_PARTITIONKEY =
             "simple_example_with_partitionkey";
+    private static final String COLLECTION_NAME_WITH_PARTITIONS = 
"simple_example_with_partitions";
+    private static final String COLLECTION_NAME_SOURCE_WITH_PARTITIONS =
+            "simple_example_source_with_partitions";
     private static final String ID_FIELD = "book_id";
     private static final String VECTOR_FIELD = "book_intro";
     private static final String VECTOR_FIELD2 = "book_kind";
@@ -253,6 +258,93 @@ public class MilvusIT extends TestSuiteBase implements TestResource {
 
         log.info("Collection created");
 
+        R<RpcStatus> retWithPartitions =
+                milvusClient.createCollection(
+                        CreateCollectionParam.newBuilder()
+                                
.withCollectionName(COLLECTION_NAME_SOURCE_WITH_PARTITIONS)
+                                .withFieldTypes(fieldsSchema)
+                                .build());
+        if (retWithPartitions.getStatus() != R.Status.Success.getCode()) {
+            throw new RuntimeException(
+                    "Failed to create collection! Error: " + 
retWithPartitions.getMessage());
+        }
+        retWithPartitions =
+                milvusClient.createIndex(
+                        CreateIndexParam.newBuilder()
+                                
.withCollectionName(COLLECTION_NAME_SOURCE_WITH_PARTITIONS)
+                                .withFieldName(VECTOR_FIELD)
+                                .withIndexType(IndexType.FLAT)
+                                .withMetricType(MetricType.L2)
+                                .build());
+        if (retWithPartitions.getStatus() != R.Status.Success.getCode()) {
+            throw new RuntimeException(
+                    "Failed to create index on vector field! Error: "
+                            + retWithPartitions.getMessage());
+        }
+        retWithPartitions =
+                milvusClient.createIndex(
+                        CreateIndexParam.newBuilder()
+                                
.withCollectionName(COLLECTION_NAME_SOURCE_WITH_PARTITIONS)
+                                .withFieldName(VECTOR_FIELD2)
+                                .withIndexType(IndexType.FLAT)
+                                .withMetricType(MetricType.L2)
+                                .build());
+        if (retWithPartitions.getStatus() != R.Status.Success.getCode()) {
+            throw new RuntimeException(
+                    "Failed to create index on vector field! Error: "
+                            + retWithPartitions.getMessage());
+        }
+        retWithPartitions =
+                milvusClient.createIndex(
+                        CreateIndexParam.newBuilder()
+                                
.withCollectionName(COLLECTION_NAME_SOURCE_WITH_PARTITIONS)
+                                .withFieldName(VECTOR_FIELD3)
+                                .withIndexType(IndexType.BIN_FLAT)
+                                .withMetricType(MetricType.HAMMING)
+                                .build());
+        if (retWithPartitions.getStatus() != R.Status.Success.getCode()) {
+            throw new RuntimeException(
+                    "Failed to create index on vector field! Error: "
+                            + retWithPartitions.getMessage());
+        }
+        retWithPartitions =
+                milvusClient.createIndex(
+                        CreateIndexParam.newBuilder()
+                                
.withCollectionName(COLLECTION_NAME_SOURCE_WITH_PARTITIONS)
+                                .withFieldName(VECTOR_FIELD4)
+                                .withIndexType(IndexType.SPARSE_INVERTED_INDEX)
+                                .withMetricType(MetricType.IP)
+                                .build());
+        if (retWithPartitions.getStatus() != R.Status.Success.getCode()) {
+            throw new RuntimeException(
+                    "Failed to create index on vector field! Error: "
+                            + retWithPartitions.getMessage());
+        }
+        milvusClient.loadCollection(
+                LoadCollectionParam.newBuilder()
+                        
.withCollectionName(COLLECTION_NAME_SOURCE_WITH_PARTITIONS)
+                        .build());
+        R<RpcStatus> partitionRet =
+                milvusClient.createPartition(
+                        CreatePartitionParam.newBuilder()
+                                
.withCollectionName(COLLECTION_NAME_SOURCE_WITH_PARTITIONS)
+                                .withPartitionName("p1")
+                                .build());
+        if (partitionRet.getStatus() != R.Status.Success.getCode()) {
+            throw new RuntimeException(
+                    "Failed to create partition! Error: " + 
partitionRet.getMessage());
+        }
+        partitionRet =
+                milvusClient.createPartition(
+                        CreatePartitionParam.newBuilder()
+                                
.withCollectionName(COLLECTION_NAME_SOURCE_WITH_PARTITIONS)
+                                .withPartitionName("p2")
+                                .build());
+        if (partitionRet.getStatus() != R.Status.Success.getCode()) {
+            throw new RuntimeException(
+                    "Failed to create partition! Error: " + 
partitionRet.getMessage());
+        }
+
         // Define fields With Partition Key
         List<FieldType> fieldsSchemaWithPartitionKey =
                 Arrays.asList(
@@ -395,9 +487,16 @@ public class MilvusIT extends TestSuiteBase implements TestResource {
                                 
.withCollectionName(COLLECTION_NAME_WITH_PARTITIONKEY)
                                 .withRows(rows)
                                 .build());
+        R<MutationResult> insertRet3 =
+                milvusClient.insert(
+                        InsertParam.newBuilder()
+                                
.withCollectionName(COLLECTION_NAME_SOURCE_WITH_PARTITIONS)
+                                .withRows(rows)
+                                .build());
 
         if (insertRet.getStatus() != R.Status.Success.getCode()
-                || insertRet2.getStatus() != R.Status.Success.getCode()) {
+                || insertRet2.getStatus() != R.Status.Success.getCode()
+                || insertRet3.getStatus() != R.Status.Success.getCode()) {
             throw new RuntimeException("Failed to insert! Error: " + 
insertRet.getMessage());
         }
     }
@@ -484,6 +583,33 @@ public class MilvusIT extends TestSuiteBase implements TestResource {
         Assertions.assertTrue(fields.contains(TITLE_FIELD));
     }
 
+    @TestTemplate
+    public void testMilvusWithPartitions(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/milvus-to-milvus-with-partitions.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        R<Boolean> hasCollectionResponse =
+                this.milvusClient.hasCollection(
+                        HasCollectionParam.newBuilder()
+                                .withDatabaseName("test")
+                                
.withCollectionName(COLLECTION_NAME_WITH_PARTITIONS)
+                                .build());
+        Assertions.assertTrue(hasCollectionResponse.getData());
+
+        R<io.milvus.grpc.ShowPartitionsResponse> showPartitionsResponse =
+                this.milvusClient.showPartitions(
+                        ShowPartitionsParam.newBuilder()
+                                .withDatabaseName("test")
+                                
.withCollectionName(COLLECTION_NAME_WITH_PARTITIONS)
+                                .build());
+        Assertions.assertEquals(R.Status.Success.getCode(), 
showPartitionsResponse.getStatus());
+        List<String> partitionNames = 
showPartitionsResponse.getData().getPartitionNamesList();
+        Assertions.assertTrue(partitionNames.contains("p1"));
+        Assertions.assertTrue(partitionNames.contains("p2"));
+    }
+
     @TestTemplate
     public void testFakeToMilvus(TestContainer container) throws IOException, 
InterruptedException {
         Container.ExecResult execResult = 
container.executeJob("/fake-to-milvus.conf");
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/milvus-to-milvus-with-partitions.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/milvus-to-milvus-with-partitions.conf
new file mode 100644
index 0000000000..95cb728104
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/milvus-to-milvus-with-partitions.conf
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Milvus {
+    url = "http://milvus-e2e:19530"
+    token = "root:Milvus"
+    collection = "simple_example_source_with_partitions"
+  }
+}
+
+sink {
+  Milvus {
+    url = "http://milvus-e2e:19530"
+    token = "root:Milvus"
+    database = "test"
+    collection = "simple_example_with_partitions"
+  }
+}

Reply via email to