Hao Yu created FLINK-39198:
------------------------------

             Summary: Fix the issue of potential data loss during recovery from a checkpoint.
                 Key: FLINK-39198
                 URL: https://issues.apache.org/jira/browse/FLINK-39198
             Project: Flink
          Issue Type: Improvement
          Components: Flink CDC
    Affects Versions: cdc-3.5.0, cdc-3.4.0, cdc-3.3.0, cdc-3.1.1, cdc-3.2.0
         Environment: Flink CDC version: all versions

Flink version: 1.17.1

 
            Reporter: Hao Yu


Flink CDC creates an event filter inside MySqlBinlogSplitReadTask, using the 
method shown below. In TIMESTAMP startup mode, the filter drops row mutation 
events whose timestamps are earlier than the user-defined timestamp. However, 
if the job resumes from a savepoint or checkpoint and the system automatically 
sets the timestamp parameter, the filter is still built from the current 
configuration rather than from the restored state, which can result in data 
loss.


```java

private Predicate<Event> createEventFilter() {
    // If the startup mode is set as TIMESTAMP, we need to apply a filter on event to drop
    // events earlier than the specified timestamp.

    // NOTE: Here we take user's configuration (statefulTaskContext.getSourceConfig())
    // as the ground truth. This might be fragile if user changes the config and recover
    // the job from savepoint / checkpoint, as there might be conflict between user's config
    // and the state in savepoint / checkpoint. But as we don't promise compatibility of
    // checkpoint after changing the config, this is acceptable for now.
    StartupOptions startupOptions = statefulTaskContext.getSourceConfig().getStartupOptions();
    if (startupOptions.startupMode.equals(StartupMode.TIMESTAMP)) {
        if (startupOptions.binlogOffset == null) {
            throw new NullPointerException(
                    "The startup option was set to TIMESTAMP "
                            + "but unable to find starting binlog offset. Please check if the timestamp is specified in "
                            + "configuration. ");
        }
        long startTimestampSec = startupOptions.binlogOffset.getTimestampSec();
        // We only skip data change event, as other kinds of events are necessary for updating
        // some internal state inside MySqlStreamingChangeEventSource
        LOG.info(
                "Creating event filter that dropping row mutation events before timestamp in second {}",
                startTimestampSec);
        return event -> {
            if (!EventType.isRowMutation(getEventType(event))) {
                return true;
            }
            return event.getHeader().getTimestamp() >= startTimestampSec * 1000;
        };
    }
    return event -> true;
}

```
In this scenario, the user configures a startup option with timestamp mode 
(which may be the default behavior, tied to the Flink job's submission time) and 
restarts the Flink CDC job from a savepoint. If the savepoint contains MySQL 
binlog information including a ts_sec value, and that ts_sec is earlier than 
the user-defined timestamp, data loss may occur: row mutation events with 
timestamps between ts_sec and the user-defined timestamp are read after the 
restore but dropped by the filter.
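
The scenario can be reproduced with a simplified, self-contained model. This is 
only a sketch: events are reduced to their header timestamps in milliseconds, 
and the class TimestampFilterSketch and the methods configuredFilter, 
restoreAwareFilter and replay are hypothetical names that do not exist in 
Flink CDC. The restoreAwareFilter variant is one possible mitigation (capping 
the filter threshold at the restored ts_sec), not the fix chosen upstream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

// Simplified model: every event is a row mutation, represented only by its
// header timestamp in milliseconds. All names here are hypothetical.
class TimestampFilterSketch {

    // Same comparison as in createEventFilter(): keep events at or after
    // the configured startup timestamp.
    static LongPredicate configuredFilter(long startTimestampSec) {
        return eventTsMillis -> eventTsMillis >= startTimestampSec * 1000;
    }

    // Possible mitigation (assumption): when state was restored, never filter
    // above the ts_sec recorded in the savepoint / checkpoint.
    static LongPredicate restoreAwareFilter(long startTimestampSec, Long restoredTsSec) {
        long effectiveSec =
                restoredTsSec == null
                        ? startTimestampSec
                        : Math.min(startTimestampSec, restoredTsSec);
        return configuredFilter(effectiveSec);
    }

    // Applies the filter to a sequence of event timestamps and returns the survivors.
    static List<Long> replay(long[] eventTimestampsMillis, LongPredicate filter) {
        List<Long> kept = new ArrayList<>();
        for (long ts : eventTimestampsMillis) {
            if (filter.test(ts)) {
                kept.add(ts);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        long restoredTsSec = 1_000L;   // ts_sec stored in the savepoint
        long configuredTsSec = 1_060L; // timestamp set at job resubmission
        // Events that follow the restored binlog position.
        long[] binlog = {1_010_000L, 1_030_000L, 1_070_000L};

        // Current behavior: the first two events are silently dropped.
        System.out.println(replay(binlog, configuredFilter(configuredTsSec)));
        // prints [1070000]

        // Capped threshold: all events after the restored position survive.
        System.out.println(replay(binlog, restoreAwareFilter(configuredTsSec, restoredTsSec)));
        // prints [1010000, 1030000, 1070000]
    }
}
```

An alternative with the same effect would be to skip the timestamp filter 
entirely whenever the binlog split was restored from state, since the restored 
offset already marks where reading must resume.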



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
