Gary Yao created FLINK-8902:
-------------------------------

             Summary: Re-scaling job sporadically fails with KeeperException
                 Key: FLINK-8902
                 URL: https://issues.apache.org/jira/browse/FLINK-8902
             Project: Flink
          Issue Type: Bug
          Components: Distributed Coordination
    Affects Versions: 1.5.0, 1.6.0
         Environment: Commit: 80020cb
Hadoop: 2.8.3
YARN
            Reporter: Gary Yao
             Fix For: 1.5.0


*Description*
Re-scaling a job with {{bin/flink modify -p <new_parallelism>}} sporadically fails with a {{KeeperException}}

*Steps to reproduce*
# Submit job to Flink cluster running on YARN (session mode).
# Re-scale job (5-20 times)

*Stacktrace (client)*
{noformat}
org.apache.flink.util.FlinkException: Could not rescale job 61e2e99db2e959ebd94e40f9c5e816bc.
	at org.apache.flink.client.cli.CliFrontend.lambda$modify$8(CliFrontend.java:766)
	at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:954)
	at org.apache.flink.client.cli.CliFrontend.modify(CliFrontend.java:757)
	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1037)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1095)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1095)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmaster.exceptions.JobModificationException: Could not restore from temporary rescaling savepoint. This might indicate that the savepoint hdfs://172.31.33.72:9000/flinkha/savepoints/savepoint-61e2e9-fdb3d05a0035 got corrupted. Deleting this savepoint as a precaution.
	at org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$3(JobMaster.java:525)
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
	at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:295)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:150)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
	at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.jobmaster.exceptions.JobModificationException: Could not restore from temporary rescaling savepoint. This might indicate that the savepoint hdfs://172.31.33.72:9000/flinkha/savepoints/savepoint-61e2e9-fdb3d05a0035 got corrupted. Deleting this savepoint as a precaution.
	at org.apache.flink.runtime.jobmaster.JobMaster.lambda$restoreExecutionGraphFromRescalingSavepoint$17(JobMaster.java:1317)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.ConcurrentModificationException: ZooKeeper unexpectedly modified
	at org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.addAndLock(ZooKeeperStateHandleStore.java:168)
	at org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.addCheckpoint(ZooKeeperCompletedCheckpointStore.java:233)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1088)
	at org.apache.flink.runtime.jobmaster.JobMaster.tryRestoreExecutionGraphFromSavepoint(JobMaster.java:1161)
	at org.apache.flink.runtime.jobmaster.JobMaster.lambda$restoreExecutionGraphFromRescalingSavepoint$17(JobMaster.java:1297)
	... 10 more
Caused by: org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists
	at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException.create(KeeperException.java:119)
	at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper.multiInternal(ZooKeeper.java:1006)
	at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper.multi(ZooKeeper.java:910)
	at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorTransactionImpl.doOperation(CuratorTransactionImpl.java:159)
	at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorTransactionImpl.access$200(CuratorTransactionImpl.java:44)
	at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorTransactionImpl$2.call(CuratorTransactionImpl.java:129)
	at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorTransactionImpl$2.call(CuratorTransactionImpl.java:125)
	at org.apache.flink.shaded.curator.org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:109)
	at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorTransactionImpl.commit(CuratorTransactionImpl.java:122)
	at org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.addAndLock(ZooKeeperStateHandleStore.java:162)
	... 14 more
{noformat}
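
Because the failure is sporadic, the second reproduction step may be easier to run as a loop. The following is only a sketch under stated assumptions: the function name {{rescale_repeatedly}}, the {{FLINK_BIN}} variable, and the alternation between parallelism 4 and 2 are hypothetical choices made for illustration, and the job ID is assumed to be passed as a positional argument before {{-p}} (the command quoted in the description omits it).

```shell
#!/bin/sh
# Sketch: re-scale a running job several times until the first failure.
# FLINK_BIN is a hypothetical override for the path to the Flink CLI.
FLINK_BIN="${FLINK_BIN:-bin/flink}"

# rescale_repeatedly <job_id> <attempts>
# Alternates between two parallelism values (assumed: 4 and 2) so that
# every call actually changes the parallelism of the job.
rescale_repeatedly() {
  job_id="$1"
  attempts="$2"
  i=1
  while [ "$i" -le "$attempts" ]; do
    if [ $((i % 2)) -eq 0 ]; then
      parallelism=2
    else
      parallelism=4
    fi
    echo "attempt $i: rescaling $job_id to parallelism $parallelism"
    # Assumption: the job ID is given positionally, followed by -p.
    if ! "$FLINK_BIN" modify "$job_id" -p "$parallelism"; then
      echo "rescale failed on attempt $i" >&2
      return 1
    fi
    i=$((i + 1))
  done
  return 0
}
```

A call such as {{rescale_repeatedly <job_id> 20}} covers the upper end of the 5-20 attempts mentioned above and stops at the first attempt for which the CLI returns a non-zero exit code.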