[ https://issues.apache.org/jira/browse/FLINK-7844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16206017#comment-16206017 ]
Till Rohrmann edited comment on FLINK-7844 at 10/16/17 3:23 PM:
----------------------------------------------------------------
Hi [~zhenzhongxu], thanks for the feedback.

The checkpoint timeout is currently set to 10 minutes by default. You can control it via the {{CheckpointConfig}}:
{code}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getCheckpointConfig().setCheckpointTimeout(30000L); // 30s checkpoint timeout
{code}

It looks to me as if we don't fail the currently ongoing checkpoint in the case of a fine grained recovery. We also don't resend the checkpoint barriers for the restarted component. Therefore, the checkpoint will never complete and the {{CheckpointCoordinator}} will wait until the checkpoint times out. The easiest solution would be to fail all currently ongoing checkpoints upon fine grained recovery, which is also what happens in the case of a global recovery.


was (Author: till.rohrmann):
Hi [~zhenzhongxu], thanks for the feedback.

The checkpoint timeout is currently hardcoded to 10 minutes. We should definitely expose this to the user via a configuration option. Moreover, it looks to me as if we don't fail the currently ongoing checkpoint in the case of a fine grained recovery. We also don't resend the checkpoint barriers for the restarted component. Therefore, the checkpoint will never complete and the {{CheckpointCoordinator}} will wait until the checkpoint times out. The easiest solution would be to fail all currently ongoing checkpoints upon fine grained recovery, which is also what happens in the case of a global recovery.

I will split this issue into two: one for adding the configuration option for the timeout, and this one for aborting all currently ongoing checkpoints.


> Fine Grained Recovery triggers checkpoint timeout failure
> ----------------------------------------------------------
>
> Key: FLINK-7844
> URL: https://issues.apache.org/jira/browse/FLINK-7844
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.3.2
> Reporter: Zhenzhong Xu
> Attachments: screenshot-1.png
>
> Context:
> We are using the "individual" (fine grained) failover recovery strategy for our embarrassingly parallel router use case. The topic has over 2000 partitions, and the parallelism is set to ~180, dispatched across over 20 task managers with around 180 slots.
>
> Observations:
> We've noticed that after one task manager termination, even though the individual recovery happens correctly and the workload is re-dispatched to a newly available task manager instance, the checkpoint takes 10 minutes to eventually time out, leaving all other task managers unable to commit checkpoints. In the worst case, if the job gets restarted for other reasons (e.g. a job manager termination), this causes more messages to be re-processed/duplicated compared to a job without fine grained recovery enabled.
>
> I suspect that the uber checkpoint was waiting for a previous checkpoint initiated by the old task manager and was therefore taking a long time to time out.
>
> Two questions:
> 1. Is there a configuration that controls this checkpoint timeout?
> 2. Is there any reason why, when the Job Manager realizes the Task Manager is gone and the workload is re-dispatched, it still needs to wait for the checkpoint initiated by the old task manager?
>
> Checkpoint screenshot in attachments.
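To make the fix proposed in the comment above concrete, here is a minimal, self-contained sketch of the idea in plain Java. The class and method names ({{PendingCheckpointSketch}}, {{startCheckpoint}}, {{onRecovery}}) are made up for this illustration and are not the actual {{CheckpointCoordinator}} API; the point is only that every checkpoint still in flight is failed as soon as a recovery is triggered, instead of lingering until the 10 minute timeout fires.
{code}
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only; not Flink's CheckpointCoordinator.
public class PendingCheckpointSketch {

    // Pending checkpoints keyed by checkpoint id; the future completes
    // normally on full acknowledgement and exceptionally on abort.
    private final Map<Long, CompletableFuture<Void>> pendingCheckpoints = new ConcurrentHashMap<>();

    // Registers a new in-flight checkpoint.
    public CompletableFuture<Void> startCheckpoint(long checkpointId) {
        CompletableFuture<Void> pending = new CompletableFuture<>();
        pendingCheckpoints.put(checkpointId, pending);
        return pending;
    }

    // Marks a checkpoint as acknowledged by all tasks.
    public void completeCheckpoint(long checkpointId) {
        CompletableFuture<Void> pending = pendingCheckpoints.remove(checkpointId);
        if (pending != null) {
            pending.complete(null);
        }
    }

    // Called when tasks are restarted (fine grained or global recovery):
    // every unacknowledged checkpoint is failed immediately, because the
    // restarted tasks will never receive the old barriers.
    public void onRecovery(Throwable cause) {
        pendingCheckpoints.values().forEach(pending -> pending.completeExceptionally(cause));
        pendingCheckpoints.clear();
    }

    public static void main(String[] args) {
        PendingCheckpointSketch coordinator = new PendingCheckpointSketch();
        CompletableFuture<Void> checkpoint42 = coordinator.startCheckpoint(42L);

        // A task manager is lost and the affected tasks are restarted individually.
        coordinator.onRecovery(new RuntimeException("task restarted by fine grained recovery"));

        // The pending checkpoint fails right away instead of waiting for the timeout.
        System.out.println("checkpoint 42 failed immediately: " + checkpoint42.isCompletedExceptionally());
    }
}
{code}
Applied to Flink, the same idea means the {{CheckpointCoordinator}} aborts its pending checkpoints whenever a fine grained recovery restarts some tasks, mirroring what already happens on a global recovery.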