[ https://issues.apache.org/jira/browse/FLINK-38218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sergei Morozov updated FLINK-38218:
-----------------------------------
Summary: MySQL CDC source may ignore newly added tables while reading the binlog (scenario 1) (was: MySQL CDC source may ignore newly added tables while reading the binlog)

> MySQL CDC source may ignore newly added tables while reading the binlog (scenario 1)
> ------------------------------------------------------------------------------------
>
> Key: FLINK-38218
> URL: https://issues.apache.org/jira/browse/FLINK-38218
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.2.1
> Reporter: Sergei Morozov
> Priority: Major
> Labels: pull-request-available
>
>
> {*}Steps to reproduce{*}:
> # Prepare tables {{A}}, {{B}} and {{C}} to be captured by the source. The number of rows in {{B}} and {{C}} is irrelevant; table {{A}} needs enough rows for its snapshot to take a couple of minutes (100 rows should be enough given the configuration below).
> # Make sure the source has the following configuration parameters:
> ## {{chunk-meta.group.size: 2}}. This value is small enough for the binlog split metadata to be transmitted in groups. {{1}} may work too, but I didn't test it.
> ## {{scan.snapshot.fetch.size: 2}}. This guarantees that the snapshot of table {{A}} takes long enough (2 is the minimum allowed value).
> # Include tables {{B}} and {{C}} in the source's configuration.
> # Start the job and wait until it transitions to the binlog phase.
> # Stop the job.
> # Include table {{A}} in the configuration.
> # Start the job and wait until the source reader starts reporting finished offsets back to the enumerator.
> # Stop the job before the snapshot is completed.
> # Restart the job.
> # Wait until the snapshot of table {{A}} is finished and the source transitions to the binlog phase.
>
> {*}Expected behavior{*}: changes in table {{A}} are read from the binlog.
> {*}Actual behavior{*}: changes in table {{A}} in the binlog are ignored.
>
> h3. What's happening internally
>
> {*}Note{*}: for readability, I will use a simplified format for snapshot split IDs (instead of {{A:0}}, I will use just {{A}}).
> # The source reader snapshots tables {{B}} and {{C}} and reports their finished snapshot split infos to the enumerator.
> # The enumerator creates the binlog split with infos {{B}} and {{C}} and assigns it to the reader.
> # The job is stopped, table {{A}} is included in the configuration, and the job is restarted.
> # The enumerator assigns split {{A}} and puts it into {{assignedSplits}}, so the order of its keys becomes {{B}}, {{C}}, {{A}}.
> # The source reader snapshots {{A}} and reports its finished snapshot split info(s) to the enumerator.
> # If the job weren't stopped and restarted at this point, there would be no issue. But the job gets stopped and restarted.
> # The {{MySqlSnapshotSplitAssigner}} constructor sorts {{assignedSplits}}, so the order of its keys becomes {{A}}, {{B}}, {{C}}.
> # The source reader knows that it has 2 split infos ({{B}} and {{C}}), but the new total is 3, so it requests split meta group 1 (with a group size of 2, the 2 infos it already has fill group 0).
> # It receives group 1, which in the new order contains a single element, {{C}} (group 0 is now {{A}}, {{B}}).
> # It calculates {{existedSplitsOfLastGroup}}, which is an empty set because its last group is already complete.
> # It therefore doesn't deduplicate {{C}} and appends it to the finished split infos.
> # As a result, the finished split infos are {{B}}, {{C}}, {{C}}. The reader now holds 3 infos, matching the total, so the transmission is over, but the info for table {{A}} is lost.
> # Subsequently, for all changes from table {{A}}, {{BinlogSplitReader#shouldEmit()}} returns {{false}} because there's no finished snapshot split info for this table.
>
> Note that other connectors that use {{SnapshotSplitAssigner}} from the {{flink-cdc-base}} package may be prone to this issue as well.
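The reader/enumerator exchange in the internal steps can be reproduced with a small Python simulation. This is a sketch under stated assumptions, not Flink CDC code: it assumes the reader requests meta group {{received_count // chunk-meta.group.size}} and deduplicates only against the split IDs in its own last, partially filled group, which matches the behavior described in the steps. All function names are hypothetical.

```python
from typing import List, Set

# Hypothetical simulation of the binlog split meta transfer described in
# FLINK-38218. These helpers are illustrative assumptions, not Flink CDC APIs.


def partition(infos: List[str], group_size: int) -> List[List[str]]:
    """Split the enumerator's ordered split infos into meta groups."""
    return [infos[i:i + group_size] for i in range(0, len(infos), group_size)]


def next_meta_group_id(received_count: int, group_size: int) -> int:
    """Group the reader requests next (assumed to be count // size)."""
    return received_count // group_size


def existed_splits_of_last_group(received: List[str], group_size: int) -> Set[str]:
    """IDs in the reader's last, partially filled group (empty if it is full)."""
    count_in_last_group = len(received) % group_size
    return set(received[len(received) - count_in_last_group:])


def fill_binlog_split_meta(received: List[str], enumerator_infos: List[str],
                           group_size: int) -> List[str]:
    """Request groups until the reader holds as many infos as the enumerator."""
    result = list(received)
    groups = partition(enumerator_infos, group_size)
    while len(result) < len(enumerator_infos):
        group = groups[next_meta_group_id(len(result), group_size)]
        existed = existed_splits_of_last_group(result, group_size)
        # Only IDs from the reader's last partial group are filtered out.
        new_infos = [split_id for split_id in group if split_id not in existed]
        if not new_infos:
            break  # no progress; stop instead of looping forever in this sketch
        result.extend(new_infos)
    return result


if __name__ == "__main__":
    already_received = ["B", "C"]
    # Insertion order kept by the enumerator before the second restart.
    print(fill_binlog_split_meta(already_received, ["B", "C", "A"], 2))  # prints ['B', 'C', 'A']
    # Order after the assigner's constructor sorts assignedSplits.
    print(fill_binlog_split_meta(already_received, ["A", "B", "C"], 2))  # prints ['B', 'C', 'C']
```

With the pre-restart order ({{B}}, {{C}}, {{A}}), group 1 is {{A}} and the reader ends up complete. With the sorted order ({{A}}, {{B}}, {{C}}), group 1 is {{C}}, the count still reaches 3, and {{A}} never arrives, which is the state in which {{BinlogSplitReader#shouldEmit()}} drops every change from table {{A}}.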