This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4140cd1d8f [Fix][Connector-V2][OceanBase] oceanbase vector support simple vector index (#9072)
4140cd1d8f is described below

commit 4140cd1d8fc1f08e89b23e9e346868fbd0a08a75
Author: zhouyh <xxsc0...@163.com>
AuthorDate: Wed Apr 2 15:25:01 2025 +0800

    [Fix][Connector-V2][OceanBase] oceanbase vector support simple vector index (#9072)
    
    Co-authored-by: ranyu.zyh <ranyu....@digital-engine.com>
    Co-authored-by: Jia Fan <fanjiaemi...@qq.com>
---
 .../catalog/oceanbase/OceanBaseMySqlCatalog.java   |  5 ++
 .../OceanBaseMysqlCreateTableSqlBuilder.java       |  6 ++
 .../oceanbase/OceanBaseMySqlTypeConverter.java     |  7 +-
 .../connector-jdbc-e2e-part-2/pom.xml              |  7 ++
 .../seatunnel/jdbc/JdbcOceanBaseMilvusIT.java      | 92 +++++++++++++++++++++-
 5 files changed, 109 insertions(+), 8 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
index 86410b573f..3fd7735907 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
@@ -147,6 +147,11 @@ public class OceanBaseMySqlCatalog extends AbstractJdbcCatalog {
         String comment = resultSet.getString("COLUMN_COMMENT");
         Object defaultValue = resultSet.getObject("COLUMN_DEFAULT");
         String isNullableStr = resultSet.getString("IS_NULLABLE");
+
+        if (dataType.toUpperCase().startsWith("VECTOR")) {
+            dataType = "VECTOR";
+        }
+
         boolean isNullable = isNullableStr.equals("YES");
         // e.g. `decimal(10, 2)` is 10
         long numberPrecision = resultSet.getInt("NUMERIC_PRECISION");
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMysqlCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMysqlCreateTableSqlBuilder.java
index 9707ff23ac..1100b30d8f 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMysqlCreateTableSqlBuilder.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMysqlCreateTableSqlBuilder.java
@@ -265,6 +265,12 @@ public class OceanBaseMysqlCreateTableSqlBuilder {
                 keyName = "FOREIGN KEY";
                 // todo:
                 break;
+            case VECTOR_INDEX_KEY:
+                keyName = "VECTOR INDEX";
+                return String.format(
+                                "%s `%s` (%s)",
+                                keyName, constraintKey.getConstraintName(), indexColumns)
+                        + " WITH (distance=L2, type=hnsw)";
             default:
                 throw new UnsupportedOperationException(
                         "Unsupported constraint type: " + constraintType);
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java
index f7790ff178..f135701879 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java
@@ -102,7 +102,6 @@ public class OceanBaseMySqlTypeConverter
     public static final long POWER_2_32 = (long) Math.pow(2, 32);
     public static final long MAX_VARBINARY_LENGTH = POWER_2_16 - 4;
 
-    private static final String VECTOR_TYPE_NAME = "";
     private static final String VECTOR_NAME = "VECTOR";
 
     public static final OceanBaseMySqlTypeConverter INSTANCE = new OceanBaseMySqlTypeConverter();
@@ -296,9 +295,9 @@ public class OceanBaseMySqlTypeConverter
                 builder.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE);
                 builder.scale(typeDefine.getScale());
                 break;
-            case VECTOR_TYPE_NAME:
-                String columnType = typeDefine.getColumnType();
-                if (columnType.startsWith("vector(") && columnType.endsWith(")")) {
+            case VECTOR_NAME:
+                String columnType = typeDefine.getColumnType().toUpperCase();
+                if (columnType.startsWith("VECTOR(") && columnType.endsWith(")")) {
                     Integer number =
                             Integer.parseInt(
                                     columnType.substring(
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/pom.xml
index 7a0b89ce13..4363f49848 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/pom.xml
@@ -69,7 +69,14 @@
             <groupId>mysql</groupId>
             <artifactId>mysql-connector-java</artifactId>
             <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
+
         <dependency>
             <groupId>com.dameng</groupId>
             <artifactId>DmJdbcDriver18</artifactId>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMilvusIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMilvusIT.java
index e91eaed2de..58380bc6e7 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMilvusIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMilvusIT.java
@@ -66,6 +66,7 @@ import lombok.extern.slf4j.Slf4j;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.Driver;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.time.Duration;
@@ -90,7 +91,7 @@ import static org.awaitility.Awaitility.given;
         disabledReason = "Currently SPARK and FLINK not support adapt")
 public class JdbcOceanBaseMilvusIT extends TestSuiteBase implements TestResource {
 
-    private static final String IMAGE = "oceanbase/oceanbase-ce:vector";
+    private static final String IMAGE = "oceanbase/oceanbase-ce:4.3.5.1-101000042025031818";
 
     private static final String HOSTNAME = "e2e_oceanbase_vector";
     private static final int PORT = 2881;
@@ -145,7 +146,7 @@ public class JdbcOceanBaseMilvusIT extends TestSuiteBase implements TestResource
                 .await()
                 .atMost(360, TimeUnit.SECONDS)
                 .untilAsserted(() -> this.initializeJdbcConnection(jdbcCase.getJdbcUrl()));
-
+        setObVectorMemory();
         createSchemaIfNeeded();
         createNeededTables();
         this.container =
@@ -275,6 +276,20 @@ public class JdbcOceanBaseMilvusIT extends TestSuiteBase implements TestResource
         }
     }
 
+    @TestTemplate
+    public void testMilvusToOceanBaseNotTable(TestContainer container) throws Exception {
+        try {
+            dropOceanBaseTable();
+            checkTableNotExist();
+            Container.ExecResult execResult =
+                    container.executeJob("/jdbc_milvus_source_and_oceanbase_sink.conf");
+            Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+            checkCreateTableSql();
+        } finally {
+            clearTable(jdbcCase.getDatabase(), jdbcCase.getSchema(), jdbcCase.getSinkTable());
+        }
+    }
+
     @TestTemplate
     public void testFakeToOceanBase(TestContainer container)
             throws IOException, InterruptedException {
@@ -407,6 +422,12 @@ public class JdbcOceanBaseMilvusIT extends TestSuiteBase implements TestResource
         connection.setAutoCommit(false);
     }
 
+    /** This parameter is required for OceanBase 4.3.x to enable vector indexing */
+    public void setObVectorMemory() {
+        String sql = "ALTER SYSTEM SET ob_vector_memory_limit_percentage = 30";
+        executeSql(sql);
+    }
+
     private Class<?> loadDriverClass() {
         try {
             return Class.forName(jdbcCase.getDriverClass());
@@ -418,13 +439,17 @@ public class JdbcOceanBaseMilvusIT extends TestSuiteBase implements TestResource
 
     private void createSchemaIfNeeded() {
         String sql = "CREATE DATABASE IF NOT EXISTS " + OCEANBASE_DATABASE;
+        executeSql(sql);
+    }
+
+    private void executeSql(String sql) {
         try {
             connection.prepareStatement(sql).executeUpdate();
         } catch (Exception e) {
             throw new SeaTunnelRuntimeException(
                     JdbcITErrorCode.CREATE_TABLE_FAILED, "Fail to execute sql " + sql, e);
         }
-        log.info("oceanbase schema created,sql is" + sql);
+        log.info("oceanbase execute sql,sql is:{}", sql);
     }
 
     String createSqlTemplate() {
@@ -466,7 +491,7 @@ public class JdbcOceanBaseMilvusIT extends TestSuiteBase implements TestResource
                                         jdbcCase.getSchema(),
                                         jdbcCase.getSinkTable()));
                 statement.execute(createSink);
-                log.info("oceanbase table created,sql is" + createSink);
+                log.info("oceanbase table created,sql is:{}", createSink);
             }
 
             connection.commit();
@@ -493,6 +518,65 @@ public class JdbcOceanBaseMilvusIT extends TestSuiteBase implements TestResource
         }
     }
 
+    private void dropOceanBaseTable() {
+        String sql =
+                String.format("drop table IF EXISTS %s.%s", OCEANBASE_DATABASE, OCEANBASE_SINK);
+        executeSql(sql);
+    }
+
+    private void checkTableNotExist() {
+        String sql =
+                String.format(
+                        "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = '%s' AND table_name = '%s'",
+                        OCEANBASE_DATABASE, OCEANBASE_SINK);
+
+        boolean isExist = false;
+        try (Statement statement = connection.createStatement();
+                ResultSet resultSet = statement.executeQuery(sql)) {
+
+            if (resultSet.next()) {
+                isExist = resultSet.getInt(1) > 0;
+            }
+        } catch (Exception e) {
+            throw new SeaTunnelRuntimeException(
+                    JdbcITErrorCode.CREATE_TABLE_FAILED, "Fail to execute sql: " + sql, e);
+        }
+        Assertions.assertFalse(isExist);
+    }
+
+    private void checkCreateTableSql() {
+        String sql = String.format("SHOW CREATE TABLE %s.%s;", OCEANBASE_DATABASE, OCEANBASE_SINK);
+        String createTableSql = "";
+        try (Statement statement = connection.createStatement();
+                ResultSet resultSet = statement.executeQuery(sql)) {
+
+            if (resultSet.next()) {
+                createTableSql = resultSet.getString(2);
+            }
+        } catch (Exception e) {
+            throw new SeaTunnelRuntimeException(
+                    JdbcITErrorCode.CREATE_TABLE_FAILED, "Fail to execute sql: " + sql, e);
+        }
+        // Removed the column store compression configuration that is automatically set by oceanbase
+        String startToken = "VECTOR KEY `vector_index` (`book_intro`) WITH (DISTANCE=L2, TYPE=HNSW";
+        int startIndex = createTableSql.indexOf(startToken);
+
+        if (startIndex != -1) {
+            String part1 = createTableSql.substring(0, startIndex + startToken.length());
+            createTableSql = part1 + "));";
+        }
+        Assertions.assertEquals(expectationSql(), createTableSql);
+    }
+
+    private String expectationSql() {
+        return "CREATE TABLE `simple_example` (\n"
+                + "  `book_id` bigint(20) NOT NULL,\n"
+                + "  `book_intro` VECTOR(4) NOT NULL,\n"
+                + "  `book_title` text NOT NULL,\n"
+                + "  PRIMARY KEY (`book_id`),\n"
+                + "  VECTOR KEY `vector_index` (`book_intro`) WITH (DISTANCE=L2, TYPE=HNSW));";
+    }
+
     private String[] getFieldNames() {
         return new String[] {
             "book_id", "book_intro", "book_title",

Reply via email to