[ 
https://issues.apache.org/jira/browse/FLINK-38270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergei Morozov updated FLINK-38270:
-----------------------------------
    Description: 
The logic of removing finished snapshot split infos for tables that no longer 
exist is inconsistent between the enumerator and the source reader. This may 
lead to a situation where the infos of an old table are retained on the reader 
but the infos of a newly added table aren’t transmitted. As a result, the 
binlog events for the newly added table will be skipped.
h3. Discrepancy in the info subtraction logic

When a table is no longer captured by the connector, it’s removed from the 
enumerator’s and reader’s state.
h4. Enumerator
 # Discover tables that match the include list.
 # Compare them with the tables from the state.
 # Remove the state that corresponds to the no longer captured tables.

As a result:
 # If the table is no longer included in the configuration, it’s *removed* 
from the state.
 # If the table no longer exists, it is *also removed* from the state.
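
The enumerator-side pruning can be sketched as a filter that requires both conditions. This is a minimal, hypothetical model, not the connector's code: the class {{EnumeratorPruning}}, the method {{prune}}, modelling finished split infos as a list with one table name per info, and modelling the include list as a {{Predicate}} are all assumptions made for illustration.

```java
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical model of the enumerator-side pruning; not the Flink CDC API.
// Each element of finishedInfos names the table of one finished snapshot split info.
final class EnumeratorPruning {
    static List<String> prune(List<String> finishedInfos,
                              Set<String> existingTables,
                              Predicate<String> includeList) {
        return finishedInfos.stream()
                // Discovery only yields tables that exist AND match the include list,
                // so an info survives only if both conditions still hold for its table.
                .filter(table -> existingTables.contains(table) && includeList.test(table))
                .collect(Collectors.toList());
    }
}
```

In this model, pruning infos for {{A}} and {{B}} after {{A}} was dropped leaves only {{B}}, even though the include list still matches {{A}}.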

h4. Reader
 # Iterate finished snapshot split infos.
 # Remove all infos whose tables are no longer included in the configuration.

As a result:
 # If the table is no longer included in the configuration, it’s *removed* 
from the state.
 # If the table no longer exists, it _is *not*_ *removed* from the state, 
because the reader doesn’t know that the table no longer exists.
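
For contrast, the reader-side pruning can be sketched the same way. Again this is a hypothetical model (the class {{ReaderPruning}} and the list/predicate representation are assumptions): because the reader has no view of which tables exist, the only check available to it is the include list.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical model of the reader-side pruning; not the Flink CDC API.
// The reader cannot see which tables exist, so it only consults the include list.
final class ReaderPruning {
    static List<String> prune(List<String> finishedInfos, Predicate<String> includeList) {
        return finishedInfos.stream()
                // A dropped table that still matches the include list is kept.
                .filter(includeList)
                .collect(Collectors.toList());
    }
}
```

With an include list that still matches {{A}}, the infos for the dropped table {{A}} survive on the reader while the enumerator has already removed them.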

h3. Impact of the discrepancy on the binlog split metadata transmission

The transmission logic uses the number of split infos on each side as the 
indicator of completion and relies on the no longer relevant infos being 
subtracted consistently on both sides. Because an info that’s subtracted from 
the enumerator’s state may not be subtracted from the reader’s, the counts can 
match while the sets of infos differ, and the info of a newly snapshotted table 
may never be transmitted to the reader.
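
A count-based completion check can be sketched as follows. This is a simplified, hypothetical model (the class {{CountBasedCompletion}} and both methods are illustrative names, not the connector's API): it only shows why comparing sizes is insufficient when the two sides prune differently.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical model of a count-based completion check; names are illustrative.
final class CountBasedCompletion {
    // The reader treats the metadata as complete once it holds as many infos as the
    // enumerator reports; table identities are never compared.
    static boolean readerConsidersComplete(List<String> readerInfos, int enumeratorTotal) {
        return readerInfos.size() == enumeratorTotal;
    }

    // Infos present on the enumerator that the reader never received.
    static List<String> missingOnReader(List<String> enumeratorInfos, List<String> readerInfos) {
        return enumeratorInfos.stream()
                .filter(table -> !readerInfos.contains(table))
                .collect(Collectors.toList());
    }
}
```

With the enumerator holding infos for {{B}} and {{C}} and the reader holding {{A}} and {{B}}, the sizes match, so the check passes even though {{C}} is missing on the reader.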
h3. Steps to reproduce
 # Create a source connection that captures tables {{A}} and {{B}}.
 # Start the connection and wait until it reaches the steady state.
 # Stop the connection.
 # Drop table {{A}} in the source database.
 # Start the connection.
 # Observe the state of the enumerator and the reader:
 ## The enumerator’s finished split infos will contain only {{B}} ({{A}} no 
longer exists, so it’s subtracted from the state).
 ## The reader’s finished split infos will contain {{A}} and {{B}} (both still 
match the include list).
 # Stop the connection.
 # Add table {{C}} to the source configuration.
 # Start the connection.
 # Observe the state of the enumerator and the reader:
 ## The enumerator’s finished split infos will contain {{B}} and {{C}} (the 
total number is 2).
 ## The reader’s finished split infos will contain {{A}} and {{B}} (both still 
match the include list, and the total number is also 2).
 # Make data changes in {{C}} and confirm that they are not captured.
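
The reproduction steps can be replayed in a small, self-contained simulation. This is a hypothetical sketch, not a test against the real connector: {{ReproSimulation}} and its helpers are illustrative names, each list element stands for one finished split info of the named table, and the final comparison stands in for the count-based completion check described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical simulation of the reproduction steps; all names are illustrative.
// Each list element names the table of one finished snapshot split info.
final class ReproSimulation {
    static List<String> enumeratorPrune(List<String> infos, Set<String> existing,
                                        Predicate<String> include) {
        return infos.stream()
                .filter(t -> existing.contains(t) && include.test(t))
                .collect(Collectors.toList());
    }

    static List<String> readerPrune(List<String> infos, Predicate<String> include) {
        return infos.stream().filter(include).collect(Collectors.toList());
    }

    // Returns the tables whose infos never reach the reader at the end of the scenario.
    static List<String> run() {
        Predicate<String> includeAB = t -> t.equals("A") || t.equals("B");

        // Steps 1-2: A and B are snapshotted; both sides hold one info per table.
        List<String> enumerator = List.of("A", "B");
        List<String> reader = List.of("A", "B");

        // Steps 3-6: A is dropped and the connection restarts.
        enumerator = enumeratorPrune(enumerator, Set.of("B"), includeAB); // [B]
        reader = readerPrune(reader, includeAB);                          // [A, B]

        // Steps 7-10: C is added to the include list and snapshotted.
        Predicate<String> includeABC = includeAB.or(t -> t.equals("C"));
        List<String> enumeratorNow =
                new ArrayList<>(enumeratorPrune(enumerator, Set.of("B", "C"), includeABC));
        enumeratorNow.add("C");                                           // [B, C]
        List<String> readerNow = readerPrune(reader, includeABC);         // [A, B]

        // Count-based completion: 2 == 2, so the metadata is considered complete.
        if (readerNow.size() != enumeratorNow.size()) {
            return List.of(); // counts differ: metadata is not yet considered complete
        }
        return enumeratorNow.stream()
                .filter(t -> !readerNow.contains(t))
                .collect(Collectors.toList());
    }
}
```

In this model {{run()}} returns a list containing only {{C}}: the table whose binlog events end up being skipped.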

h3. Observing state
h4. Enumerator
 # Set a breakpoint on {{MySqlSourceEnumerator#snapshotState()}}
 # Evaluate 
{{((MySqlHybridSplitAssigner)splitAssigner).snapshotSplitAssigner.splitFinishedOffsets}}

h4. Reader
 # Set a breakpoint on {{MySqlSplitReader#fetch()}}
 # Evaluate 
{{((MySqlBinlogSplit)((BinlogSplitReader)this.currentReader).currentBinlogSplit).finishedSnapshotSplitInfos}}

  was:
The logic of removing finished snapshot split infos for tables that no longer 
exist is inconsistent between the enumerator and the source reader. This may 
lead to a situation where the infos of an old table are retained on the reader 
but the infos of a newly added table aren’t transmitted. As a result, the 
binlog events for the newly added table will be skipped.
h3. Discrepancy in the info subtraction logic

When a table is no longer captured by the connector, it’s removed from the 
enumerator’s and reader’s state.
h4. Enumerator
 # Discover tables that match the include list.
 # Compare them with the tables from the state.
 # Remove the state that corresponds to the no longer captured tables.

As a result:
 # If the table is no longer included in the configuration, it’s *removed* 
from the state.
 # If the table no longer exists, it is *also removed* from the state.

h4. Reader
 # Iterate finished snapshot split infos.
 # Remove all infos whose tables are no longer included in the configuration.

As a result:
 # If the table is no longer included in the configuration, it’s *removed* 
from the state.
 # If the table no longer exists, it _is *not*_ *removed* from the state, 
because the reader doesn’t know that the table no longer exists.

h3. Impact of the discrepancy on the binlog split metadata transmission

The transmission logic uses the number of split infos on each side as the 
indicator of completion and relies on the no longer relevant infos being 
subtracted consistently on both sides. Because an info that’s subtracted from 
the enumerator’s state may not be subtracted from the reader’s, the counts can 
match while the sets of infos differ, and the info of a newly snapshotted table 
may never be transmitted to the reader.
h3. Steps to reproduce
 # Create a source connection that captures tables {{A}} and {{B}}.
 # Start the connection and wait until it reaches the steady state.
 # Stop the connection.
 # Drop table {{A}} in the source database.
 # Start the connection.
 # Observe the state of the enumerator and the reader:
 ## The enumerator’s finished split infos will contain only {{B}} ({{A}} no 
longer exists, so it’s subtracted from the state).
 ## The reader’s finished split infos will contain {{A}} and {{B}} (both still 
match the include list).
 # Stop the connection.
 # Add table {{C}} to the source configuration.
 # Start the connection.
 # Observe the state of the enumerator and the reader:
 ## The enumerator’s finished split infos will contain {{B}} and {{C}} (the 
total number is 2).
 ## The reader’s finished split infos will contain {{A}} and {{B}} (both still 
match the include list, and the total number is also 2).
 # Make data changes in {{C}} and confirm that they are not captured.
 # Confirm that the logs contain the following message:
> Following expected tables are not being read from binlog [<database-name>.c]

h3. Observing state
h4. Enumerator
 # Set a breakpoint on {{MySqlSourceEnumerator#snapshotState()}}
 # Evaluate 
{{((MySqlHybridSplitAssigner)splitAssigner).snapshotSplitAssigner.splitFinishedOffsets}}

h4. Reader
 # Set a breakpoint on {{MySqlSplitReader#fetch()}}
 # Evaluate 
{{((MySqlBinlogSplit)((BinlogSplitReader)this.currentReader).currentBinlogSplit).finishedSnapshotSplitInfos}}


> MySQL CDC source may ignore newly added tables while reading the binlog 
> (scenario 2)
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-38270
>                 URL: https://issues.apache.org/jira/browse/FLINK-38270
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.2.0
>            Reporter: Sergei Morozov
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
