Fabian Paul created FLINK-29512:
-----------------------------------

             Summary: Align SubtaskCommittableManager checkpointId with CheckpointCommittableManagerImpl checkpointId during recovery
                 Key: FLINK-29512
                 URL: https://issues.apache.org/jira/browse/FLINK-29512
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Common
    Affects Versions: 1.15.2, 1.17.0, 1.16.1
            Reporter: Fabian Paul

Similar to the issue described in https://issues.apache.org/jira/browse/FLINK-29509, during the recovery of committables the checkpointId of the subtaskCommittables is always set to 1 [https://github.com/apache/flink/blob/9152af41c2d401e5eacddee1bb10d1b6bea6c61a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java#L193], while the holding CheckpointCommittableManager is initialized with the checkpointId that was written into state [https://github.com/apache/flink/blob/9152af41c2d401e5eacddee1bb10d1b6bea6c61a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java#L155].

As a result, during a recovery the post-commit topology receives a `CommittableSummary` carrying the recovered checkpointId and multiple `CommittableWithLineage`s carrying the reset checkpointId. The `CommittableWithLineage`s are therefore orphaned, with no matching `CommittableSummary`, and the job fails.
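
The mismatch can be illustrated with a small, self-contained sketch. It does not use the real Flink classes: `Summary`, `WithLineage`, `findOrphans` and `orphansAfterRecovery` are hypothetical stand-ins, and the rule that a committable needs a summary with the same checkpointId is a simplified assumption about what the post-commit topology expects.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified, hypothetical model of the recovery mismatch; NOT the Flink implementation.
public class RecoveryMismatchSketch {

    /** Stand-in for a CommittableSummary: announces committables of one checkpoint. */
    static final class Summary {
        final long checkpointId;

        Summary(long checkpointId) {
            this.checkpointId = checkpointId;
        }
    }

    /** Stand-in for a CommittableWithLineage: one committable tagged with a checkpoint. */
    static final class WithLineage {
        final long checkpointId;
        final String committable;

        WithLineage(long checkpointId, String committable) {
            this.checkpointId = checkpointId;
            this.committable = committable;
        }
    }

    /** Returns every committable whose checkpointId is not announced by any summary. */
    static List<WithLineage> findOrphans(List<Summary> summaries, List<WithLineage> committables) {
        Set<Long> announced = new HashSet<>();
        for (Summary summary : summaries) {
            announced.add(summary.checkpointId);
        }
        List<WithLineage> orphans = new ArrayList<>();
        for (WithLineage committable : committables) {
            if (!announced.contains(committable.checkpointId)) {
                orphans.add(committable);
            }
        }
        return orphans;
    }

    /**
     * Simulates what is emitted after recovery: the summary carries the checkpointId
     * restored from state, the committables carry the subtask-level checkpointId.
     */
    static List<WithLineage> orphansAfterRecovery(long restoredCheckpointId, long subtaskCheckpointId) {
        List<Summary> summaries = List.of(new Summary(restoredCheckpointId));
        List<WithLineage> committables = List.of(
                new WithLineage(subtaskCheckpointId, "committable-a"),
                new WithLineage(subtaskCheckpointId, "committable-b"));
        return findOrphans(summaries, committables);
    }

    public static void main(String[] args) {
        long restored = 42L; // arbitrary example value for the checkpointId read from state
        // Subtask checkpointId reset to 1, as described in this issue.
        System.out.println("reset to 1: " + orphansAfterRecovery(restored, 1L).size() + " orphans");
        // Subtask checkpointId aligned with the restored checkpointId.
        System.out.println("aligned: " + orphansAfterRecovery(restored, restored).size() + " orphans");
    }
}
```

In this model, both committables are orphaned when the subtask checkpointId is reset to 1, and none are orphaned once it is aligned with the restored checkpointId, which is the alignment the summary of this issue asks for.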