Hao Yu created FLINK-39198:
------------------------------
Summary: Fix the issue of potential data loss during recovery from a checkpoint.
Key: FLINK-39198
URL: https://issues.apache.org/jira/browse/FLINK-39198
Project: Flink
Issue Type: Improvement
Components: Flink CDC
Affects Versions: cdc-3.5.0, cdc-3.4.0, cdc-3.3.0, cdc-3.1.1, cdc-3.2.0
Environment: Flink CDC version: all versions
Flink version: 1.17.1
Reporter: Hao Yu
Flink CDC creates an eventFilter inside MySqlBinlogSplitReadTask, produced by
the following method. When the startup mode is TIMESTAMP, the filter drops row
mutation (data change) events whose binlog timestamps are earlier than the
user-configured timestamp. However, if the job is resumed from a savepoint or
checkpoint while the TIMESTAMP startup option is still set (for example, when
the platform sets the timestamp parameter automatically), this approach can
result in data loss.
```java
private Predicate<Event> createEventFilter() {
    // If the startup mode is set as TIMESTAMP, we need to apply a filter on event to drop
    // events earlier than the specified timestamp.
    // NOTE: Here we take user's configuration (statefulTaskContext.getSourceConfig())
    // as the ground truth. This might be fragile if user changes the config and recover
    // the job from savepoint / checkpoint, as there might be conflict between user's config
    // and the state in savepoint / checkpoint. But as we don't promise compatibility of
    // checkpoint after changing the config, this is acceptable for now.
    StartupOptions startupOptions = statefulTaskContext.getSourceConfig().getStartupOptions();
    if (startupOptions.startupMode.equals(StartupMode.TIMESTAMP)) {
        if (startupOptions.binlogOffset == null) {
            throw new NullPointerException(
                    "The startup option was set to TIMESTAMP "
                            + "but unable to find starting binlog offset. Please check if the timestamp is specified in "
                            + "configuration. ");
        }
        long startTimestampSec = startupOptions.binlogOffset.getTimestampSec();
        // We only skip data change event, as other kinds of events are necessary for updating
        // some internal state inside MySqlStreamingChangeEventSource
        LOG.info(
                "Creating event filter that dropping row mutation events before timestamp in second {}",
                startTimestampSec);
        return event -> {
            if (!EventType.isRowMutation(getEventType(event))) {
                return true;
            }
            return event.getHeader().getTimestamp() >= startTimestampSec * 1000;
        };
    }
    return event -> true;
}
```
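In this scenario, the user configures the TIMESTAMP startup option (which may be
the default behavior, tied to the Flink job’s submission time) and restarts the
Flink CDC job from a savepoint. If the MySQL binlog offset stored in the
savepoint has a ts_sec value earlier than the configured timestamp, the reader
resumes from that earlier position, but the filter still takes the configured
timestamp as the ground truth. Row mutation events between the restored ts_sec
and the configured timestamp, which had not yet been emitted when the savepoint
was taken, are therefore silently dropped, and data is lost.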
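To make the failure mode concrete, here is a minimal, self-contained sketch (Java 16+ for records) that models only the filtering rule from createEventFilter(). The BinlogEvent record, the droppedEvents helper, and the sample timestamps are hypothetical illustrations, not Flink CDC or mysql-binlog-connector APIs. It assumes a savepoint whose ts_sec is 1000 and a configured startup timestamp of 1100 seconds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class TimestampFilterDataLossDemo {

    // Hypothetical stand-in for a binlog event: header timestamp in milliseconds,
    // whether it is a row mutation (insert/update/delete), and a label for printing.
    record BinlogEvent(long headerTimestampMs, boolean rowMutation, String label) {}

    // Hypothetical binlog contents after the restored checkpoint position (ts_sec = 1000).
    // None of these events had been emitted when the checkpoint was taken.
    static final List<BinlogEvent> EVENTS_AFTER_CHECKPOINT = List.of(
            new BinlogEvent(1_000_000L, true, "INSERT id=1"),
            new BinlogEvent(1_050_000L, false, "QUERY (DDL)"),
            new BinlogEvent(1_099_999L, true, "UPDATE id=1"),
            new BinlogEvent(1_100_000L, true, "INSERT id=2"),
            new BinlogEvent(1_200_000L, true, "DELETE id=1"));

    // Same rule as createEventFilter(): non-row events always pass; row mutations
    // pass only if their timestamp is >= the configured start timestamp.
    static Predicate<BinlogEvent> createEventFilter(long startTimestampSec) {
        return event -> !event.rowMutation()
                || event.headerTimestampMs() >= startTimestampSec * 1000;
    }

    // Returns the labels of the events the filter would silently drop.
    static List<String> droppedEvents(List<BinlogEvent> events, long startTimestampSec) {
        Predicate<BinlogEvent> filter = createEventFilter(startTimestampSec);
        List<String> dropped = new ArrayList<>();
        for (BinlogEvent event : events) {
            if (!filter.test(event)) {
                dropped.add(event.label());
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        long checkpointTsSec = 1_000;    // ts_sec stored in the restored savepoint
        long configuredStartSec = 1_100; // TIMESTAMP startup option from the user's config
        System.out.println("Restored from ts_sec=" + checkpointTsSec
                + ", dropped: " + droppedEvents(EVENTS_AFTER_CHECKPOINT, configuredStartSec));
    }
}
```

Running main prints `Restored from ts_sec=1000, dropped: [INSERT id=1, UPDATE id=1]`: the two row mutations between the restored position and the configured timestamp never reach downstream, while the non-row QUERY event still passes, consistent with the source comment that only data change events are skipped.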
--
This message was sent by Atlassian Jira
(v8.20.10#820010)