This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new c9e2751  [Improve] registerDriver improvement (#219)
c9e2751 is described below

commit c9e2751b4767cf8837ca67b58f3395a2a294ecd9
Author: benjobs <benj...@apache.org>
AuthorDate: Mon Oct 30 09:57:25 2023 +0800

    [Improve] registerDriver improvement (#219)
---
 .../apache/doris/flink/tools/cdc/DatabaseSync.java | 10 +++++++++
 .../flink/tools/cdc/mysql/MysqlDatabaseSync.java   | 26 +++++++++++++++++-----
 .../flink/tools/cdc/oracle/OracleDatabaseSync.java | 21 ++++++++++++++---
 .../tools/cdc/postgres/PostgresDatabaseSync.java   | 14 ++++++++++--
 .../tools/cdc/sqlserver/SqlServerDatabaseSync.java | 18 +++++++++++----
 5 files changed, 74 insertions(+), 15 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 8aef65d..fcd0f4c 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -50,26 +50,36 @@ public abstract class DatabaseSync {
     private static final Logger LOG = 
LoggerFactory.getLogger(DatabaseSync.class);
     private static final String LIGHT_SCHEMA_CHANGE = "light_schema_change";
     private static final String TABLE_NAME_OPTIONS = "table-name";
+
     protected Configuration config;
+
     protected String database;
+
     protected TableNameConverter converter;
     protected Pattern includingPattern;
     protected Pattern excludingPattern;
     protected Map<String, String> tableConfig;
     protected Configuration sinkConfig;
     protected boolean ignoreDefaultValue;
+
     public StreamExecutionEnvironment env;
     private boolean createTableOnly = false;
     private boolean newSchemaChange;
     protected String includingTables;
     protected String excludingTables;
 
+    public abstract void registerDriver() throws SQLException;
+
     public abstract Connection getConnection() throws SQLException;
 
     public abstract List<SourceSchema> getSchemaList() throws Exception;
 
     public abstract DataStreamSource<String> 
buildCdcSource(StreamExecutionEnvironment env);
 
+    public DatabaseSync() throws SQLException {
+        registerDriver();
+    }
+
     public void create(StreamExecutionEnvironment env, String database, 
Configuration config,
                        String tablePrefix, String tableSuffix, String 
includingTables,
                        String excludingTables, boolean ignoreDefaultValue, 
Configuration sinkConfig,
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
index 2235e0b..22e49aa 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
@@ -54,10 +54,25 @@ import java.util.regex.Pattern;
 
 public class MysqlDatabaseSync extends DatabaseSync {
     private static final Logger LOG = 
LoggerFactory.getLogger(MysqlDatabaseSync.class);
-    private static String JDBC_URL = 
"jdbc:mysql://%s:%d?useInformationSchema=true";
-    private static String PROPERTIES_PREFIX = "jdbc.properties.";
+    private static final String JDBC_URL = 
"jdbc:mysql://%s:%d?useInformationSchema=true";
+    private static final String PROPERTIES_PREFIX = "jdbc.properties.";
 
-    public MysqlDatabaseSync() {
+    public MysqlDatabaseSync() throws SQLException {
+        super();
+    }
+
+    @Override
+    public void registerDriver() throws SQLException {
+        try {
+            Class.forName("com.mysql.cj.jdbc.Driver");
+        } catch (ClassNotFoundException ex) {
+            LOG.warn("can not found class com.mysql.cj.jdbc.Driver, use class 
com.mysql.jdbc.Driver");
+            try {
+                Class.forName("com.mysql.jdbc.Driver");
+            } catch (Exception e) {
+                throw new SQLException("No suitable driver found, can not 
found class com.mysql.cj.jdbc.Driver and com.mysql.jdbc.Driver");
+            }
+        }
     }
 
     @Override
@@ -86,7 +101,7 @@ public class MysqlDatabaseSync extends DatabaseSync {
                     }
                     SourceSchema sourceSchema =
                             new MysqlSchema(metaData, databaseName, tableName, 
tableComment);
-                    sourceSchema.setModel(sourceSchema.primaryKeys.size() > 0 
? DataModel.UNIQUE : DataModel.DUPLICATE);
+                    sourceSchema.setModel(!sourceSchema.primaryKeys.isEmpty() 
? DataModel.UNIQUE : DataModel.DUPLICATE);
                     schemaList.add(sourceSchema);
                 }
             }
@@ -196,9 +211,8 @@ public class MysqlDatabaseSync extends DatabaseSync {
         }
         MySqlSource<String> mySqlSource = 
sourceBuilder.deserializer(schema).includeSchemaChanges(true).build();
 
-        DataStreamSource<String> streamSource = env.fromSource(
+        return env.fromSource(
                 mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
-        return streamSource;
     }
 
     /**
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
index 0049579..6e27eb6 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
@@ -59,9 +59,24 @@ import static 
com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_
 public class OracleDatabaseSync extends DatabaseSync {
     private static final Logger LOG = 
LoggerFactory.getLogger(OracleDatabaseSync.class);
 
-    private static String JDBC_URL = "jdbc:oracle:thin:@%s:%d:%s";
+    private static final String JDBC_URL = "jdbc:oracle:thin:@%s:%d:%s";
 
-    public OracleDatabaseSync() {
+    public OracleDatabaseSync() throws SQLException {
+        super();
+    }
+
+    @Override
+    public void registerDriver() throws SQLException {
+        try {
+            Class.forName("oracle.jdbc.driver.OracleDriver");
+        } catch (ClassNotFoundException ex) {
+            LOG.warn("can not found class oracle.jdbc.driver.OracleDriver, use 
class oracle.jdbc.OracleDriver");
+            try {
+                Class.forName("oracle.jdbc.OracleDriver");
+            } catch (Exception e) {
+                throw new SQLException("No suitable driver found, can not 
found class oracle.jdbc.driver.OracleDriver and oracle.jdbc.OracleDriver");
+            }
+        }
     }
 
     @Override
@@ -97,7 +112,7 @@ public class OracleDatabaseSync extends DatabaseSync {
                     }
                     SourceSchema sourceSchema =
                             new OracleSchema(metaData, databaseName, 
schemaName, tableName, tableComment);
-                    sourceSchema.setModel(sourceSchema.primaryKeys.size() > 0 
? DataModel.UNIQUE : DataModel.DUPLICATE);
+                    sourceSchema.setModel(!sourceSchema.primaryKeys.isEmpty() 
? DataModel.UNIQUE : DataModel.DUPLICATE);
                     schemaList.add(sourceSchema);
                 }
             }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
index 31878a5..b8c9ad1 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
@@ -62,9 +62,19 @@ import static 
com.ververica.cdc.connectors.postgres.source.config.PostgresSource
 public class PostgresDatabaseSync extends DatabaseSync {
     private static final Logger LOG = 
LoggerFactory.getLogger(PostgresDatabaseSync.class);
 
-    private static String JDBC_URL = "jdbc:postgresql://%s:%d/%s";
+    private static final String JDBC_URL = "jdbc:postgresql://%s:%d/%s";
 
-    public PostgresDatabaseSync() {
+    public PostgresDatabaseSync() throws SQLException {
+        super();
+    }
+
+    @Override
+    public void registerDriver() throws SQLException {
+        try {
+            Class.forName("org.postgresql.Driver");
+        } catch (ClassNotFoundException ex) {
+            throw new SQLException("No suitable driver found, can not found 
class org.postgresql.Driver");
+        }
     }
 
     @Override
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
index 6cf9c9d..fb25212 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
@@ -58,10 +58,20 @@ import static 
com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_
 
 public class SqlServerDatabaseSync extends DatabaseSync {
     private static final Logger LOG = 
LoggerFactory.getLogger(SqlServerDatabaseSync.class);
-    private static String JDBC_URL = "jdbc:sqlserver://%s:%d;database=%s";
-    private static String PORT = "port";
+    private static final String JDBC_URL = 
"jdbc:sqlserver://%s:%d;database=%s";
+    private static final String PORT = "port";
 
-    public SqlServerDatabaseSync() {
+    public SqlServerDatabaseSync() throws SQLException {
+        super();
+    }
+
+    @Override
+    public void registerDriver() throws SQLException {
+        try {
+            Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
+        } catch (ClassNotFoundException ex) {
+            throw new SQLException("No suitable driver found, can not found 
class com.microsoft.sqlserver.jdbc.SQLServerDriver");
+        }
     }
 
     @Override
@@ -91,7 +101,7 @@ public class SqlServerDatabaseSync extends DatabaseSync {
                     }
                     SourceSchema sourceSchema =
                             new SqlServerSchema(metaData, databaseName, null, 
tableName, tableComment);
-                    sourceSchema.setModel(sourceSchema.primaryKeys.size() > 0 
? DataModel.UNIQUE : DataModel.DUPLICATE);
+                    sourceSchema.setModel(!sourceSchema.primaryKeys.isEmpty() 
? DataModel.UNIQUE : DataModel.DUPLICATE);
                     schemaList.add(sourceSchema);
                 }
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to