yux created FLINK-37835:
---------------------------

             Summary: MySQL CDC BinlogOffset compareTo method throws NPE
                 Key: FLINK-37835
                 URL: https://issues.apache.org/jira/browse/FLINK-37835
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
    Affects Versions: cdc-3.4.0
            Reporter: yux


Originally reported in [dev@flink.apache.org|mailto:dev@flink.apache.org] by 
Egor Bredikhin.

 
{quote}Hi!

I tried switching an application from Flink CDC version 3.3.0 to Flink CDC 
version 3.4.0 and encountered the following exception:

(using flink-sql-connector-mysql-cdc and flink 1.20)

java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception 
while polling the records
    at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
 ~[flink-connector-base-1.20.0.jar:1.20.0]
    at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
 [flink-connector-base-1.20.0.jar:1.20.0]
    at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
 [?:?]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
    at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 [?:?]
    at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 [?:?]
    at java.base/java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.NullPointerException
    at 
java.base/java.lang.String$CaseInsensitiveComparator.compare(String.java:1225) 
~[?:?]
    at 
java.base/java.lang.String$CaseInsensitiveComparator.compare(String.java:1218) 
~[?:?]
    at java.base/java.lang.String.compareToIgnoreCase(String.java:1258) ~[?:?]
    at 
org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset.compareTo(BinlogOffset.java:241)
 ~[flink-sql-connector-mysql-cdc-3.4.0.jar:3.4.0]
    at 
org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset.isAfter(BinlogOffset.java:272)
 ~[flink-sql-connector-mysql-cdc-3.4.0.jar:3.4.0]
    at 
org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.hasEnterPureBinlogPhase(BinlogSplitReader.java:295)
 ~[flink-sql-connector-mysql-cdc-3.4.0.jar:3.4.0]
    at 
org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.shouldEmit(BinlogSplitReader.java:254)
 ~[flink-sql-connector-mysql-cdc-3.4.0.jar:3.4.0]
    at 
org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.pollSplitRecords(BinlogSplitReader.java:176)
 ~[flink-sql-connector-mysql-cdc-3.4.0.jar:3.4.0]
    at 
org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.pollSplitRecords(MySqlSplitReader.java:122)
 ~[flink-sql-connector-mysql-cdc-3.4.0.jar:3.4.0]
    at 
org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:84)
 ~[flink-sql-connector-mysql-cdc-3.4.0.jar:3.4.0]
    at 
org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
 ~[flink-connector-base-1.20.0.jar:1.20.0]
    at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
 ~[flink-connector-base-1.20.0.jar:1.20.0]
    ... 6 more

My configuration is like the following:

String mysqlCdcConnector =
    "  'connector' = 'mysql-cdc'," +
    "  'hostname' = '%s'," +
    "  'port' = '%s'," +
    "  'database-name' = '%s'," +
    "  'username' = '%s'," +
    "  'password' = '%s'," +
    "  'table-name' = '%s'," +
    "  'scan.startup.mode' = '%s'," +
    "  'connection.pool.size' = '8'," +
    "  'server-id' = '%s'," +
    "  'server-time-zone' = 'UTC'";


tableEnv.executeSql(String.format(
    "CREATE TABLE `mysql_session` (" +
            "  `s_id` STRING," +
            "  `user_id` INT," +
            "  `date` DATE," +
            ...
            "  PRIMARY KEY (`s_id`, `date`) NOT ENFORCED" +
            ") WITH (" + mysqlCdcConnector + ")",
    mysqlHost, mysqlPort, mysqlUsersDb, mysqlUser, mysqlPass, 
mysqlSessionsTable, "latest-offset", "5900-5950"
));

The issue does not happen with 3.3.0 and the same code works as expected.

Could you please take a look and check if there's an actual regression?{quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to