Thanks for the tip! I did change the jobGraph this time.

Hao Sun
Team Lead
1019 Market St. 7F
San Francisco, CA 94103

On Thu, Dec 6, 2018 at 2:47 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Hao,
>
> if Flink tries to recover from a checkpoint, then the JobGraph should not
> be modified and the system should be able to restore the state.
>
> Have you changed the JobGraph and are you now trying to recover from the
> latest checkpoint which is stored in ZooKeeper? If so, then you can also
> start the job cluster with a different cluster id and manually pass the
> path to the latest checkpoint as the savepoint path to resume from. By
> specifying a new cluster id, the system will create a new ZNode in
> ZooKeeper and will not use the checkpoints from the previous run.
>
> If you did not change the JobGraph, then this sounds like a bug. For
> further investigation the debug log files would be helpful.
>
> Cheers,
> Till
>
> On Wed, Dec 5, 2018 at 7:18 PM Hao Sun <ha...@zendesk.com> wrote:
>
>> Till, Flink is automatically trying to recover from a checkpoint, not a
>> savepoint. How can I get allowNonRestoredState applied in this case?
>>
>> Hao Sun
>>
>> On Wed, Dec 5, 2018 at 10:09 AM Till Rohrmann <trohrm...@apache.org> wrote:
>>
>>> Hi Hao,
>>>
>>> I think you need to provide a savepoint file via --fromSavepoint to
>>> resume from in order to specify --allowNonRestoredState. Otherwise this
>>> option will be ignored because it only works if you resume from a savepoint.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Dec 5, 2018 at 12:29 AM Hao Sun <ha...@zendesk.com> wrote:
>>>
>>>> I am using 1.7 and job cluster on k8s.
>>>>
>>>> Here is how I start my job
>>>> ====
>>>> docker-entrypoint.sh job-cluster -j com.zendesk.fraud_prevention.examples.ConnectedStreams --allowNonRestoredState
>>>> ====
>>>>
>>>> *Seems like --allowNonRestoredState is not honored*
>>>>
>>>> === Logs ===
>>>> java","line":"1041","message":"Restoring job 00000000000000000000000000000000 from latest valid checkpoint: Checkpoint 8103 @ 0 for 00000000000000000000000000000000."}
>>>> {"timestamp":"2018-12-04 23:19:39,859","level":"ERROR","thread":"flink-akka.actor.default-dispatcher-15","file":"ClusterEntrypoint.java","line":"390","message":"Fatal error occurred in the cluster entrypoint."}
>>>> java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
>>>>     at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
>>>>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>>>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
>>>>     at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
>>>>     at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
>>>>     at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
>>>>     at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>>>>     ... 7 more
>>>> Caused by: java.lang.IllegalStateException: There is no operator for the state 2f4bc854a18755730e14a90e1d4d7c7d
>>>>     at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:569)
>>>>     at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:77)
>>>>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedState(CheckpointCoordinator.java:1049)
>>>>     at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1152)
>>>>     at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:296)
>>>>     at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
>>>> ======
>>>>
>>>> Can somebody help out? Thanks
>>>>
>>>> Hao Sun
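
For reference, the workaround Till describes in this thread (start the job cluster under a new cluster id and pass the latest checkpoint as the savepoint path) might look roughly like the sketch below. It only prints the command rather than running it. Several things here are assumptions and not taken from the thread: that the cluster id is set through the `high-availability.cluster-id` configuration key passed as a `-D` dynamic property, and the placeholder values `connected-streams-v2` and `s3://my-bucket/checkpoints/chk-8103`, which are hypothetical. The `--fromSavepoint` and `--allowNonRestoredState` flags and the job class name are the ones quoted in the thread.

```shell
#!/bin/sh
# Sketch only: prints the restart command instead of executing it.
# Assumption: the cluster id is passed as a dynamic property (-D...).

build_restart_cmd() {
  # $1: new cluster id, so that a fresh ZNode is used in ZooKeeper
  # $2: path to the latest checkpoint, passed as the savepoint path
  echo "docker-entrypoint.sh job-cluster" \
    "-j com.zendesk.fraud_prevention.examples.ConnectedStreams" \
    "-Dhigh-availability.cluster-id=$1" \
    "--fromSavepoint $2" \
    "--allowNonRestoredState"
}

# Hypothetical placeholder values; substitute your own id and checkpoint path.
build_restart_cmd "connected-streams-v2" "s3://my-bucket/checkpoints/chk-8103"
```

Because the job is then resumed from an explicit savepoint path, `--allowNonRestoredState` should no longer be ignored, which matches Till's earlier remark that the option only takes effect when resuming from a savepoint.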