[ https://issues.apache.org/jira/browse/FLINK-39198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-39198:
-----------------------------------
Labels: pull-request-available (was: )
> Fix the issue of potential data loss during recovery from a checkpoint.
> -----------------------------------------------------------------------
>
> Key: FLINK-39198
> URL: https://issues.apache.org/jira/browse/FLINK-39198
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Affects Versions: cdc-3.2.0, cdc-3.1.1, cdc-3.3.0, cdc-3.4.0, cdc-3.5.0
> Environment: Flink CDC version: all versions
> Flink version: 1.17.1
>
> Reporter: Hao Yu
> Priority: Blocker
> Labels: pull-request-available
> Attachments: image-2026-03-03-17-46-56-384.png
>
>
> Flink CDC creates an eventFilter inside MySqlBinlogSplitReadTask, built by
> the createEventFilter() method below. In TIMESTAMP startup mode, the filter
> drops row-change events whose binlog timestamps are earlier than the
> user-configured startup timestamp. The filter always takes that timestamp
> from the current configuration, even when the job resumes from a savepoint
> or checkpoint. If the timestamp parameter is set automatically rather than
> by hand, it can be later than the restored binlog position, and this
> approach can then result in data loss.
>
> {code:java}
> private Predicate<Event> createEventFilter() {
>     // If the startup mode is set as TIMESTAMP, we need to apply a filter on event to drop
>     // events earlier than the specified timestamp.
>     // NOTE: Here we take user's configuration (statefulTaskContext.getSourceConfig())
>     // as the ground truth. This might be fragile if user changes the config and recover
>     // the job from savepoint / checkpoint, as there might be conflict between user's config
>     // and the state in savepoint / checkpoint. But as we don't promise compatibility of
>     // checkpoint after changing the config, this is acceptable for now.
>     StartupOptions startupOptions = statefulTaskContext.getSourceConfig().getStartupOptions();
>     if (startupOptions.startupMode.equals(StartupMode.TIMESTAMP)) {
>         if (startupOptions.binlogOffset == null) {
>             throw new NullPointerException(
>                     "The startup option was set to TIMESTAMP "
>                             + "but unable to find starting binlog offset. Please check if the timestamp is specified in "
>                             + "configuration. ");
>         }
>         long startTimestampSec = startupOptions.binlogOffset.getTimestampSec();
>         // We only skip data change event, as other kinds of events are necessary for updating
>         // some internal state inside MySqlStreamingChangeEventSource
>         LOG.info(
>                 "Creating event filter that dropping row mutation events before timestamp in second {}",
>                 startTimestampSec);
>         return event -> {
>             if (!EventType.isRowMutation(getEventType(event))) {
>                 return true;
>             }
>             return event.getHeader().getTimestamp() >= startTimestampSec * 1000;
>         };
>     }
>     return event -> true;
> }
> {code}
> !image-2026-03-03-17-46-56-384.png!
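To make the filter's semantics concrete outside of Flink CDC, here is a minimal, self-contained Java model of the TIMESTAMP branch above. The `BinlogEvent` record, its `rowMutation` flag and the `surviving` helper are hypothetical stand-ins for the binlog client's `Event`, its header and `EventType.isRowMutation`; only the comparison logic mirrors the quoted method. Note the unit mismatch it handles: the binlog header timestamp is in milliseconds while `startTimestampSec` is in seconds, hence the `* 1000`.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class EventFilterModel {

    // Hypothetical stand-in for a binlog event: header timestamp in milliseconds
    // plus a flag saying whether it is a row mutation (insert/update/delete).
    record BinlogEvent(String name, long timestampMs, boolean rowMutation) {}

    // Mirrors the TIMESTAMP branch of createEventFilter(): non-row events always
    // pass, row mutations pass only if they are at or after the start timestamp.
    static Predicate<BinlogEvent> timestampFilter(long startTimestampSec) {
        return event -> {
            if (!event.rowMutation()) {
                return true; // kept so that internal state (e.g. table maps) stays consistent
            }
            return event.timestampMs() >= startTimestampSec * 1000;
        };
    }

    // Returns the names of the events that survive the filter, in order.
    static List<String> surviving(List<BinlogEvent> events, long startTimestampSec) {
        return events.stream()
                .filter(timestampFilter(startTimestampSec))
                .map(BinlogEvent::name)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<BinlogEvent> events = List.of(
                new BinlogEvent("table-map@999s", 999_000L, false),
                new BinlogEvent("insert@999s", 999_000L, true),
                new BinlogEvent("insert@1000s", 1_000_000L, true),
                new BinlogEvent("update@1001s", 1_001_000L, true));
        // prints [table-map@999s, insert@1000s, update@1001s]
        System.out.println(surviving(events, 1000L));
    }
}
```

The non-row event at 999 s passes while the row insert at the same instant is dropped, which is exactly the behavior the comments in createEventFilter() describe.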
>
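> In this scenario, the user configures the startup options in TIMESTAMP mode
> (which may be the default behavior in some setups, with the timestamp tied to
> the Flink job's submission time) and restarts the Flink CDC job from a
> savepoint. The reader resumes from the MySQL binlog offset stored in the
> savepoint, which includes a ts_sec value, but the event filter still compares
> row events against the configured timestamp. If that ts_sec is earlier than
> the configured timestamp, every row-change event between the restored
> position and the configured timestamp is dropped, and that data is lost.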
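The restore scenario can be reproduced with a small, self-contained Java model. Everything here is hypothetical: `RowEvent`, `currentCutoffSec`, `mitigatedCutoffSec` and `dropped` are illustrative names, not Flink CDC APIs. `currentCutoffSec` models the reported behavior (the cut-off always comes from the configured timestamp), and `mitigatedCutoffSec` sketches one possible direction, assuming the restored offset's ts_sec is available to the reader: never filter out events newer than the restored position. This is an assumption for illustration, not necessarily what the linked pull request implements.

```java
import java.util.ArrayList;
import java.util.List;

public class RestoreScenario {

    // Hypothetical row event: only the binlog header timestamp (ms) matters here.
    record RowEvent(String name, long timestampMs) {}

    // Reported behavior: the cut-off always comes from the user's configured
    // startup timestamp; the restored offset is ignored even when present.
    static long currentCutoffSec(long configuredSec, Long restoredOffsetSec) {
        return configuredSec;
    }

    // Hypothetical mitigation (an assumption, not the upstream fix): when resuming
    // from a restored offset, use the earlier of the two timestamps so nothing
    // read after the restored position is dropped.
    static long mitigatedCutoffSec(long configuredSec, Long restoredOffsetSec) {
        return restoredOffsetSec == null
                ? configuredSec
                : Math.min(configuredSec, restoredOffsetSec);
    }

    // Returns the names of row events that a filter with this cut-off would drop.
    static List<String> dropped(List<RowEvent> events, long cutoffSec) {
        List<String> lost = new ArrayList<>();
        for (RowEvent e : events) {
            if (e.timestampMs() < cutoffSec * 1000) {
                lost.add(e.name());
            }
        }
        return lost;
    }

    public static void main(String[] args) {
        long configuredSec = 2_000L;     // e.g. derived from the job submission time
        Long restoredOffsetSec = 1_500L; // ts_sec stored in the savepoint's binlog offset
        // Row events the reader sees after resuming from the restored offset.
        List<RowEvent> afterRestore = List.of(
                new RowEvent("insert@1500s", 1_500_000L),
                new RowEvent("update@1800s", 1_800_000L),
                new RowEvent("delete@2100s", 2_100_000L));

        // prints "current:   [insert@1500s, update@1800s]"
        System.out.println("current:   "
                + dropped(afterRestore, currentCutoffSec(configuredSec, restoredOffsetSec)));
        // prints "mitigated: []"
        System.out.println("mitigated: "
                + dropped(afterRestore, mitigatedCutoffSec(configuredSec, restoredOffsetSec)));
    }
}
```

Taking the minimum keeps the original fresh-start behavior (no restored offset, so the configured timestamp is used) while making the filter a no-op for events after the restored position.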
--
This message was sent by Atlassian Jira
(v8.20.10#820010)