yuxiqian commented on code in PR #4246:
URL: https://github.com/apache/flink-cdc/pull/4246#discussion_r2958138050


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java:
##########
@@ -222,6 +222,16 @@ else if (!isRemainingTablesCheckpointed
     }
 
     private void captureNewlyAddedTables() {
+        // Binlog-only mode: no action needed in Assigner, BinlogSplitReader handles it
+        if (sourceConfig.isScanBinlogNewlyAddedTableEnabled()) {
+            LOG.info(
+                    "Binlog-only newly added table capture is enabled. "
+                            + "New tables matching the pattern will be automatically captured "
+                            + "in binlog phase without snapshot.");
+            // No action needed here, BinlogSplitReader will handle the auto-capture
+            return;
+        }
+

Review Comment:
   This check seems unnecessary, since `sourceConfig.isScanNewlyAddedTableEnabled` and `sourceConfig.isScanBinlogNewlyAddedTableEnabled` can never be enabled simultaneously?



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java:
##########
@@ -243,6 +243,22 @@ public class MySqlSourceOptions {
                     .withDescription(
                             "Whether capture the scan the newly added tables or not, by default is false. This option is only useful when we start the job from a savepoint/checkpoint.");
 
+    @Experimental
+    public static final ConfigOption<Boolean> SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED =
+            ConfigOptions.key("scan.binlog.newly-added-table.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "In binlog reading stage, whether to capture newly added tables "
+                                    + "that match the table patterns. When enabled, new tables will be "
+                                    + "captured without snapshot, only binlog events will be emitted. "
+                                    + "Cannot be enabled together with 'scan.newly-added-table.enabled'. "
+                                    + "table-name pattern examples: "
+                                    + "'db\\.*' (all tables in database 'db'), "
+                                    + "'db\\.user_\\.*' (tables like 'user_orders', 'user_profiles'), "
+                                    + "'db\\.order_[0-9]+' (tables like 'order_1', 'order_2'), "
+                                    + "'db1\\.*,db2\\.user_\\.*' (all tables in 'db1' and 'user_*' tables in 'db2').");

Review Comment:
   The examples seem incorrect. In CDC style, `.` separates the database and table names and `\.` is the regex dot, so they should be `db.\\.*`, `db.user_\\.*`, ...
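
   To illustrate the difference, here is a minimal sketch using plain `java.util.regex`. It assumes the CDC-style convention described in this PR's Javadoc (`.` is the database/table separator, `\.` is the regex dot), under which `db\.*` would translate to the standard regex `db.*` and `db.\.*` to `db\..*`. The class and method names are hypothetical and not part of the PR.

```java
import java.util.regex.Pattern;

/** Hypothetical helper for illustration only; not part of the PR. */
final class PatternExampleCheck {

    /** Returns true if the whole table id matches the given standard regex. */
    static boolean matchesTable(String standardRegex, String tableId) {
        return Pattern.matches(standardRegex, tableId);
    }

    public static void main(String[] args) {
        // Assumed standard-regex equivalent of the PR's example 'db\.*'.
        String fromPrExample = "db.*";
        // Assumed standard-regex equivalent of the suggested 'db.\.*'.
        String fromSuggestion = "db\\..*";

        for (String tableId : new String[] {"db.orders", "db_other.orders"}) {
            System.out.println(
                    tableId
                            + " -> PR example: "
                            + matchesTable(fromPrExample, tableId)
                            + ", suggestion: "
                            + matchesTable(fromSuggestion, tableId));
        }
    }
}
```

   Under these assumptions, the pattern from the current description also matches tables in other databases whose names merely start with `db`, while the suggested form only matches tables inside `db`.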



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java:
##########
@@ -446,4 +481,47 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) {
                 useLegacyJsonFormat,
                 assignUnboundedChunkFirst);
     }
+
+    /**
+     * Convert Flink CDC style table pattern to Debezium style.
+     *
+     * <p>In CDC-style table matching, table names are separated by commas and use `\.` for regex
+     * matching. In Debezium style, table names are separated by pipes and use `.` for regex
+     * matching while `\.` is used as database.table separator.
+     *
+     * <p>Examples:
+     *
+     * <ul>
+     *   <li>{@code "db1.table_\.*,db2.user_\.*"} -> {@code "db1\.table_.*|db2\.user_.*"}
+     *   <li>{@code "test_db.orders"} -> {@code "test_db\.orders"}
+     * </ul>
+     *
+     * @param tables Flink CDC style table pattern
+     * @return Debezium style table pattern
+     */
+    private static String convertToDebeziumStyle(String tables) {

Review Comment:
   Could we reuse the existing `TableIdRouter#convertTableListToRegExpPattern` here instead of reimplementing it?
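
   For reference, the conversion described in the Javadoc above can be sketched in a few lines. This is only an illustration of the documented rule (split on commas, swap `\.` and `.`, join with pipes). It is not the implementation from this PR nor the one in `TableIdRouter`, and the class name, method name, and placeholder character are assumptions.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

/** Hypothetical sketch for illustration only; not the PR's or TableIdRouter's code. */
final class CdcPatternSketch {

    // Assumed placeholder: a character that should never appear in a table pattern.
    private static final String PLACEHOLDER = "\u0000";

    /** Converts a comma-separated CDC-style table list into a pipe-separated Debezium-style regex. */
    static String toDebeziumStyle(String tables) {
        return Arrays.stream(tables.split(","))
                .map(String::trim)
                .map(
                        pattern ->
                                pattern
                                        // Park the CDC regex dot `\.` so it is not escaped below.
                                        .replace("\\.", PLACEHOLDER)
                                        // The CDC separator `.` becomes a literal dot `\.`.
                                        .replace(".", "\\.")
                                        // The parked regex dot becomes a plain `.` wildcard.
                                        .replace(PLACEHOLDER, "."))
                .collect(Collectors.joining("|"));
    }

    public static void main(String[] args) {
        System.out.println(toDebeziumStyle("db1.table_\\.*,db2.user_\\.*"));
        System.out.println(toDebeziumStyle("test_db.orders"));
    }
}
```

   Both Javadoc examples round-trip through this sketch, which suggests the new method duplicates logic that an existing helper could provide.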



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java:
##########
@@ -397,8 +413,26 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) {
         if (databaseList != null) {
             props.setProperty("database.include.list", String.join(",", databaseList));
         }
+        // Validate: Two modes are mutually exclusive
+        if (scanBinlogNewlyAddedTableEnabled && scanNewlyAddedTableEnabled) {
+            throw new IllegalArgumentException(
+                    "Cannot enable both 'scan.binlog.newly-added-table.enabled' and "
+                            + "'scan.newly-added-table.enabled' as they may cause duplicate data");
+        }
+
         if (tableList != null) {
-            props.setProperty("table.include.list", String.join(",", tableList));
+            // Convert table patterns to Debezium style if binlog auto-capture is enabled
+            if (scanBinlogNewlyAddedTableEnabled) {
+                String originalPattern = String.join(",", tableList);
+                String debeziumPattern = convertToDebeziumStyle(originalPattern);
+                props.setProperty("table.include.list", debeziumPattern);

Review Comment:
   IIUC, when `scanBinlogNewlyAddedTableEnabled` is not set, the table list uses standard RegEx (`.` is the wildcard matcher and `\.` is the literal dot character). Keeping that behavior unchanged in binlog capturing mode would make more sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.