This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 4231798  [fix](cdc)fix the issue caused by Oracle table names 
containing slash (#355)
4231798 is described below

commit 4231798656f9dc3013d41ff9cc5c4c8fe88805c4
Author: Petrichor <31833513+vinle...@users.noreply.github.com>
AuthorDate: Mon Apr 8 14:11:10 2024 +0800

    [fix](cdc)fix the issue caused by Oracle table names containing slash (#355)
---
 .../doris/flink/tools/cdc/JdbcSourceSchema.java     | 21 +++++++++++++++++++--
 .../flink/tools/cdc/oracle/OracleDatabaseSync.java  | 11 -----------
 .../doris/flink/tools/cdc/oracle/OracleSchema.java  | 15 +++++++++++++++
 3 files changed, 34 insertions(+), 13 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
index 86d6336..b421aff 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
@@ -21,8 +21,10 @@ import org.apache.doris.flink.catalog.doris.FieldSchema;
 
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
+import java.util.List;
 
 /**
  * JdbcSourceSchema is a subclass of SourceSchema, used to build metadata 
about jdbc-related
@@ -38,7 +40,15 @@ public abstract class JdbcSourceSchema extends SourceSchema {
             String tableComment)
             throws Exception {
         super(databaseName, schemaName, tableName, tableComment);
-        fields = new LinkedHashMap<>();
+        fields = getColumnInfo(metaData, databaseName, schemaName, tableName);
+        primaryKeys = getPrimaryKeys(metaData, databaseName, schemaName, 
tableName);
+    }
+
+    public LinkedHashMap<String, FieldSchema> getColumnInfo(
+            DatabaseMetaData metaData, String databaseName, String schemaName, 
String tableName)
+            throws SQLException {
+        LinkedHashMap<String, FieldSchema> fields = new LinkedHashMap<>();
+        //
         try (ResultSet rs = metaData.getColumns(databaseName, schemaName, 
tableName, null)) {
             while (rs.next()) {
                 String fieldName = rs.getString("COLUMN_NAME");
@@ -57,14 +67,21 @@ public abstract class JdbcSourceSchema extends SourceSchema 
{
                 fields.put(fieldName, new FieldSchema(fieldName, dorisTypeStr, 
comment));
             }
         }
+        return fields;
+    }
 
-        primaryKeys = new ArrayList<>();
+    public List<String> getPrimaryKeys(
+            DatabaseMetaData metaData, String databaseName, String schemaName, 
String tableName)
+            throws SQLException {
+        List<String> primaryKeys = new ArrayList<>();
         try (ResultSet rs = metaData.getPrimaryKeys(databaseName, schemaName, 
tableName)) {
             while (rs.next()) {
                 String fieldName = rs.getString("COLUMN_NAME");
                 primaryKeys.add(fieldName);
             }
         }
+
+        return primaryKeys;
     }
 
     public abstract String convertToDorisType(String fieldType, Integer 
precision, Integer scale);
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
index f3a8c8b..945c839 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
@@ -32,8 +32,6 @@ import com.ververica.cdc.debezium.DebeziumSourceFunction;
 import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.table.DebeziumOptions;
 import org.apache.doris.flink.catalog.doris.DataModel;
-import org.apache.doris.flink.catalog.doris.TableSchema;
-import org.apache.doris.flink.exception.CreateTableException;
 import org.apache.doris.flink.tools.cdc.DatabaseSync;
 import org.apache.doris.flink.tools.cdc.SourceSchema;
 import org.slf4j.Logger;
@@ -120,15 +118,6 @@ public class OracleDatabaseSync extends DatabaseSync {
                     if (!isSyncNeeded(tableName)) {
                         continue;
                     }
-                    // Oracle allows table names to contain special characters 
such as /, #, $,
-                    // etc., as in 'A/B'.
-                    // However, Doris does not support tables with these 
characters.
-                    if (!tableName.matches(TableSchema.DORIS_TABLE_REGEX)) {
-                        throw new CreateTableException(
-                                String.format(
-                                        "The table name %s is invalid. Table 
names in Doris must match the regex pattern %s. Please consider renaming the 
table or use the 'excluding-tables' option to filter it out.",
-                                        tableName, 
TableSchema.DORIS_TABLE_REGEX));
-                    }
                     SourceSchema sourceSchema =
                             new OracleSchema(
                                     metaData, databaseName, schemaName, 
tableName, tableComment);
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java
index e059181..71e4477 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java
@@ -17,9 +17,12 @@
 
 package org.apache.doris.flink.tools.cdc.oracle;
 
+import org.apache.doris.flink.catalog.doris.FieldSchema;
 import org.apache.doris.flink.tools.cdc.JdbcSourceSchema;
 
 import java.sql.DatabaseMetaData;
+import java.sql.SQLException;
+import java.util.LinkedHashMap;
 
 public class OracleSchema extends JdbcSourceSchema {
 
@@ -42,4 +45,16 @@ public class OracleSchema extends JdbcSourceSchema {
     public String getCdcTableName() {
         return schemaName + "\\." + tableName;
     }
+
+    @Override
+    public LinkedHashMap<String, FieldSchema> getColumnInfo(
+            DatabaseMetaData metaData, String databaseName, String schemaName, 
String tableName)
+            throws SQLException {
+        // Oracle permits table names to include special characters such as /,
+        // etc., as in 'A/B'.
+        // When attempting to fetch column information for `A/B` via JDBC,
+        // it may throw an ORA-01424 error.
+        // Hence, we substitute `/` with '_' to address the issue.
+        return super.getColumnInfo(metaData, databaseName, schemaName, 
tableName.replace("/", "_"));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to