Ooooh, more fun... If I scale a job down, the job's config at
jobs/{jobid}/config does not reflect the new parallelism (there may not
even be any way to detect such a parallelism change)... but more critically,
the job is now unstoppable: it ends up stuck in the CANCELLING
state for some time (I gave up waiting).
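
For anyone wanting to automate the "did it actually stop?" check rather than
watching by hand, here is a minimal sketch. It assumes that GET /jobs/{jobid}
returns JSON with a "state" field and that FINISHED, CANCELED and FAILED are
the terminal states; the function names, timeout and poll interval are
hypothetical and only for illustration.

```python
import json
import time
import urllib.request

# Assumption: these are the states after which the job will not change again.
TERMINAL_STATES = {"FINISHED", "CANCELED", "FAILED"}


def fetch_job_state(base_url, job_id):
    """Return the "state" field from GET <base_url>/jobs/<job_id> (assumed response shape)."""
    with urllib.request.urlopen(f"{base_url}/jobs/{job_id}", timeout=10) as resp:
        return json.load(resp)["state"]


def wait_for_terminal_state(get_state, timeout_s=60.0, poll_s=2.0,
                            sleep=time.sleep, clock=time.monotonic):
    """Poll get_state() until a terminal state is seen or timeout_s elapses.

    Returns (last_state, reached_terminal). A result such as
    ("CANCELLING", False) means the job looked stuck for the whole window.
    """
    deadline = clock() + timeout_s
    state = get_state()
    while state not in TERMINAL_STATES and clock() < deadline:
        sleep(poll_s)
        state = get_state()
    return state, state in TERMINAL_STATES
```

Against the local cluster from this thread, the call would look something like
wait_for_terminal_state(lambda: fetch_job_state("http://localhost:8081",
"ebc20a700c334f61ea03ecdf3d8939ca")). The state lookup is passed in as a
callable so the timeout logic can be exercised without a running cluster.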

On Fri, 7 Feb 2020 at 11:54, Stephen Connolly <
stephen.alan.conno...@gmail.com> wrote:

> And now the job is stuck in a suspended state and I seem to have no way to
> get it out of that state again!
>
> On Fri, 7 Feb 2020 at 11:50, Stephen Connolly <
> stephen.alan.conno...@gmail.com> wrote:
>
>> The plot thickens... I was able to rescale down... just not back up
>> again!!!
>>
>> root@flink-jobmanager-64bcfdf799-jhs7p:/opt/flink# bin/flink list -m localhost:8081
>> Waiting for response...
>> ------------------ Running/Restarting Jobs -------------------
>> 07.02.2020 11:26:33 : ebc20a700c334f61ea03ecdf3d8939ca : Test topology (RUNNING)
>> --------------------------------------------------------------
>> No scheduled jobs.
>> root@flink-jobmanager-64bcfdf799-jhs7p:/opt/flink# bin/flink modify -m localhost:8081 ebc20a700c334f61ea03ecdf3d8939ca -p 1
>> Modify job ebc20a700c334f61ea03ecdf3d8939ca.
>> Rescaled job ebc20a700c334f61ea03ecdf3d8939ca. Its new parallelism is 1.
>> root@flink-jobmanager-64bcfdf799-jhs7p:/opt/flink# bin/flink modify -m localhost:8081 ebc20a700c334f61ea03ecdf3d8939ca -p 2
>> Modify job ebc20a700c334f61ea03ecdf3d8939ca.
>>
>> ------------------------------------------------------------
>>  The program finished with the following exception:
>>
>> org.apache.flink.util.FlinkException: Could not rescale job ebc20a700c334f61ea03ecdf3d8939ca.
>>     at org.apache.flink.client.cli.CliFrontend.lambda$modify$10(CliFrontend.java:799)
>>     at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985)
>>     at org.apache.flink.client.cli.CliFrontend.modify(CliFrontend.java:790)
>>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1068)
>>     at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>>     at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
>> Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Suspend needs to happen atomically
>>     at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>>     at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>>     at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:975)
>>     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
>>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>     at akka.actor.Actor.aroundReceive(Actor.scala:502)
>>     at akka.actor.Actor.aroundReceive$(Actor.scala:500)
>>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>     at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>>     at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>>     at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>>     at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
>> Caused by: java.lang.IllegalStateException: Suspend needs to happen atomically
>>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.suspend(ExecutionGraph.java:1172)
>>     at org.apache.flink.runtime.jobmaster.JobMaster.suspendExecutionGraph(JobMaster.java:1221)
>>     at org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$5(JobMaster.java:465)
>>     at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
>>     ... 20 more
>>
>> On Fri, 7 Feb 2020 at 11:40, Stephen Connolly <
>> stephen.alan.conno...@gmail.com> wrote:
>>
>>> So I am looking at the Flink Management REST API... and, as I see it,
>>> there are two paths to rescale a running topology:
>>>
>>> 1. Stop the topology with a savepoint and then start it up with the new
>>> savepoint; or
>>> 2. Use the /jobs/:jobid/rescaling
>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/rest_api.html#jobs-jobid-rescaling>
>>> endpoint
>>>
>>> The first one seems to work just fine.
>>>
>>> The second one seems to just blow up every time I try to use it... I'll
>>> get things like:
>>>
>>>
>>> https://gist.github.com/stephenc/0bbc08391ddce5a781242900e4b33a5d#file-log-txt
>>>
>>> The above was for the topology
>>> https://gist.github.com/stephenc/0bbc08391ddce5a781242900e4b33a5d#file-streamingjob-java
>>> running with options:
>>>
>>>     --source parallel
>>>
>>> Things are even worse with --source iterator as that has no checkpoint
>>> state to recover from
>>>
>>> Right now I am trying to discover what preconditions are required to be
>>> met in order to be able to safely call the Rescaling endpoint and actually
>>> have it work... I should note that I currently have not managed to get it
>>> to work at all!!!
>>>
>>> One of the things we are trying to do is add some automation to enable
>>> scale-up / down as we see surges in processing load. We want to have an
>>> automated system that can respond to those situations automatically for low
>>> deltas and trigger an on-call engineer for persistent excess load. In that
>>> regard I'd like to know what the automation should check to know whether it
>>> can do rescaling via the dedicated end-point or if it should use the
>>> reliable (but presumably slower) path of stop with savepoint & start from
>>> savepoint.
>>>
>>> The
>>> https://gist.github.com/stephenc/0bbc08391ddce5a781242900e4b33a5d#file-streamingjob-java
>>> job I have been using is just a quick job to let me test the automation on
>>> a local cluster. It is designed to output a strictly increasing sequence of
>>> numbers without missing any... optionally double them and then print them
>>> out. The different sources are me experimenting with different types of
>>> operator to see what kinds of topology can work with the rescaling end-point
>>>
>>> Thanks in advance
>>>
>>
