[
https://issues.apache.org/jira/browse/FLINK-39198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hao Yu updated FLINK-39198:
---------------------------
Description:
Flink CDC creates an eventFilter inside MySqlBinlogSplitReadTask using the
method below. In TIMESTAMP startup mode, the eventFilter drops row mutation
events whose timestamps are earlier than the timestamp in the startup options.
However, if the job resumes from a savepoint or checkpoint and the system sets
the timestamp parameter automatically, this filtering can result in data loss.
{code:java}
private Predicate<Event> createEventFilter() {
    // If the startup mode is set as TIMESTAMP, we need to apply a filter on event to drop
    // events earlier than the specified timestamp.
    // NOTE: Here we take user's configuration (statefulTaskContext.getSourceConfig())
    // as the ground truth. This might be fragile if user changes the config and recover
    // the job from savepoint / checkpoint, as there might be conflict between user's config
    // and the state in savepoint / checkpoint. But as we don't promise compatibility of
    // checkpoint after changing the config, this is acceptable for now.
    StartupOptions startupOptions = statefulTaskContext.getSourceConfig().getStartupOptions();
    if (startupOptions.startupMode.equals(StartupMode.TIMESTAMP)) {
        if (startupOptions.binlogOffset == null) {
            throw new NullPointerException(
                    "The startup option was set to TIMESTAMP "
                            + "but unable to find starting binlog offset. Please check if the timestamp is specified in "
                            + "configuration. ");
        }
        long startTimestampSec = startupOptions.binlogOffset.getTimestampSec();
        // We only skip data change event, as other kinds of events are necessary for updating
        // some internal state inside MySqlStreamingChangeEventSource
        LOG.info(
                "Creating event filter that dropping row mutation events before timestamp in second {}",
                startTimestampSec);
        return event -> {
            if (!EventType.isRowMutation(getEventType(event))) {
                return true;
            }
            return event.getHeader().getTimestamp() >= startTimestampSec * 1000;
        };
    }
    return event -> true;
}
{code}
!image-2026-03-03-17-46-56-384.png!
In this scenario, the user configures the startup options in timestamp mode
(which may be the default behavior, with the timestamp tied to the Flink job's
submission time) and restarts the Flink CDC job from a savepoint. If the
savepoint contains a MySQL binlog offset whose ts_sec value is earlier than the
configured timestamp, the filter drops the row mutation events that fall
between the two timestamps, so data loss may occur.
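
The scenario above can be reproduced in isolation with a minimal sketch. It
models only the timestamp comparison from createEventFilter() and uses plain
long values in place of binlog events. The class name, the method names, the
sample timestamps, and the guardedFilter variant are all hypothetical; the
guard is one conceivable mitigation under the assumption that a restored
offset should take precedence, not the fix adopted for this ticket.

```java
import java.util.List;
import java.util.function.LongPredicate;
import java.util.stream.Collectors;

public class TimestampFilterSketch {

    // Same comparison as in createEventFilter(): keep a row event only if its
    // header timestamp (milliseconds) is at or after the configured start (seconds).
    public static LongPredicate configuredFilter(long startTimestampSec) {
        return eventTimestampMs -> eventTimestampMs >= startTimestampSec * 1000;
    }

    // Hypothetical guard (an assumption, not the project's code): when the reader
    // resumes from restored state, skip timestamp filtering entirely and let the
    // restored binlog offset decide where reading starts.
    public static LongPredicate guardedFilter(long startTimestampSec, boolean restoredFromState) {
        return restoredFromState ? ts -> true : configuredFilter(startTimestampSec);
    }

    // Applies a filter to a list of row-event timestamps (milliseconds).
    public static List<Long> apply(List<Long> eventTimestampsMs, LongPredicate filter) {
        return eventTimestampsMs.stream()
                .filter(filter::test)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Made-up values: the savepoint offset has ts_sec = 1_700_000_000, while the
        // configured start timestamp is 100 seconds later.
        long configuredStartSec = 1_700_000_100L;
        List<Long> eventsAfterRestore =
                List.of(1_700_000_010_000L, 1_700_000_050_000L, 1_700_000_150_000L);

        // Current behavior: the two events before the configured timestamp are dropped.
        System.out.println(apply(eventsAfterRestore, configuredFilter(configuredStartSec)));
        // With the hypothetical guard, all three events are kept after a restore.
        System.out.println(apply(eventsAfterRestore, guardedFilter(configuredStartSec, true)));
    }
}
```

With these sample values, the first call keeps only the last event, which is
the data loss described above; the second call keeps all three.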
> Fix the issue of potential data loss during recovery from a checkpoint.
> -----------------------------------------------------------------------
>
> Key: FLINK-39198
> URL: https://issues.apache.org/jira/browse/FLINK-39198
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Affects Versions: cdc-3.2.0, cdc-3.1.1, cdc-3.3.0, cdc-3.4.0, cdc-3.5.0
> Environment: Flink CDC version: all versions
> Flink version: 1.17.1
>
> Reporter: Hao Yu
> Priority: Blocker
> Attachments: image-2026-03-03-17-46-56-384.png
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)