[ 
https://issues.apache.org/jira/browse/FLINK-35600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Di Wu updated FLINK-35600:
--------------------------
    Description: 
Assume that the table has been split into 3 Chunks

Timeline
t1: chunk1 is read
t2: a piece of data A belonging to chunk2 is inserted in MySQL
t3: chunk2 is read, and data A has been sent downstream
t4: chunk3 is read

At this time, startOffset will be set to lowwatermark
t5: *BinlogSplitReader.pollSplitRecords* receives data A, and uses the method 
*shouldEmit* to determine whether the data is sent downstream

In this method
{code:java}
private boolean hasEnterPureBinlogPhase(TableId tableId, BinlogOffset position) 
{
    if (pureBinlogPhaseTables.contains(tableId)) {
        return true;
    }
    // the existed tables those have finished snapshot reading
    if (maxSplitHighWatermarkMap.containsKey(tableId)
            && position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) {
        pureBinlogPhaseTables.add(tableId);
        return true;
    }
} {code}
*maxSplitHighWatermarkMap.get(tableId)* obtains the HighWatermark data without 
ts_sec variable, and the default value is 0
*position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))*
So this expression is judged as true

*Data A continues to be sent downstream, and the data is repeated*

  was:
Assume that the table has been split into 3 Chunks

Timeline
t1: chunk1 is read
t2: a piece of data A belonging to chunk2 is inserted in MySQL
t3: chunk2 is read, and data A has been sent downstream
t4: chunk3 is read

At this time, startOffset will be set to lowwatermark
t5: BinlogSplitReader.pollSplitRecords receives data A, and uses the method 
shouldEmit to determine whether the data is sent downstream

In this method
{code:java}
private boolean hasEnterPureBinlogPhase(TableId tableId, BinlogOffset position) 
{
    if (pureBinlogPhaseTables.contains(tableId)) {
        return true;
    }
    // the existed tables those have finished snapshot reading
    if (maxSplitHighWatermarkMap.containsKey(tableId)
            && position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) {
        pureBinlogPhaseTables.add(tableId);
        return true;
    }
} {code}

*maxSplitHighWatermarkMap.get(tableId)* obtains the HighWatermark data without 
ts_sec variable, and the default value is 0
*position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))*
So this expression is judged as true

*Data A continues to be sent downstream, and the data is repeated*


> Data read duplication during the full-to-incremental conversion phase
> ---------------------------------------------------------------------
>
>                 Key: FLINK-35600
>                 URL: https://issues.apache.org/jira/browse/FLINK-35600
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.1.0
>            Reporter: Di Wu
>            Priority: Major
>              Labels: pull-request-available
>
> Assume that the table has been split into 3 Chunks
> Timeline
> t1: chunk1 is read
> t2: a piece of data A belonging to chunk2 is inserted in MySQL
> t3: chunk2 is read, and data A has been sent downstream
> t4: chunk3 is read
> At this time, startOffset will be set to lowwatermark
> t5: *BinlogSplitReader.pollSplitRecords* receives data A, and uses the method 
> *shouldEmit* to determine whether the data is sent downstream
> In this method
> {code:java}
> private boolean hasEnterPureBinlogPhase(TableId tableId, BinlogOffset 
> position) {
>     if (pureBinlogPhaseTables.contains(tableId)) {
>         return true;
>     }
>     // the existed tables those have finished snapshot reading
>     if (maxSplitHighWatermarkMap.containsKey(tableId)
>             && position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) {
>         pureBinlogPhaseTables.add(tableId);
>         return true;
>     }
> } {code}
> *maxSplitHighWatermarkMap.get(tableId)* obtains the HighWatermark data 
> without ts_sec variable, and the default value is 0
> *position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))*
> So this expression is judged as true
> *Data A continues to be sent downstream, and the data is repeated*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to