chengcongchina opened a new pull request, #4334:
URL: https://github.com/apache/flink-cdc/pull/4334

   This closes [FLINK-39315](https://issues.apache.org/jira/browse/FLINK-39315).
   
   ### What is the purpose of the change
   
   This PR fixes a MySQL CDC source hang in the snapshot backfill phase when 
processing multiple snapshot splits sequentially while reusing the same 
`BinaryLogClient`.
   
   `SnapshotSplitReader.pollWithBuffer()` waits for the `BINLOG_END` watermark 
to finish a split. However, `BinaryLogClient` was reused across split 
executions and `MySqlStreamingChangeEventSource.execute()` registered multiple 
event/lifecycle listeners on each execution without unregistering them. As a 
result, listeners from previous splits could still receive binlog events during 
later splits and:
   
   - stop the shared `ChangeEventSourceContext` prematurely (causing the 
current split’s backfill to exit early), and/or
   - dispatch `BINLOG_END` via a stale `SignalEventDispatcher` into a stale 
queue (so the current `pollWithBuffer()` never sees `BINLOG_END`).
   
   This could leave the queue empty while the backfill thread has already 
stopped, causing the reader to hang indefinitely.
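
The failure mode described above can be reproduced in a simplified, self-contained model. This is only an illustrative sketch: `FakeBinlogClient`, `SharedContext`, and `backfill` are hypothetical stand-ins invented for this example, not Flink CDC or binlog connector classes, and the real reader is multi-threaded while this model is not.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.LongConsumer;

/** Simplified model of the bug; none of these types exist in Flink CDC. */
class StaleListenerDemo {

    /** Hypothetical stand-in for a reused client that keeps every registered listener. */
    static class FakeBinlogClient {
        final List<LongConsumer> listeners = new ArrayList<>();

        void register(LongConsumer listener) {
            listeners.add(listener);
        }

        void emit(long offset) {
            // Every listener still registered sees the event, including stale ones.
            for (LongConsumer listener : new ArrayList<>(listeners)) {
                listener.accept(offset);
            }
        }
    }

    /** Hypothetical stand-in for the context shared across split executions. */
    static class SharedContext {
        boolean running = true;
    }

    /** Backfills one split up to endOffset and returns that split's own queue. */
    static Queue<String> backfill(
            FakeBinlogClient client, SharedContext ctx, long startOffset, long endOffset) {
        Queue<String> queue = new ArrayDeque<>();
        ctx.running = true;
        // The listener is registered but never unregistered (the bug being modelled).
        client.register(offset -> {
            if (offset >= endOffset) {
                queue.add("BINLOG_END");
                ctx.running = false;
            }
        });
        for (long offset = startOffset; ctx.running; offset++) {
            client.emit(offset);
        }
        return queue;
    }

    public static void main(String[] args) {
        FakeBinlogClient client = new FakeBinlogClient();
        SharedContext ctx = new SharedContext();
        Queue<String> first = backfill(client, ctx, 0, 3);
        Queue<String> second = backfill(client, ctx, 4, 10);
        System.out.println("first split queue:  " + first);
        System.out.println("second split queue: " + second);
    }
}
```

In this model the listener left over from the first split reacts to the first event of the second split: it stops the shared context and writes `BINLOG_END` into the first split's queue. The second split's queue stays empty, which corresponds to the state in which a reader polling for `BINLOG_END` would wait forever.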
   
   ### Brief change log
   
   - Unregister `BinaryLogClient` event and lifecycle listeners in 
`MySqlStreamingChangeEventSource.execute()` after each execution to avoid 
listener accumulation and cross-split interference.
   - Run the unregister/cleanup logic in the cleanup path so that it executes 
on every exit from `execute()`, and fail fast if the cleanup itself fails.
   - Add or extend unit test coverage in `SnapshotSplitReaderTest` (based on 
`testMultipleSplitsWithBackfill`) to validate that multiple snapshot splits with 
a forced backfill phase finish and produce the expected output.
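
The cleanup pattern in the change log can be sketched as follows. This is a minimal illustration under stated assumptions, not the code in this PR: `FakeClient` and this `execute` method are hypothetical, and throwing `IllegalStateException` is just one possible reading of "fail-fast".

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of per-execution listener cleanup; all types here are hypothetical. */
class ListenerCleanupSketch {

    /** Hypothetical stand-in for a reused client with event and lifecycle listeners. */
    static class FakeClient {
        final List<Runnable> eventListeners = new ArrayList<>();
        final List<Runnable> lifecycleListeners = new ArrayList<>();
    }

    /** Runs one execution and always removes the listeners it registered. */
    static void execute(FakeClient client, Runnable body) {
        // Anonymous classes guarantee a distinct listener instance per execution.
        Runnable eventListener = new Runnable() {
            @Override
            public void run() {}
        };
        Runnable lifecycleListener = new Runnable() {
            @Override
            public void run() {}
        };
        client.eventListeners.add(eventListener);
        client.lifecycleListeners.add(lifecycleListener);
        try {
            body.run();
        } finally {
            // Cleanup runs on normal return and when body throws.
            boolean removedEvent = client.eventListeners.remove(eventListener);
            boolean removedLifecycle = client.lifecycleListeners.remove(lifecycleListener);
            if (!removedEvent || !removedLifecycle) {
                // Assumed fail-fast behavior: surface a cleanup failure immediately.
                throw new IllegalStateException("Failed to unregister listeners");
            }
        }
    }
}
```

Because the removal sits in a `finally` block, the client holds no listeners from a finished execution whether the body returned or threw. Note that an exception thrown from `finally` replaces one thrown by the body, so a real implementation may prefer to attach it as a suppressed exception instead.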
   
   ### Verifying this change
   
   This change is verified by unit tests:
   - `SnapshotSplitReaderTest#testMultipleSplitsWithBackfill`
   
   ### Does this pull request potentially affect one of the following parts:
   
   - Dependencies (does it add or upgrade a dependency): no
   - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
   - The serializers: no
   - The runtime per-record code paths (performance sensitive): no
   - Anything that affects deployment or recovery: no
   
   ### Documentation
   
   - Does this pull request introduce a new feature? no
   - If yes, how is the feature documented? not applicable


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
