[ 
https://issues.apache.org/jira/browse/FLINK-39198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-39198:
-----------------------------------
    Labels: pull-request-available  (was: )

> Fix the issue of potential data loss during recovery from a checkpoint.
> -----------------------------------------------------------------------
>
>                 Key: FLINK-39198
>                 URL: https://issues.apache.org/jira/browse/FLINK-39198
>             Project: Flink
>          Issue Type: Improvement
>          Components: Flink CDC
>    Affects Versions: cdc-3.2.0, cdc-3.1.1, cdc-3.3.0, cdc-3.4.0, cdc-3.5.0
>         Environment: Flink CDC version: all versions
> Flink version: 1.17.1
>  
>            Reporter: Hao Yu
>            Priority: Blocker
>              Labels: pull-request-available
>         Attachments: image-2026-03-03-17-46-56-384.png
>
>
> Flink CDC creates an eventFilter inside MySqlBinlogSplitReadTask using the
> method shown below. In TIMESTAMP startup mode, the eventFilter drops row
> mutation events whose timestamps are earlier than the configured startup
> timestamp. However, if the job resumes from a savepoint or checkpoint and
> the system sets the timestamp parameter automatically, events between the
> restored binlog offset and that timestamp are dropped, which results in
> data loss.
>  
> {code:java}
> private Predicate<Event> createEventFilter() {
>     // If the startup mode is set as TIMESTAMP, we need to apply a filter on event to drop
>     // events earlier than the specified timestamp.
>     // NOTE: Here we take user's configuration (statefulTaskContext.getSourceConfig())
>     // as the ground truth. This might be fragile if user changes the config and recover
>     // the job from savepoint / checkpoint, as there might be conflict between user's config
>     // and the state in savepoint / checkpoint. But as we don't promise compatibility of
>     // checkpoint after changing the config, this is acceptable for now.
>     StartupOptions startupOptions = statefulTaskContext.getSourceConfig().getStartupOptions();
>     if (startupOptions.startupMode.equals(StartupMode.TIMESTAMP)) {
>         if (startupOptions.binlogOffset == null) {
>             throw new NullPointerException(
>                     "The startup option was set to TIMESTAMP "
>                             + "but unable to find starting binlog offset. Please check if the timestamp is specified in "
>                             + "configuration. ");
>         }
>         long startTimestampSec = startupOptions.binlogOffset.getTimestampSec();
>         // We only skip data change event, as other kinds of events are necessary for updating
>         // some internal state inside MySqlStreamingChangeEventSource
>         LOG.info(
>                 "Creating event filter that dropping row mutation events before timestamp in second {}",
>                 startTimestampSec);
>         return event -> {
>             if (!EventType.isRowMutation(getEventType(event))) {
>                 return true;
>             }
>             return event.getHeader().getTimestamp() >= startTimestampSec * 1000;
>         };
>     }
>     return event -> true;
> }
> {code}
> !image-2026-03-03-17-46-56-384.png!
>  
> In this scenario, the user configures a startup option in timestamp mode
> (which may be the default behavior, with the timestamp tied to the Flink
> job's submission time) and restarts the Flink CDC job from a savepoint. If
> the savepoint contains MySQL binlog offset information with a ts_sec value
> that is earlier than the configured timestamp, row events with timestamps
> between ts_sec and the configured timestamp are filtered out, so data loss
> may occur.
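
The scenario described above can be reproduced in isolation with a minimal sketch. It is not Flink CDC code: the class TimestampFilterSketch, its methods, and the restoredFromState flag are hypothetical names introduced for illustration. Events are reduced to their header timestamps in milliseconds, so the row-mutation check is omitted. The restore-aware variant is only one possible mitigation under these assumptions, not necessarily the change made in the linked pull request.

```java
import java.util.List;
import java.util.function.LongPredicate;
import java.util.stream.Collectors;

/** Hypothetical, self-contained model of the timestamp filter; not Flink CDC code. */
public class TimestampFilterSketch {

    /** Mirrors the reported behavior: only the configured timestamp is consulted. */
    public static LongPredicate configOnlyFilter(long startTimestampSec) {
        return eventTimestampMs -> eventTimestampMs >= startTimestampSec * 1000;
    }

    /**
     * Assumed mitigation: when the reader was restored from a checkpoint or
     * savepoint, trust the restored binlog offset and keep every event.
     */
    public static LongPredicate restoreAwareFilter(
            long startTimestampSec, boolean restoredFromState) {
        if (restoredFromState) {
            return eventTimestampMs -> true;
        }
        return configOnlyFilter(startTimestampSec);
    }

    /** Returns the event timestamps that pass the given filter, in order. */
    public static List<Long> keep(List<Long> eventTimestampsMs, LongPredicate filter) {
        return eventTimestampsMs.stream()
                .filter(ts -> filter.test(ts))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Assumed values: the savepoint offset has ts_sec = 1000, while the
        // configured startup timestamp was reset to 2000 on resubmission.
        long configuredStartSec = 2000L;
        // Events written after the savepoint offset, as header timestamps in ms.
        List<Long> events = List.of(1_200_000L, 1_500_000L, 2_500_000L);

        // The first two events are newer than the savepoint but older than the
        // configured timestamp, so the config-only filter drops them.
        System.out.println("config-only:   "
                + keep(events, configOnlyFilter(configuredStartSec)));
        System.out.println("restore-aware: "
                + keep(events, restoreAwareFilter(configuredStartSec, true)));
    }
}
```

With the config-only filter, the two events that fall between the savepoint's ts_sec and the configured timestamp never reach downstream operators, which is the data loss reported here. The restore-aware variant keeps them because the restored offset already marks where reading should continue.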



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
