This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new c9e2751 [Improve] registerDriver improvement (#219) c9e2751 is described below commit c9e2751b4767cf8837ca67b58f3395a2a294ecd9 Author: benjobs <benj...@apache.org> AuthorDate: Mon Oct 30 09:57:25 2023 +0800 [Improve] registerDriver improvement (#219) --- .../apache/doris/flink/tools/cdc/DatabaseSync.java | 10 +++++++++ .../flink/tools/cdc/mysql/MysqlDatabaseSync.java | 26 +++++++++++++++++----- .../flink/tools/cdc/oracle/OracleDatabaseSync.java | 21 ++++++++++++++--- .../tools/cdc/postgres/PostgresDatabaseSync.java | 14 ++++++++++-- .../tools/cdc/sqlserver/SqlServerDatabaseSync.java | 18 +++++++++++---- 5 files changed, 74 insertions(+), 15 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index 8aef65d..fcd0f4c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -50,26 +50,36 @@ public abstract class DatabaseSync { private static final Logger LOG = LoggerFactory.getLogger(DatabaseSync.class); private static final String LIGHT_SCHEMA_CHANGE = "light_schema_change"; private static final String TABLE_NAME_OPTIONS = "table-name"; + protected Configuration config; + protected String database; + protected TableNameConverter converter; protected Pattern includingPattern; protected Pattern excludingPattern; protected Map<String, String> tableConfig; protected Configuration sinkConfig; protected boolean ignoreDefaultValue; + public StreamExecutionEnvironment env; private boolean createTableOnly = false; private boolean newSchemaChange; protected String includingTables; protected String excludingTables; + public abstract void registerDriver() throws SQLException; + public abstract Connection getConnection() 
throws SQLException; public abstract List<SourceSchema> getSchemaList() throws Exception; public abstract DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env); + public DatabaseSync() throws SQLException { + registerDriver(); + } + public void create(StreamExecutionEnvironment env, String database, Configuration config, String tablePrefix, String tableSuffix, String includingTables, String excludingTables, boolean ignoreDefaultValue, Configuration sinkConfig, diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java index 2235e0b..22e49aa 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java @@ -54,10 +54,25 @@ import java.util.regex.Pattern; public class MysqlDatabaseSync extends DatabaseSync { private static final Logger LOG = LoggerFactory.getLogger(MysqlDatabaseSync.class); - private static String JDBC_URL = "jdbc:mysql://%s:%d?useInformationSchema=true"; - private static String PROPERTIES_PREFIX = "jdbc.properties."; + private static final String JDBC_URL = "jdbc:mysql://%s:%d?useInformationSchema=true"; + private static final String PROPERTIES_PREFIX = "jdbc.properties."; - public MysqlDatabaseSync() { + public MysqlDatabaseSync() throws SQLException { + super(); + } + + @Override + public void registerDriver() throws SQLException { + try { + Class.forName("com.mysql.cj.jdbc.Driver"); + } catch (ClassNotFoundException ex) { + LOG.warn("can not found class com.mysql.cj.jdbc.Driver, use class com.mysql.jdbc.Driver"); + try { + Class.forName("com.mysql.jdbc.Driver"); + } catch (Exception e) { + throw new SQLException("No suitable driver found, can not found class com.mysql.cj.jdbc.Driver and com.mysql.jdbc.Driver"); + } + } } @Override 
@@ -86,7 +101,7 @@ public class MysqlDatabaseSync extends DatabaseSync { } SourceSchema sourceSchema = new MysqlSchema(metaData, databaseName, tableName, tableComment); - sourceSchema.setModel(sourceSchema.primaryKeys.size() > 0 ? DataModel.UNIQUE : DataModel.DUPLICATE); + sourceSchema.setModel(!sourceSchema.primaryKeys.isEmpty() ? DataModel.UNIQUE : DataModel.DUPLICATE); schemaList.add(sourceSchema); } } @@ -196,9 +211,8 @@ public class MysqlDatabaseSync extends DatabaseSync { } MySqlSource<String> mySqlSource = sourceBuilder.deserializer(schema).includeSchemaChanges(true).build(); - DataStreamSource<String> streamSource = env.fromSource( + return env.fromSource( mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source"); - return streamSource; } /** diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java index 0049579..6e27eb6 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java @@ -59,9 +59,24 @@ import static com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_ public class OracleDatabaseSync extends DatabaseSync { private static final Logger LOG = LoggerFactory.getLogger(OracleDatabaseSync.class); - private static String JDBC_URL = "jdbc:oracle:thin:@%s:%d:%s"; + private static final String JDBC_URL = "jdbc:oracle:thin:@%s:%d:%s"; - public OracleDatabaseSync() { + public OracleDatabaseSync() throws SQLException { + super(); + } + + @Override + public void registerDriver() throws SQLException { + try { + Class.forName("oracle.jdbc.driver.OracleDriver"); + } catch (ClassNotFoundException ex) { + LOG.warn("can not found class oracle.jdbc.driver.OracleDriver, use class oracle.jdbc.OracleDriver"); + try { + 
Class.forName("oracle.jdbc.OracleDriver"); + } catch (Exception e) { + throw new SQLException("No suitable driver found, can not found class oracle.jdbc.driver.OracleDriver and oracle.jdbc.OracleDriver"); + } + } } @Override @@ -97,7 +112,7 @@ public class OracleDatabaseSync extends DatabaseSync { } SourceSchema sourceSchema = new OracleSchema(metaData, databaseName, schemaName, tableName, tableComment); - sourceSchema.setModel(sourceSchema.primaryKeys.size() > 0 ? DataModel.UNIQUE : DataModel.DUPLICATE); + sourceSchema.setModel(!sourceSchema.primaryKeys.isEmpty() ? DataModel.UNIQUE : DataModel.DUPLICATE); schemaList.add(sourceSchema); } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java index 31878a5..b8c9ad1 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java @@ -62,9 +62,19 @@ import static com.ververica.cdc.connectors.postgres.source.config.PostgresSource public class PostgresDatabaseSync extends DatabaseSync { private static final Logger LOG = LoggerFactory.getLogger(PostgresDatabaseSync.class); - private static String JDBC_URL = "jdbc:postgresql://%s:%d/%s"; + private static final String JDBC_URL = "jdbc:postgresql://%s:%d/%s"; - public PostgresDatabaseSync() { + public PostgresDatabaseSync() throws SQLException { + super(); + } + + @Override + public void registerDriver() throws SQLException { + try { + Class.forName("org.postgresql.Driver"); + } catch (ClassNotFoundException ex) { + throw new SQLException("No suitable driver found, can not found class org.postgresql.Driver"); + } } @Override diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java index 6cf9c9d..fb25212 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java @@ -58,10 +58,20 @@ import static com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_ public class SqlServerDatabaseSync extends DatabaseSync { private static final Logger LOG = LoggerFactory.getLogger(SqlServerDatabaseSync.class); - private static String JDBC_URL = "jdbc:sqlserver://%s:%d;database=%s"; - private static String PORT = "port"; + private static final String JDBC_URL = "jdbc:sqlserver://%s:%d;database=%s"; + private static final String PORT = "port"; - public SqlServerDatabaseSync() { + public SqlServerDatabaseSync() throws SQLException { + super(); + } + + @Override + public void registerDriver() throws SQLException { + try { + Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver"); + } catch (ClassNotFoundException ex) { + throw new SQLException("No suitable driver found, can not found class com.microsoft.sqlserver.jdbc.SQLServerDriver"); + } } @Override @@ -91,7 +101,7 @@ public class SqlServerDatabaseSync extends DatabaseSync { } SourceSchema sourceSchema = new SqlServerSchema(metaData, databaseName, null, tableName, tableComment); - sourceSchema.setModel(sourceSchema.primaryKeys.size() > 0 ? DataModel.UNIQUE : DataModel.DUPLICATE); + sourceSchema.setModel(!sourceSchema.primaryKeys.isEmpty() ? DataModel.UNIQUE : DataModel.DUPLICATE); schemaList.add(sourceSchema); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org