[ https://issues.apache.org/jira/browse/FLINK-25267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kerem Ulutaş updated FLINK-25267:
---------------------------------
    Description:
My Stateful Functions job is running on Kubernetes (minikube in my local environment) and has these settings:
 * Using StateFun v3.1.0
 * Checkpoints are stored on HDFS (state.checkpoint-storage: filesystem)
 * Checkpointing mode is EXACTLY_ONCE
 * State backend is rocksdb and incremental checkpointing is enabled

When I kill the jobmanager (master) pod, minikube starts another pod, and this new pod fails when it tries to load the last checkpoint:
{code:java}
...
2021-12-11 14:25:26,426 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing job myStatefunApp (00000000000000000000000000000000).
2021-12-11 14:25:26,443 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using restart back off time strategy FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, backoffTimeMS=1000) for myStatefunApp (00000000000000000000000000000000).
2021-12-11 14:25:26,516 INFO org.apache.flink.runtime.util.ZooKeeperUtils [] - Initialized DefaultCompletedCheckpointStore in 'ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}' with /checkpoints/00000000000000000000000000000000.
2021-12-11 14:25:26,599 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Running initialization on master for job myStatefunApp (00000000000000000000000000000000).
2021-12-11 14:25:26,599 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Successfully ran initialization on master in 0 ms.
2021-12-11 14:25:26,617 INFO org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 pipelined regions in 1 ms 2021-12-11 14:25:26,626 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using job/cluster config to configure application-defined state backend: EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1, writeBatchSize=2097152} 2021-12-11 14:25:26,627 INFO org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using predefined options: DEFAULT. 2021-12-11 14:25:26,627 INFO org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using application-defined options factory: DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=1}}. 2021-12-11 14:25:26,627 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using application-defined state backend: EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1, writeBatchSize=2097152} 2021-12-11 14:25:26,631 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Checkpoint storage is set to 'filesystem': (checkpoints "hdfs://hdfs-namenode:8020/tmp/statefun_checkpoints/myStatefunApp") 2021-12-11 14:25:26,712 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Recovering checkpoints from ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}. 2021-12-11 14:25:26,724 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Found 1 checkpoints in ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}. 2021-12-11 14:25:26,725 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying to fetch 1 checkpoints from storage. 
2021-12-11 14:25:26,725 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying to retrieve checkpoint 2. 2021-12-11 14:25:26,931 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job 00000000000000000000000000000000 from Checkpoint 2 @ 1639232587220 for 00000000000000000000000000000000 located at hdfs://hdfs-namenode:8020/tmp/statefun_checkpoints/myStatefunApp/00000000000000000000000000000000/chk-2. 2021-12-11 14:25:27,012 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint. org.apache.flink.util.FlinkException: JobMaster for job 00000000000000000000000000000000 failed. at org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:873) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:459) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:436) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$3(Dispatcher.java:415) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at java.util.concurrent.CompletableFuture.uniHandle(Unknown Source) ~[?:?] at java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source) ~[?:?] at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) ~[?:?] 
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.13.2.jar:1.13.2] at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.13.2.jar:1.13.2] at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.13.2.jar:1.13.2] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.13.2.jar:1.13.2] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.13.2.jar:1.13.2] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.13.2.jar:1.13.2] at 
akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.13.2.jar:1.13.2] Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster. at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?] at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?] at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?] at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) ~[?:?] at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?] at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[?:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?] at java.lang.Thread.run(Unknown Source) ~[?:?] Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: There is no operator for the state 18666b435c78ee2416e74bb997b798a7 at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?] at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?] at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) ~[?:?] 
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?] at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[?:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?] at java.lang.Thread.run(Unknown Source) ~[?:?] Caused by: java.lang.IllegalStateException: There is no operator for the state 18666b435c78ee2416e74bb997b798a7 at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:712) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:100) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1562) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1476) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:134) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:122) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at 
org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) ~[?:?] at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?] at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[?:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?] at java.lang.Thread.run(Unknown Source) ~[?:?] 2021-12-11 14:25:27,017 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting StatefulFunctionsClusterEntryPoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally.. 2021-12-11 14:25:27,021 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting down rest endpoint. 
2021-12-11 14:25:27,025 INFO org.apache.flink.runtime.blob.BlobServer [] - Stopped BLOB server at 0.0.0.0:6124 2021-12-11 14:25:27,034 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Removing cache directory /tmp/flink-web-6c2dafc9-bb7d-489a-9e2d-cf78e3f19b67/flink-web-ui 2021-12-11 14:25:27,035 INFO org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Stopping DefaultLeaderElectionService. 2021-12-11 14:25:27,035 INFO org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/rest_server_lock'} 2021-12-11 14:25:27,036 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shut down complete. 2021-12-11 14:25:27,036 INFO org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent [] - Closing components. 2021-12-11 14:25:27,037 INFO org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Stopping DefaultLeaderRetrievalService. 2021-12-11 14:25:27,037 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - Closing ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/dispatcher_lock'}. 2021-12-11 14:25:27,037 INFO org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Stopping DefaultLeaderRetrievalService. 2021-12-11 14:25:27,037 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - Closing ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/resource_manager_lock'}. 2021-12-11 14:25:27,038 INFO org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Stopping DefaultLeaderElectionService. 
2021-12-11 14:25:27,038 INFO org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/dispatcher_lock'}
2021-12-11 14:25:27,039 INFO org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcess [] - Stopping JobDispatcherLeaderProcess.
2021-12-11 14:25:27,040 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Closing the slot manager.
2021-12-11 14:25:27,040 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Suspending the slot manager.
2021-12-11 14:25:27,041 INFO org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Stopping DefaultLeaderElectionService.
2021-12-11 14:25:27,041 INFO org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/resource_manager_lock'}
{code}
However, across several restarts, the jobmanager occasionally does manage to restore the job from the last checkpoint. After changing Flink's log level to DEBUG, I was able to capture the difference between an unsuccessful sequence of events (resulting in the log above) and a successful one.
It seems that operators can be assigned different hashes between restarts. Here is the relevant log section for the unsuccessful assignment (I renamed my operators for clarity):
{code:java}
2021-12-11 21:55:14,001 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated hash '32d5ca33c915e65563a5c7f4d62703ad' for node 'router (my-ingress-1-in)-5' {id: 5, parallelism: 1, user function: }
2021-12-11 21:55:14,001 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated hash '33b86fe798648d648b237ddfc986200d' for node 'router (my-ingress-2-in)-4' {id: 4, parallelism: 1, user function: }
2021-12-11 21:55:14,001 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated hash 'bd4c3fa1570bbcf606f2dabddd61ed7f' for node 'router (my-ingress-3-in)-6' {id: 6, parallelism: 1, user function: }
{code}
... and here is the same log section for the successful assignment:
{code:java}
2021-12-11 21:55:34,543 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated hash 'a1448ecf31ac98d2215c38bfd119abe0' for node 'router (my-ingress-3-in)-5' {id: 5, parallelism: 1, user function: }
2021-12-11 21:55:34,543 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated hash '05037ff96baea131d9cf1390846efd98' for node 'router (my-ingress-1-in)-4' {id: 4, parallelism: 1, user function: }
2021-12-11 21:55:34,543 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated hash '18666b435c78ee2416e74bb997b798a7' for node 'router (my-ingress-2-in)-6' {id: 6, parallelism: 1, user function: }
{code}
As you can see, in the successful run the hash "18666b435c78ee2416e74bb997b798a7" is generated, so the jobmanager can match an operator to the state loaded from the checkpoint and continue normal operation. Also note that the router operators are assigned different ids between the two runs.
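To illustrate what I suspect is going on, here is a minimal, self-contained sketch. It is not Flink's actual StreamGraphHasherV2 logic: the class {{OperatorHashSketch}} and its methods are hypothetical, and the sketch simply assumes that an operator's hash depends on the position at which the operator is visited. The two registration orders are taken from the node ids in the logs above (ids 4, 5, 6).
{code:java}
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OperatorHashSketch {

    // Hypothetical stand-in for a position-dependent hasher (an assumption, not Flink code):
    // the hash depends only on how many operators were hashed before this one.
    static String positionHash(int position) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(Integer.toString(position).getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 should be available on every Java platform", e);
        }
    }

    // Assigns a hash to each operator in the order in which the operators were registered.
    static Map<String, String> assignHashes(List<String> registrationOrder) {
        Map<String, String> hashes = new LinkedHashMap<>();
        for (String name : registrationOrder) {
            hashes.put(name, positionHash(hashes.size()));
        }
        return hashes;
    }

    // Same as assignHashes, but sorts the names first, so the result
    // no longer depends on the registration order.
    static Map<String, String> assignHashesSorted(List<String> registrationOrder) {
        List<String> sorted = new ArrayList<>(registrationOrder);
        Collections.sort(sorted);
        return assignHashes(sorted);
    }

    public static void main(String[] args) {
        // Order of the routers by node id in the unsuccessful run.
        List<String> failedRun = List.of(
                "router (my-ingress-2-in)", "router (my-ingress-1-in)", "router (my-ingress-3-in)");
        // Order of the routers by node id in the successful run.
        List<String> successfulRun = List.of(
                "router (my-ingress-1-in)", "router (my-ingress-3-in)", "router (my-ingress-2-in)");

        String router = "router (my-ingress-2-in)";
        System.out.println("same hash, registration order: "
                + assignHashes(failedRun).get(router).equals(assignHashes(successfulRun).get(router)));
        System.out.println("same hash, sorted order:       "
                + assignHashesSorted(failedRun).get(router).equals(assignHashesSorted(successfulRun).get(router)));
    }
}
{code}
In this sketch the same router gets a different hash whenever the registration order changes, which would be enough for the state in the checkpoint to have no matching operator after a restart. Whether this is really what happens inside StateFun is an assumption on my side.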
I took a look at StreamGraphHasherV2 code ([link|#L65]) there is an explicit attempt to have the operator order the same between different runs, however my Stateful Functions application seems to be able to avoid that attempt. Since we can't assign operator ids when using Stateful Functions, is there anything I can do right to get it working correctly? Is this a bug, or am I trying it with a wrong combination of settings or something like that? As a last note, I've also posted the same earlier to Stack Overflow, here is the [link|https://stackoverflow.com/questions/70316498/flink-statefun-high-availability-exception-java-lang-illegalstateexception-th] to the question. Thanks was: My Stateful Functions job is running on Kubernetes (minikube on my local env) and has these settings: * Using StateFun v3.1.0 * Checkpoints are stored on HDFS (state.checkpoint-storage: filesystem) * Checkpointing mode is EXACTLY_ONCE * State backend is rocksdb and incremental checkpointing is enabled When I kill the jobmanager (master) pod, minikube starts another pod and this new pod fails when it tries to load last checkpoint: {code:java} ... 2021-12-11 14:25:26,426 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing job myStatefunApp (00000000000000000000000000000000). 2021-12-11 14:25:26,443 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using restart back off time strategy FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, backoffTimeMS=1000) for myStatefunApp (00000000000000000000000000000000). 2021-12-11 14:25:26,516 INFO org.apache.flink.runtime.util.ZooKeeperUtils [] - Initialized DefaultCompletedCheckpointStore in 'ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}' with /checkpoints/00000000000000000000000000000000. 
2021-12-11 14:25:26,599 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Running initialization on master for job myStatefunApp (00000000000000000000000000000000). 2021-12-11 14:25:26,599 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Successfully ran initialization on master in 0 ms. 2021-12-11 14:25:26,617 INFO org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 pipelined regions in 1 ms 2021-12-11 14:25:26,626 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using job/cluster config to configure application-defined state backend: EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1, writeBatchSize=2097152} 2021-12-11 14:25:26,627 INFO org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using predefined options: DEFAULT. 2021-12-11 14:25:26,627 INFO org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using application-defined options factory: DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=1}}. 2021-12-11 14:25:26,627 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using application-defined state backend: EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1, writeBatchSize=2097152} 2021-12-11 14:25:26,631 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Checkpoint storage is set to 'filesystem': (checkpoints "hdfs://hdfs-namenode:8020/tmp/statefun_checkpoints/myStatefunApp") 2021-12-11 14:25:26,712 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Recovering checkpoints from ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}. 
2021-12-11 14:25:26,724 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Found 1 checkpoints in ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}. 2021-12-11 14:25:26,725 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying to fetch 1 checkpoints from storage. 2021-12-11 14:25:26,725 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying to retrieve checkpoint 2. 2021-12-11 14:25:26,931 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job 00000000000000000000000000000000 from Checkpoint 2 @ 1639232587220 for 00000000000000000000000000000000 located at hdfs://hdfs-namenode:8020/tmp/statefun_checkpoints/myStatefunApp/00000000000000000000000000000000/chk-2. 2021-12-11 14:25:27,012 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint. org.apache.flink.util.FlinkException: JobMaster for job 00000000000000000000000000000000 failed. at org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:873) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:459) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:436) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$3(Dispatcher.java:415) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at java.util.concurrent.CompletableFuture.uniHandle(Unknown Source) ~[?:?] at java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source) ~[?:?] at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) ~[?:?] 
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.13.2.jar:1.13.2] at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.13.2.jar:1.13.2] at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.13.2.jar:1.13.2] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.13.2.jar:1.13.2] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.13.2.jar:1.13.2] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.13.2.jar:1.13.2] at 
akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.13.2.jar:1.13.2] Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster. at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?] at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?] at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?] at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) ~[?:?] at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?] at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[?:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?] at java.lang.Thread.run(Unknown Source) ~[?:?] Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: There is no operator for the state 18666b435c78ee2416e74bb997b798a7 at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?] at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?] at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) ~[?:?] 
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?] at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[?:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?] at java.lang.Thread.run(Unknown Source) ~[?:?] Caused by: java.lang.IllegalStateException: There is no operator for the state 18666b435c78ee2416e74bb997b798a7 at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:712) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:100) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1562) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1476) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:134) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:122) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at 
org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) ~[?:?] at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?] at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[?:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?] at java.lang.Thread.run(Unknown Source) ~[?:?] 2021-12-11 14:25:27,017 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting StatefulFunctionsClusterEntryPoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally.. 2021-12-11 14:25:27,021 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting down rest endpoint. 
2021-12-11 14:25:27,025 INFO org.apache.flink.runtime.blob.BlobServer [] - Stopped BLOB server at 0.0.0.0:6124 2021-12-11 14:25:27,034 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Removing cache directory /tmp/flink-web-6c2dafc9-bb7d-489a-9e2d-cf78e3f19b67/flink-web-ui 2021-12-11 14:25:27,035 INFO org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Stopping DefaultLeaderElectionService. 2021-12-11 14:25:27,035 INFO org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/rest_server_lock'} 2021-12-11 14:25:27,036 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shut down complete. 2021-12-11 14:25:27,036 INFO org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent [] - Closing components. 2021-12-11 14:25:27,037 INFO org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Stopping DefaultLeaderRetrievalService. 2021-12-11 14:25:27,037 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - Closing ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/dispatcher_lock'}. 2021-12-11 14:25:27,037 INFO org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Stopping DefaultLeaderRetrievalService. 2021-12-11 14:25:27,037 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - Closing ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/resource_manager_lock'}. 2021-12-11 14:25:27,038 INFO org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Stopping DefaultLeaderElectionService. 
2021-12-11 14:25:27,038 INFO org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/dispatcher_lock'}
2021-12-11 14:25:27,039 INFO org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcess [] - Stopping JobDispatcherLeaderProcess.
2021-12-11 14:25:27,040 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Closing the slot manager.
2021-12-11 14:25:27,040 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Suspending the slot manager.
2021-12-11 14:25:27,041 INFO org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Stopping DefaultLeaderElectionService.
2021-12-11 14:25:27,041 INFO org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/resource_manager_lock'}
{code}

However, over several restarts, the jobmanager sometimes does restore the job from the last checkpoint. After changing Flink's log level to DEBUG, I managed to capture the difference between an unsuccessful sequence of events (resulting in the log above) and a successful one.

It seems that operators can be assigned different hashes between restarts. Here is the relevant log section for the unsuccessful assignment (I renamed my operators for clarity):

{code:java}
2021-12-11 21:55:14,001 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated hash '32d5ca33c915e65563a5c7f4d62703ad' for node 'router (my-ingress-1-in)-5' {id: 5, parallelism: 1, user function: }
2021-12-11 21:55:14,001 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated hash '33b86fe798648d648b237ddfc986200d' for node 'router (my-ingress-2-in)-4' {id: 4, parallelism: 1, user function: }
2021-12-11 21:55:14,001 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated hash 'bd4c3fa1570bbcf606f2dabddd61ed7f' for node 'router (my-ingress-3-in)-6' {id: 6, parallelism: 1, user function: }
{code}

... and here is the same log section for the successful assignment:

{code:java}
2021-12-11 21:55:34,543 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated hash 'a1448ecf31ac98d2215c38bfd119abe0' for node 'router (my-ingress-3-in)-5' {id: 5, parallelism: 1, user function: }
2021-12-11 21:55:34,543 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated hash '05037ff96baea131d9cf1390846efd98' for node 'router (my-ingress-1-in)-4' {id: 4, parallelism: 1, user function: }
2021-12-11 21:55:34,543 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated hash '18666b435c78ee2416e74bb997b798a7' for node 'router (my-ingress-2-in)-6' {id: 6, parallelism: 1, user function: }
{code}

As you can see, in the successful run the hash "18666b435c78ee2416e74bb997b798a7" is generated, so the jobmanager can match an operator to the state loaded from the checkpoint and continue normal operation. Also note that the router operators have different ids assigned between the two runs.
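
To make the mismatch easier to spot across many restarts, the comparison above can be automated. The following is only a rough sketch: `HashLogDiff` is a hypothetical standalone helper (it is not part of Flink or StateFun), and it assumes the DEBUG message format is exactly the one shown in the StreamGraphHasherV2 excerpts above. It maps each node name (without the trailing "-<id>") to its hash and id, then lists the nodes whose values differ between two runs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not a Flink API): diffs StreamGraphHasherV2 DEBUG output of two runs. */
class HashLogDiff {

    // Assumed message format, taken from the log excerpts above:
    // Generated hash '<hash>' for node '<name>-<id>' {id: <id>, ...
    private static final Pattern LINE = Pattern.compile(
            "Generated hash '([0-9a-f]+)' for node '(.+)-(\\d+)' \\{id: (\\d+),");

    /** Maps node name (without the "-<id>" suffix) to "<hash> (id <id>)". */
    static Map<String, String> parse(List<String> logLines) {
        Map<String, String> byName = new LinkedHashMap<>();
        for (String line : logLines) {
            Matcher m = LINE.matcher(line);
            if (m.find()) {
                byName.put(m.group(2), m.group(1) + " (id " + m.group(4) + ")");
            }
        }
        return byName;
    }

    /** Returns one entry per node whose hash or id differs between the two runs. */
    static List<String> diff(Map<String, String> runA, Map<String, String> runB) {
        Set<String> names = new TreeSet<>(runA.keySet());
        names.addAll(runB.keySet());
        List<String> changed = new ArrayList<>();
        for (String name : names) {
            String a = runA.get(name);
            String b = runB.get(name);
            if (!Objects.equals(a, b)) {
                changed.add(name + ": " + a + " -> " + b);
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        // Messages copied from the two log excerpts above (timestamps omitted).
        List<String> failedRun = Arrays.asList(
                "Generated hash '32d5ca33c915e65563a5c7f4d62703ad' for node 'router (my-ingress-1-in)-5' {id: 5, parallelism: 1, user function: }",
                "Generated hash '33b86fe798648d648b237ddfc986200d' for node 'router (my-ingress-2-in)-4' {id: 4, parallelism: 1, user function: }",
                "Generated hash 'bd4c3fa1570bbcf606f2dabddd61ed7f' for node 'router (my-ingress-3-in)-6' {id: 6, parallelism: 1, user function: }");
        List<String> successfulRun = Arrays.asList(
                "Generated hash 'a1448ecf31ac98d2215c38bfd119abe0' for node 'router (my-ingress-3-in)-5' {id: 5, parallelism: 1, user function: }",
                "Generated hash '05037ff96baea131d9cf1390846efd98' for node 'router (my-ingress-1-in)-4' {id: 4, parallelism: 1, user function: }",
                "Generated hash '18666b435c78ee2416e74bb997b798a7' for node 'router (my-ingress-2-in)-6' {id: 6, parallelism: 1, user function: }");
        for (String entry : diff(parse(failedRun), parse(successfulRun))) {
            System.out.println(entry);
        }
    }
}
```

For the two excerpts above, all three router nodes are reported, since each ingress gets both a different node id and a different hash in the second run.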
I took a look at the StreamGraphHasherV2 code ([link|#L65]); there is an explicit attempt to keep the operator order the same between different attempts, but my Stateful Functions application somehow seems to bypass it. Since we can't assign operator ids when using Stateful Functions, is there anything I can do to get recovery working correctly? Is this a bug, or am I using a wrong combination of settings?

As a last note, I also posted this question earlier on Stack Overflow; here is the [link|https://stackoverflow.com/questions/70316498/flink-statefun-high-availability-exception-java-lang-illegalstateexception-th] to the question.

Thanks

> Unable to (always) recover using checkpoint in HA setup (both Zookeeper and Kubernetes)
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-25267
>                 URL: https://issues.apache.org/jira/browse/FLINK-25267
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / Kubernetes, Stateful Functions
>    Affects Versions: 1.12.1, statefun-3.0.0, statefun-3.1.0, 1.13.2
>         Environment: MacOS 11.6, minikube v1.23.2, tried with both Stateful Functions 3.0.0 and Stateful Functions 3.1.0
>            Reporter: Kerem Ulutaş
>            Priority: Major

--
This message was sent by Atlassian Jira
(v8.20.1#820001)