Fabian Paul created FLINK-29512:
-----------------------------------

             Summary: Align SubtaskCommittableManager checkpointId with 
CheckpointCommittableManagerImpl checkpointId during recovery
                 Key: FLINK-29512
                 URL: https://issues.apache.org/jira/browse/FLINK-29512
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Common
    Affects Versions: 1.15.2, 1.17.0, 1.16.1
            Reporter: Fabian Paul


Similar to the issue described in 
https://issues.apache.org/jira/browse/FLINK-29509, during the recovery of 
committables the subtaskCommittables checkpointId is always set to 1 
[https://github.com/apache/flink/blob/9152af41c2d401e5eacddee1bb10d1b6bea6c61a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java#L193], 
while the holding CheckpointCommittableManager is initialized with the 
checkpointId that is written into state 
[https://github.com/apache/flink/blob/9152af41c2d401e5eacddee1bb10d1b6bea6c61a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java#L155].
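
To make the mismatch concrete, here is a minimal, hypothetical Java model of the restore path. `RecoverySketch`, `SubtaskManager`, `CheckpointManager`, `restoreBuggy` and `restoreAligned` are illustrative names, not Flink's classes or the code at the linked lines. The sketch assumes only what the description states: the holder receives the checkpointId read from state, while each subtask-level object is created with a fixed id of 1. The aligned variant shows one way to do what the issue title asks for (passing the holder's id down); it is not necessarily how the fix was implemented.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, heavily simplified model of the recovery path described in this issue.
 * These are NOT Flink's CommittableCollectorSerializer or *CommittableManager classes.
 */
public class RecoverySketch {

    /** Hypothetical stand-in for a SubtaskCommittableManager: remembers its checkpoint id. */
    static final class SubtaskManager {
        final int subtaskId;
        final long checkpointId;

        SubtaskManager(int subtaskId, long checkpointId) {
            this.subtaskId = subtaskId;
            this.checkpointId = checkpointId;
        }
    }

    /** Hypothetical stand-in for a CheckpointCommittableManagerImpl holding subtask managers. */
    static final class CheckpointManager {
        final long checkpointId;
        final List<SubtaskManager> subtasks = new ArrayList<>();

        CheckpointManager(long checkpointId) {
            this.checkpointId = checkpointId;
        }
    }

    /** Mirrors the reported behavior: subtask managers get a hard-coded checkpoint id of 1. */
    static CheckpointManager restoreBuggy(long checkpointIdFromState, int[] subtaskIds) {
        CheckpointManager manager = new CheckpointManager(checkpointIdFromState);
        for (int subtaskId : subtaskIds) {
            manager.subtasks.add(new SubtaskManager(subtaskId, 1L));
        }
        return manager;
    }

    /** Aligned variant: subtask managers inherit the holder's checkpoint id. */
    static CheckpointManager restoreAligned(long checkpointIdFromState, int[] subtaskIds) {
        CheckpointManager manager = new CheckpointManager(checkpointIdFromState);
        for (int subtaskId : subtaskIds) {
            manager.subtasks.add(new SubtaskManager(subtaskId, manager.checkpointId));
        }
        return manager;
    }

    /** Returns true if every subtask manager agrees with its holder on the checkpoint id. */
    static boolean isConsistent(CheckpointManager manager) {
        for (SubtaskManager subtask : manager.subtasks) {
            if (subtask.checkpointId != manager.checkpointId) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int[] subtaskIds = {0, 1};
        System.out.println("buggy consistent:   " + isConsistent(restoreBuggy(42L, subtaskIds)));
        System.out.println("aligned consistent: " + isConsistent(restoreAligned(42L, subtaskIds)));
    }
}
```

If the checkpointId restored from state happens to be 1, the buggy variant passes the consistency check by accident; any other restored id exposes the mismatch.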

 

As a result, during recovery the post-commit topology receives a 
committable summary with the recovered checkpointId and multiple 
`CommittableWithLineage`s with the reset checkpointId. These 
`CommittableWithLineage`s are orphaned because no `CommittableSummary` 
matches them, which fails the job.
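
The failure mode can be sketched as a downstream consumer that pairs committables with summaries by checkpoint id. This is an assumption-laden illustration, not Flink's post-commit topology: `OrphanCheckSketch`, `Summary`, `Committable` and `findOrphans` are hypothetical stand-ins for `CommittableSummary` and `CommittableWithLineage`, and the sketch uses Java records (Java 16+).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical model of a consumer that matches committables to summaries by checkpoint id.
 * The record types are simplified stand-ins, not Flink's CommittableSummary or
 * CommittableWithLineage classes.
 */
public class OrphanCheckSketch {

    /** Hypothetical stand-in for a CommittableSummary: announces a checkpoint. */
    record Summary(long checkpointId) {}

    /** Hypothetical stand-in for a CommittableWithLineage: one committable tagged with a checkpoint. */
    record Committable(long checkpointId, String payload) {}

    /** Returns the committables whose checkpoint id was never announced by any summary. */
    static List<Committable> findOrphans(List<Summary> summaries, List<Committable> committables) {
        Set<Long> announced = new HashSet<>();
        for (Summary summary : summaries) {
            announced.add(summary.checkpointId());
        }
        List<Committable> orphans = new ArrayList<>();
        for (Committable committable : committables) {
            if (!announced.contains(committable.checkpointId())) {
                orphans.add(committable);
            }
        }
        return orphans;
    }

    public static void main(String[] args) {
        // Summary carries the recovered id 42; committables carry the reset id 1.
        List<Summary> summaries = List.of(new Summary(42L));
        List<Committable> recovered = List.of(new Committable(1L, "a"), new Committable(1L, "b"));
        List<Committable> orphans = findOrphans(summaries, recovered);
        if (!orphans.isEmpty()) {
            // Per the issue, the real job fails in this situation; the sketch only reports it.
            System.out.println("orphaned committables: " + orphans.size());
        }
    }
}
```

With aligned ids (committables tagged 42), `findOrphans` returns an empty list and nothing is reported.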



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
