Hi Egor,

Thanks for reporting this! There is some changes in MySQL binlog offset 
comparing logic in FLINK-37191[1]. The MySQL binlog filename comparison code 
(which isn’t null-safe) throwing an NPE has been there for a long time, but the 
execution flow might have changed and triggers this problem.

I’ve created FLINK-37835[2] to trace this issue. Would you mind sharing some 
debugging information like creating a breakpoint and log the BinlogOffsets that 
failed to compare?

[1] https://github.com/apache/flink-cdc/pull/3902
[2] https://issues.apache.org/jira/browse/FLINK-37835

Best Regards,
Xiqian

> 2025年5月21日 05:40,Egor Bredikhin <ebredik...@contobox.com.INVALID> 写道:
> 
> Hi!
> 
> I tried switching an application from Flink CDC version 3.3.0 to Flink CDC 
> version 3.4.0 and encountered the following exception:
> 
> (using flink-sql-connector-mysql-cdc and flink 1.20)
> 
> java.lang.RuntimeException: SplitFetcher thread 0 received unexpected 
> exception while polling the records
>     at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
>  ~[flink-connector-base-1.20.0.jar:1.20.0]
>     at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
>  [flink-connector-base-1.20.0.jar:1.20.0]
>     at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>  [?:?]
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) 
> [?:?]
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  [?:?]
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  [?:?]
>     at java.base/java.lang.Thread.run(Thread.java:829) [?:?]
> Caused by: java.lang.NullPointerException
>     at 
> java.base/java.lang.String$CaseInsensitiveComparator.compare(String.java:1225)
>  ~[?:?]
>     at 
> java.base/java.lang.String$CaseInsensitiveComparator.compare(String.java:1218)
>  ~[?:?]
>     at java.base/java.lang.String.compareToIgnoreCase(String.java:1258) ~[?:?]
>     at 
> org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset.compareTo(BinlogOffset.java:241)
>  ~[flink-sql-connector-mysql-cdc-3.4.0.jar:3.4.0]
>     at 
> org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset.isAfter(BinlogOffset.java:272)
>  ~[flink-sql-connector-mysql-cdc-3.4.0.jar:3.4.0]
>     at 
> org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.hasEnterPureBinlogPhase(BinlogSplitReader.java:295)
>  ~[flink-sql-connector-mysql-cdc-3.4.0.jar:3.4.0]
>     at 
> org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.shouldEmit(BinlogSplitReader.java:254)
>  ~[flink-sql-connector-mysql-cdc-3.4.0.jar:3.4.0]
>     at 
> org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.pollSplitRecords(BinlogSplitReader.java:176)
>  ~[flink-sql-connector-mysql-cdc-3.4.0.jar:3.4.0]
>     at 
> org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.pollSplitRecords(MySqlSplitReader.java:122)
>  ~[flink-sql-connector-mysql-cdc-3.4.0.jar:3.4.0]
>     at 
> org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:84)
>  ~[flink-sql-connector-mysql-cdc-3.4.0.jar:3.4.0]
>     at 
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
>  ~[flink-connector-base-1.20.0.jar:1.20.0]
>     at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
>  ~[flink-connector-base-1.20.0.jar:1.20.0]
>     ... 6 more
> 
> My configuration is like the following:
> 
> String mysqlCdcConnector =
>     "  'connector' = 'mysql-cdc'," +
>     "  'hostname' = '%s'," +
>     "  'port' = '%s'," +
>     "  'database-name' = '%s'," +
>     "  'username' = '%s'," +
>     "  'password' = '%s'," +
>     "  'table-name' = '%s'," +
>     "  'scan.startup.mode' = '%s'," +
>     "  'connection.pool.size' = '8'," +
>     "  'server-id' = '%s'," +
>     "  'server-time-zone' = 'UTC'";
> 
> 
> tableEnv.executeSql(String.format(
>     "CREATE TABLE `mysql_session` (" +
>             "  `s_id` STRING," +
>             "  `user_id` INT," +
>             "  `date` DATE," +
>             ...
>             "  PRIMARY KEY (`s_id`, `date`) NOT ENFORCED" +
>             ") WITH (" + mysqlCdcConnector + ")",
>     mysqlHost, mysqlPort, mysqlUsersDb, mysqlUser, mysqlPass, 
> mysqlSessionsTable, "latest-offset", "5900-5950"
> ));
> 
> The issue does not happen with 3.3.0 and the same code works as expected.
> 
> Could you please take a look and check if there's an actual regression?
> 

Reply via email to