This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git

The following commit(s) were added to refs/heads/master by this push: new 4231798 [fix](cdc)fix the issue caused by Oracle table names containing slash (#355) 4231798 is described below commit 4231798656f9dc3013d41ff9cc5c4c8fe88805c4 Author: Petrichor <31833513+vinle...@users.noreply.github.com> AuthorDate: Mon Apr 8 14:11:10 2024 +0800 [fix](cdc)fix the issue caused by Oracle table names containing slash (#355) --- .../doris/flink/tools/cdc/JdbcSourceSchema.java | 21 +++++++++++++++++++-- .../flink/tools/cdc/oracle/OracleDatabaseSync.java | 11 ----------- .../doris/flink/tools/cdc/oracle/OracleSchema.java | 15 +++++++++++++++ 3 files changed, 34 insertions(+), 13 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java index 86d6336..b421aff 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java @@ -21,8 +21,10 @@ import org.apache.doris.flink.catalog.doris.FieldSchema; import java.sql.DatabaseMetaData; import java.sql.ResultSet; +import java.sql.SQLException; import java.util.ArrayList; import java.util.LinkedHashMap; +import java.util.List; /** * JdbcSourceSchema is a subclass of SourceSchema, used to build metadata about jdbc-related @@ -38,7 +40,15 @@ public abstract class JdbcSourceSchema extends SourceSchema { String tableComment) throws Exception { super(databaseName, schemaName, tableName, tableComment); - fields = new LinkedHashMap<>(); + fields = getColumnInfo(metaData, databaseName, schemaName, tableName); + primaryKeys = getPrimaryKeys(metaData, databaseName, schemaName, tableName); + } + + public LinkedHashMap<String, FieldSchema> getColumnInfo( + DatabaseMetaData metaData, String databaseName, String schemaName, String tableName) + throws SQLException { + 
LinkedHashMap<String, FieldSchema> fields = new LinkedHashMap<>(); + // try (ResultSet rs = metaData.getColumns(databaseName, schemaName, tableName, null)) { while (rs.next()) { String fieldName = rs.getString("COLUMN_NAME"); @@ -57,14 +67,21 @@ public abstract class JdbcSourceSchema extends SourceSchema { fields.put(fieldName, new FieldSchema(fieldName, dorisTypeStr, comment)); } } + return fields; + } - primaryKeys = new ArrayList<>(); + public List<String> getPrimaryKeys( + DatabaseMetaData metaData, String databaseName, String schemaName, String tableName) + throws SQLException { + List<String> primaryKeys = new ArrayList<>(); try (ResultSet rs = metaData.getPrimaryKeys(databaseName, schemaName, tableName)) { while (rs.next()) { String fieldName = rs.getString("COLUMN_NAME"); primaryKeys.add(fieldName); } } + + return primaryKeys; } public abstract String convertToDorisType(String fieldType, Integer precision, Integer scale); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java index f3a8c8b..945c839 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java @@ -32,8 +32,6 @@ import com.ververica.cdc.debezium.DebeziumSourceFunction; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import com.ververica.cdc.debezium.table.DebeziumOptions; import org.apache.doris.flink.catalog.doris.DataModel; -import org.apache.doris.flink.catalog.doris.TableSchema; -import org.apache.doris.flink.exception.CreateTableException; import org.apache.doris.flink.tools.cdc.DatabaseSync; import org.apache.doris.flink.tools.cdc.SourceSchema; import org.slf4j.Logger; @@ -120,15 +118,6 @@ public class OracleDatabaseSync extends DatabaseSync { if 
(!isSyncNeeded(tableName)) { continue; } - // Oracle allows table names to contain special characters such as /, #, $, - // etc., as in 'A/B'. - // However, Doris does not support tables with these characters. - if (!tableName.matches(TableSchema.DORIS_TABLE_REGEX)) { - throw new CreateTableException( - String.format( - "The table name %s is invalid. Table names in Doris must match the regex pattern %s. Please consider renaming the table or use the 'excluding-tables' option to filter it out.", - tableName, TableSchema.DORIS_TABLE_REGEX)); - } SourceSchema sourceSchema = new OracleSchema( metaData, databaseName, schemaName, tableName, tableComment); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java index e059181..71e4477 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java @@ -17,9 +17,12 @@ package org.apache.doris.flink.tools.cdc.oracle; +import org.apache.doris.flink.catalog.doris.FieldSchema; import org.apache.doris.flink.tools.cdc.JdbcSourceSchema; import java.sql.DatabaseMetaData; +import java.sql.SQLException; +import java.util.LinkedHashMap; public class OracleSchema extends JdbcSourceSchema { @@ -42,4 +45,16 @@ public class OracleSchema extends JdbcSourceSchema { public String getCdcTableName() { return schemaName + "\\." + tableName; } + + @Override + public LinkedHashMap<String, FieldSchema> getColumnInfo( + DatabaseMetaData metaData, String databaseName, String schemaName, String tableName) + throws SQLException { + // Oracle permits table names to include special characters such as /, + // etc., as in 'A/B'. + // When attempting to fetch column information for `A/B` via JDBC, + // it may throw an ORA-01424 error. 
+        // Hence, we substitute `/` with '_' to address the issue.
+        return super.getColumnInfo(metaData, databaseName, schemaName, tableName.replace("/", "_"));
+    }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org