Thanks Yun for your inputs. Yes, increasing the checkpoint timeout helps, and we are able to take savepoints now. In our case we want to increase parallelism, so I believe a savepoint is the only option, as checkpoints don't support code/parallelism changes.
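For reference, the rescale-via-savepoint flow mentioned above is usually done with the Flink CLI roughly as follows. This is a sketch, not taken from the thread: the savepoint target directory, resulting savepoint path, new parallelism, and jar name are all placeholders (the job ID is the one from the stack trace below).

```shell
# Trigger a savepoint for the running job; prints the savepoint path on success
flink savepoint 434398968e635a49329f59a019b41b6f s3://my-bucket/savepoints

# Stop the old job once the savepoint has completed
flink cancel 434398968e635a49329f59a019b41b6f

# Resubmit from the savepoint with a new default parallelism (-p)
flink run -s s3://my-bucket/savepoints/savepoint-4343-abc123 -p 20 my-job.jar
```

The savepoint holds state in a rescalable format, which is why resubmitting with a different `-p` works here where a plain checkpoint would not.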
Gagan

On Wed, Oct 31, 2018 at 8:46 PM Yun Tang <myas...@live.com> wrote:

> Hi Gagan
>
> A savepoint generally takes more time than a usual incremental
> checkpoint; you could try to increase the checkpoint timeout [1]:
>
> env.getCheckpointConfig().setCheckpointTimeout(900000);
>
> If you just want to resume the previous job without changing the
> state backend, I think you could also try to resume from a retained
> checkpoint without triggering a savepoint [2].
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
>
> Best
> Yun Tang
>
> ------------------------------
> *From:* Gagan Agrawal <agrawalga...@gmail.com>
> *Sent:* Wednesday, October 31, 2018 19:03
> *To:* happydexu...@gmail.com
> *Cc:* user@flink.apache.org
> *Subject:* Re: Savepoint failed with error "Checkpoint expired before completing"
>
> Hi Henry,
> Thanks for your response. However, we don't face this issue during normal
> runs since we have incremental checkpoints. Only when we try to take a
> savepoint (which saves the entire state in one go) do we face this problem.
>
> Gagan
>
> On Wed, Oct 31, 2018 at 11:41 AM 徐涛 <happydexu...@gmail.com> wrote:
>
> Hi Gagan,
> I have also met the checkpoint timeout error.
> In my case it was not due to a big checkpoint size, but due to a slow
> sink which caused high backpressure on the upstream operators. The
> barriers then took a long time to reach the sink.
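Yun's two suggestions, raising the checkpoint timeout and keeping checkpoints around for later resume, are both set on the `CheckpointConfig`. A minimal sketch against the Flink 1.6 API (the interval and timeout values here are illustrative, not prescribed in the thread):

```java
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 5 minutes
        env.enableCheckpointing(5 * 60 * 1000);

        // Give slow, full-state snapshots (such as savepoints) more time
        // before they expire -- per [1] above
        env.getCheckpointConfig().setCheckpointTimeout(900000); // 15 minutes

        // Retain the last completed checkpoint on cancellation, so the job
        // can later be resumed from it without a savepoint -- per [2] above
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }
}
```

Note that `setCheckpointTimeout` applies to savepoints as well as checkpoints, which is why raising it resolves the "Checkpoint expired before completing" failure discussed in this thread.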
> Please check whether this is the case for you as well.
>
> Best
> Henry
>
> > On Oct 30, 2018, at 6:07 PM, Gagan Agrawal <agrawalga...@gmail.com> wrote:
> >
> > Hi,
> > We have a Flink job (Flink version 1.6.1) which unions 2 streams, passes
> > them through a custom KeyedProcessFunction with a RocksDB state store, and
> > finally produces another stream into Kafka. The current checkpoint size is
> > around ~100GB; checkpoints are saved to S3 at a 5-minute interval with
> > incremental checkpointing enabled. Checkpoints mostly finish in less than
> > 1 minute. We are running this job on YARN with the following parameters:
> >
> > -yn 10 (10 task managers)
> > -ytm 2048 (2 GB each)
> > - Operator parallelism is also 10.
> >
> > While trying to take a savepoint of this job, it runs for ~10 mins and
> > then throws the following error. It looks like the default checkpoint
> > timeout of 10 mins is causing this. What is the recommended way to take a
> > savepoint for such a job? Should we increase the default checkpoint
> > timeout of 10 mins? Also, our state size is currently 100GB but is
> > expected to grow up to 1TB. Is Flink a good fit for use cases with that
> > much state? And how much time is a savepoint expected to take with such a
> > state size and parallelism on YARN? Any other recommendations would be of
> > great help.
> >
> > org.apache.flink.util.FlinkException: Triggering a savepoint for the job
> > 434398968e635a49329f59a019b41b6f failed.
> > at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:714)
> > at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:692)
> > at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:979)
> > at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:689)
> > at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1059)
> > at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
> > at java.security.AccessController.doPrivileged(Native Method)
> > at javax.security.auth.Subject.doAs(Subject.java:422)
> > at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> > at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
> > Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: java.lang.Exception: Checkpoint expired before completing
> > at org.apache.flink.runtime.jobmaster.JobMaster.lambda$triggerSavepoint$13(JobMaster.java:955)
> > at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> > at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> > at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> > at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> > at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortExpired(PendingCheckpoint.java:412)
> > at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$triggerCheckpoint$0(CheckpointCoordinator.java:548)
> > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> > at java.lang.Thread.run(Thread.java:748)
> > Caused by: java.util.concurrent.CompletionException: java.lang.Exception: Checkpoint expired before completing
> > at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> > at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> > at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
> > at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
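For context, the job setup described in the original question (RocksDB state backend checkpointing to S3 with incremental checkpoints every 5 minutes) would typically be wired up roughly as below in Flink 1.6. The checkpoint URI is a placeholder, not the path used by the poster.

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // RocksDB state backend with incremental checkpointing enabled
        // (second constructor argument); the S3 path is a placeholder
        env.setStateBackend(
                new RocksDBStateBackend("s3://my-bucket/checkpoints", true));

        // Checkpoint every 5 minutes, matching the interval in the thread
        env.enableCheckpointing(5 * 60 * 1000);
    }
}
```

With incremental checkpointing, each checkpoint uploads only new RocksDB SST files, which is why routine checkpoints finish in under a minute here while a savepoint, which always writes the full ~100GB state, exceeds the 10-minute default timeout.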