Thank you!

On Mon, Oct 28, 2019 at 3:53 AM vino yang <yanghua1...@gmail.com> wrote:

> Hi Pankaj,
>
> It seems to be a bug. You can report it by opening a Jira issue.
>
> Best,
> Vino
>
> Pankaj Chand <pankajchanda...@gmail.com> wrote on Mon, Oct 28, 2019 at 10:51 AM:
>
>> Hello,
>>
>> I am trying to modify the parallelism of a streaming Flink job
>> (wiki-edits example) multiple times on a standalone cluster (one local
>> machine) having two TaskManagers with 3 slots each (i.e. 6 slots total).
>> However, the "modify" command only works once (e.g. when I change the
>> parallelism from 2 to 4). The second time (e.g. changing the parallelism
>> to 6 or even back to 2), it gives an error.
>>
>> I am using Flink 1.8.1 (since I found that the modify parallelism command
>> has been removed from the v1.9 documentation) and have configured
>> savepoints to be written to file:///home/pankaj/flink-checkpoints. The
>> output of the first "modify <jobid> -p 4" command and the second
>> "modify <jobid> -p 6" command is copied below.
>>
>> Please tell me how to modify parallelism multiple times at runtime.
>>
>> Thanks,
>>
>> Pankaj
>>
>>
>> $ ./bin/flink modify 94831ca34951975dbee3335a384ee935 -p 4
>> Modify job 94831ca34951975dbee3335a384ee935.
>> Rescaled job 94831ca34951975dbee3335a384ee935. Its new parallelism is 4.
>>
>> $ ./bin/flink modify 94831ca34951975dbee3335a384ee935 -p 6
>> Modify job 94831ca34951975dbee3335a384ee935.
>>
>> ------------------------------------------------------------
>> The program finished with the following exception:
>>
>> org.apache.flink.util.FlinkException: Could not rescale job 94831ca34951975dbee3335a384ee935.
>>     at org.apache.flink.client.cli.CliFrontend.lambda$modify$10(CliFrontend.java:799)
>>     at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985)
>>     at org.apache.flink.client.cli.CliFrontend.modify(CliFrontend.java:790)
>>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1068)
>>     at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>>     at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
>> Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Suspend needs to happen atomically
>>     at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>>     at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>>     at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:961)
>>     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>     at akka.actor.Actor.aroundReceive(Actor.scala:502)
>>     at akka.actor.Actor.aroundReceive$(Actor.scala:500)
>>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>     at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>>     at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>>     at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>>     at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
>> Caused by: java.lang.IllegalStateException: Suspend needs to happen atomically
>>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.suspend(ExecutionGraph.java:1172)
>>     at org.apache.flink.runtime.jobmaster.JobMaster.suspendExecutionGraph(JobMaster.java:1221)
>>     at org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$5(JobMaster.java:465)
>>     at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
>>     ... 20 more