[ https://issues.apache.org/jira/browse/FLINK-19596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17216541#comment-17216541 ]
Jiayi Liao commented on FLINK-19596:
------------------------------------

[~yunta] I didn't know there was already a JIRA for this. Thanks for the reminder; you can re-assign it to me.

I agree with [~zjwang] that coupling {{CompletedCheckpointStore.recover()}} with the type of failover may not be a good solution. The best solution I can think of for now is the same as [~zjwang]'s: let {{CompletedCheckpointStore}} recover itself, if it has not been recovered yet, when {{CheckpointCoordinator}} or other components try to access the states in {{CompletedCheckpointStore}}. Specifically, I'm going to add a new method {{boolean isCheckpointStoreInitialized()}} to {{CompletedCheckpointStore}} and allow different implementations based on its root component.

> Do not recover CompletedCheckpointStore on each failover
> --------------------------------------------------------
>
>                 Key: FLINK-19596
>                 URL: https://issues.apache.org/jira/browse/FLINK-19596
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.11.2
>            Reporter: Jiayi Liao
>            Priority: Major
>
> {{completedCheckpointStore.recover()}} in
> {{restoreLatestCheckpointedStateInternal}} could be a bottleneck on failover
> because the {{CompletedCheckpointStore}} needs to load HDFS files to
> instantiate the {{CompletedCheckpoint}} instances.
> The impact is significant in our case:
> * Jobs with high parallelism (no shuffle) which transfer data from Kafka to
> other filesystems.
> * If a machine goes down, several containers and tens of tasks are affected,
> which means {{completedCheckpointStore.recover()}} would be called tens
> of times, since the tasks are not in the same failover region.
>
> And I noticed there is a "TODO" in the source code:
> {code:java}
> // Recover the checkpoints, TODO this could be done only when there is a new leader, not on each recovery
> completedCheckpointStore.recover();
> {code}
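A minimal Java sketch of the lazy-recovery idea proposed in the comment: the store remembers whether it has been recovered and runs the expensive recovery only on first access, so repeated region failovers reuse the in-memory state instead of reloading files each time. Only the name {{isCheckpointStoreInitialized()}} comes from the proposal; {{CheckpointStore}}, {{LazyRecoveringStore}}, {{getAllCheckpointIds()}}, {{recoverCalls}} and {{restoreLatest()}} are hypothetical simplifications, not Flink's actual {{CompletedCheckpointStore}} API.

```java
import java.util.ArrayList;
import java.util.List;

public class LazyRecoveryDemo {
    // Hypothetical stand-in for restoreLatestCheckpointedStateInternal: it no longer
    // calls recover() itself, it just reads from the store.
    static List<Long> restoreLatest(CheckpointStore store) throws Exception {
        return store.getAllCheckpointIds();
    }

    public static void main(String[] args) throws Exception {
        LazyRecoveringStore store = new LazyRecoveringStore(List.of(1L, 2L, 3L));
        // Simulate ten region failovers hitting the same store.
        for (int i = 0; i < 10; i++) {
            restoreLatest(store);
        }
        System.out.println(store.recoverCalls); // prints 1
    }
}

// Hypothetical, simplified interface; the real CompletedCheckpointStore differs.
interface CheckpointStore {
    boolean isCheckpointStoreInitialized(); // method name proposed in the comment
    void recover() throws Exception;        // expensive: loads checkpoint metadata
    List<Long> getAllCheckpointIds() throws Exception;
}

// Hypothetical store that recovers on first access instead of on every failover.
class LazyRecoveringStore implements CheckpointStore {
    private final List<Long> idsInExternalStorage; // stands in for metadata files in HDFS
    private final List<Long> checkpointIds = new ArrayList<>();
    private boolean initialized = false;
    int recoverCalls = 0; // counts expensive loads, for illustration only

    LazyRecoveringStore(List<Long> idsInExternalStorage) {
        this.idsInExternalStorage = idsInExternalStorage;
    }

    @Override
    public synchronized boolean isCheckpointStoreInitialized() {
        return initialized;
    }

    @Override
    public synchronized void recover() {
        recoverCalls++;
        checkpointIds.clear();
        checkpointIds.addAll(idsInExternalStorage); // the costly reload in a real store
        initialized = true;
    }

    @Override
    public synchronized List<Long> getAllCheckpointIds() {
        if (!initialized) {
            recover(); // recover only if no one has done so yet
        }
        return new ArrayList<>(checkpointIds);
    }
}
```

In this sketch ten simulated failovers trigger a single load. A real implementation would still need to decide when the in-memory state must be reloaded (the TODO suggests on leader change), which the sketch does not model.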