[ https://issues.apache.org/jira/browse/FLINK-8902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Gary Yao updated FLINK-8902:
----------------------------
    Labels: flip6  (was: )

> Re-scaling job sporadically fails with KeeperException
> ------------------------------------------------------
>
> Key: FLINK-8902
> URL: https://issues.apache.org/jira/browse/FLINK-8902
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination
> Affects Versions: 1.5.0, 1.6.0
> Environment: Commit: 80020cb
> Hadoop: 2.8.3
> YARN
>
> Reporter: Gary Yao
> Priority: Blocker
> Labels: flip6
> Fix For: 1.5.0
>
>
> *Description*
> Re-scaling a job with {{bin/flink modify -p <new_parallelism>}} sporadically fails with a {{KeeperException}}
> *Steps to reproduce*
> # Submit job to Flink cluster running on YARN (session mode).
> # Re-scale job (5-20 times)
> *Stacktrace (client)*
> {noformat}
> org.apache.flink.util.FlinkException: Could not rescale job 61e2e99db2e959ebd94e40f9c5e816bc.
>     at org.apache.flink.client.cli.CliFrontend.lambda$modify$8(CliFrontend.java:766)
>     at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:954)
>     at org.apache.flink.client.cli.CliFrontend.modify(CliFrontend.java:757)
>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1037)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1095)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>     at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1095)
> Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmaster.exceptions.JobModificationException: Could not restore from temporary rescaling savepoint. This might indicate that the savepoint hdfs://172.31.33.72:9000/flinkha/savepoints/savepoint-61e2e9-fdb3d05a0035 got corrupted. Deleting this savepoint as a precaution.
>     at org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$3(JobMaster.java:525)
>     at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>     at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:295)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:150)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
>     at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.jobmaster.exceptions.JobModificationException: Could not restore from temporary rescaling savepoint. This might indicate that the savepoint hdfs://172.31.33.72:9000/flinkha/savepoints/savepoint-61e2e9-fdb3d05a0035 got corrupted. Deleting this savepoint as a precaution.
>     at org.apache.flink.runtime.jobmaster.JobMaster.lambda$restoreExecutionGraphFromRescalingSavepoint$17(JobMaster.java:1317)
>     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>     at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.ConcurrentModificationException: ZooKeeper unexpectedly modified
>     at org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.addAndLock(ZooKeeperStateHandleStore.java:168)
>     at org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.addCheckpoint(ZooKeeperCompletedCheckpointStore.java:233)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1088)
>     at org.apache.flink.runtime.jobmaster.JobMaster.tryRestoreExecutionGraphFromSavepoint(JobMaster.java:1161)
>     at org.apache.flink.runtime.jobmaster.JobMaster.lambda$restoreExecutionGraphFromRescalingSavepoint$17(JobMaster.java:1297)
>     ... 10 more
> Caused by: org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists
>     at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException.create(KeeperException.java:119)
>     at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper.multiInternal(ZooKeeper.java:1006)
>     at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper.multi(ZooKeeper.java:910)
>     at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorTransactionImpl.doOperation(CuratorTransactionImpl.java:159)
>     at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorTransactionImpl.access$200(CuratorTransactionImpl.java:44)
>     at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorTransactionImpl$2.call(CuratorTransactionImpl.java:129)
>     at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorTransactionImpl$2.call(CuratorTransactionImpl.java:125)
>     at org.apache.flink.shaded.curator.org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:109)
>     at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorTransactionImpl.commit(CuratorTransactionImpl.java:122)
>     at org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.addAndLock(ZooKeeperStateHandleStore.java:162)
>     ... 14 more
> {noformat}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)