lvyanquan commented on code in PR #4032:
URL: https://github.com/apache/flink-cdc/pull/4032#discussion_r2122518419


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java:
##########
@@ -50,54 +54,26 @@ public static List<TableId> listTables(
         // READ DATABASE NAMES
         // -------------------
         // Get the list of databases ...
-        LOG.info("Read list of available databases");
-        final List<String> databaseNames = new ArrayList<>();
-
-        jdbc.query(
-                "SHOW DATABASES",
-                rs -> {
-                    while (rs.next()) {
-                        String databaseName = rs.getString(1);
-                        if (databaseFilter.test(databaseName)) {
-                            databaseNames.add(databaseName);
-                        }
+        try (Connection connection = jdbc.connection()) {
+            DatabaseMetaData databaseMetaData = connection.getMetaData();
+            try (ResultSet tableResult = databaseMetaData.getTables(null, 
null, "%", TABLE_QUERY)) {
+                while (tableResult.next()) {
+                    String dbName = tableResult.getString("TABLE_CAT");
+                    if (!databaseFilter.test(dbName)) {

Review Comment:
   We should skip the whole tables of this database directly. 



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java:
##########
@@ -58,16 +64,23 @@ public static List<String> listDatabases(MySqlSourceConfig 
sourceConfig) {
     public static List<TableId> listTables(
             MySqlSourceConfig sourceConfig, @Nullable String dbName) {
         try (MySqlConnection jdbc = createMySqlConnection(sourceConfig)) {
-            List<String> databases =
-                    dbName != null ? Collections.singletonList(dbName) : 
listDatabases(jdbc);
-
             List<TableId> tableIds = new ArrayList<>();
-            for (String database : databases) {
-                tableIds.addAll(listTables(jdbc, database));
+            try (Connection connection = jdbc.connection()) {
+                DatabaseMetaData metaData = connection.getMetaData();
+                try (ResultSet resultSet = metaData.getTables(dbName, null, 
"%", TABLE_QUERY)) {
+                    while (resultSet.next()) {
+                        String database = resultSet.getString("TABLE_CAT");
+                        if (dbName == null && DB_LIST.contains(database)) {

Review Comment:
   DB_Set will have better performance than DB_LIST.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java:
##########
@@ -45,6 +48,9 @@
 /** Utilities for converting from debezium {@link Table} types to {@link 
Schema}. */
 public class MySqlSchemaUtils {
 
+    private static final String[] TABLE_QUERY = {"TABLE"};
+    private static final List<String> DB_LIST =

Review Comment:
   SYSTEM_DB_LIST.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to