Hi Devin,

If I understand you correctly, you are submitting a job in the YARN per-job
cluster mode. You are then invoking the "cancel with savepoint" command but
the client is not able to poll for the savepoint location before the cluster
shuts down.

I think your analysis is correct. As far as I can see, we do not wait for
the
poll to happen before we shut down the cluster. In the session mode this is
not a problem because the cluster will continue to run. Can you open a JIRA
issue?

Best,
Gary


On Fri, Sep 7, 2018 at 5:46 PM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Vino and Devin,
>
> could you maybe send us the cluster entrypoint and client logs once you
> observe the exception? That way it will be possible to debug it.
>
> Cheers,
> Till
>
> On Tue, Sep 4, 2018 at 2:26 PM vino yang <yanghua1...@gmail.com> wrote:
>
> > Hi Devin,
> >
> > Why do you trigger cancel with savepoint immediately after the job status
> > changes to Deployed? A more secure way is to wait for the job to become
> > running after it has been running for a while before triggering.
> >
> > We have also encountered before, there will be a case where the client
> > times out or still tries to connect to the closed JM after RestClient
> calls
> > cancel with savepoint.
> >
> > Thanks, vino.
> >
> > devinduan(段丁瑞) <devind...@tencent.com> 于2018年9月4日周二 下午6:22写道:
> >
> > > Hi all,
> > >       I submit a flink job through yarn-cluster mode and cancel job
> with
> > > savepoint option immediately after job status change to deployed.
> > Sometimes
> > > i met this error:
> > >
> > > org.apache.flink.util.FlinkException: Could not cancel job xxxx.
> > >         at
> > >
> > org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(
> CliFrontend.java:585)
> > >         at
> > >
> > org.apache.flink.client.cli.CliFrontend.runClusterAction(
> CliFrontend.java:960)
> > >         at
> > > org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:577)
> > >         at
> > >
> > org.apache.flink.client.cli.CliFrontend.parseParameters(
> CliFrontend.java:1034)
> > >         at java.lang.Thread.run(Thread.java:748)
> > > Caused by: java.util.concurrent.ExecutionException:
> > > org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could
> not
> > > complete the operation. Number of retries has been exhausted.
> > >         at
> > >
> > java.util.concurrent.CompletableFuture.reportGet(
> CompletableFuture.java:357)
> > >         at
> > > java.util.concurrent.CompletableFuture.get(
> CompletableFuture.java:1895)
> > >         at
> > >
> > org.apache.flink.client.program.rest.RestClusterClient.
> cancelWithSavepoint(RestClusterClient.java:398)
> > >         at
> > >
> > org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(
> CliFrontend.java:583)
> > >         ... 6 more
> > > Caused by:
> > org.apache.flink.runtime.concurrent.FutureUtils$RetryException:
> > > Could not complete the operation. Number of retries has been exhausted.
> > >         at
> > >
> > org.apache.flink.runtime.concurrent.FutureUtils.lambda$
> retryOperationWithDelay$5(FutureUtils.java:213)
> > >         at
> > >
> > java.util.concurrent.CompletableFuture.uniWhenComplete(
> CompletableFuture.java:760)
> > >         at
> > >
> > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(
> CompletableFuture.java:736)
> > >         at
> > >
> > java.util.concurrent.CompletableFuture.postComplete(
> CompletableFuture.java:474)
> > >         at
> > >
> > java.util.concurrent.CompletableFuture.completeExceptionally(
> CompletableFuture.java:1977)
> > >         at
> > >
> > org.apache.flink.runtime.rest.RestClient.lambda$
> submitRequest$1(RestClient.java:274)
> > >         at
> > >
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.
> notifyListener0(DefaultPromise.java:680)
> > >         at
> > >
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.
> notifyListeners0(DefaultPromise.java:603)
> > >         at
> > >
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.
> notifyListeners(DefaultPromise.java:563)
> > >         at
> > >
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.
> tryFailure(DefaultPromise.java:424)
> > >         at
> > >
> > org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$
> AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268)
> > >         at
> > >
> > org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$
> AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284)
> > >         at
> > >
> > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.
> processSelectedKey(NioEventLoop.java:528)
> > >         at
> > >
> > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.
> processSelectedKeysOptimized(NioEventLoop.java:468)
> > >         at
> > >
> > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.
> processSelectedKeys(NioEventLoop.java:382)
> > >         at
> > >
> > org.apache.flink.shaded.netty4.io.netty.channel.nio.
> NioEventLoop.run(NioEventLoop.java:354)
> > >         at
> > >
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.
> SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> > >         at
> > >
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.
> DefaultThreadFactory$DefaultRunnableDecorator.run(
> DefaultThreadFactory.java:137)
> > >         ... 1 more
> > > Caused by: java.util.concurrent.CompletionException:
> > > java.net.ConnectException: Connect refuse: xxx/xxx.xxx.xxx.xxx:xxx
> > >         at
> > >
> > java.util.concurrent.CompletableFuture.encodeThrowable(
> CompletableFuture.java:292)
> > >         at
> > >
> > java.util.concurrent.CompletableFuture.completeThrowable(
> CompletableFuture.java:308)
> > >         at
> > >
> > java.util.concurrent.CompletableFuture.uniCompose(
> CompletableFuture.java:943)
> > >         at
> > >
> > java.util.concurrent.CompletableFuture$UniCompose.
> tryFire(CompletableFuture.java:926)
> > >         ... 16 more
> > > Caused by: java.net.ConnectException: Connect refuse:
> > > xxx/xxx.xxx.xxx.xxx:xxx
> > >         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> > >         at
> > > sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> > >         at
> > >
> > org.apache.flink.shaded.netty4.io.netty.channel.
> socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
> > >         at
> > >
> > org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$
> AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:281)
> > >         ... 7 more
> > >
> > >     I check the jobmanager log, no error found. Savepoint is correct
> > saved
> > > in hdfs. Yarn appliction status changed to FINISHED and FinalStatus
> > change
> > > to KILLED.
> > >     I think this issue occur because RestClusterClient cannot find
> > > jobmanager addresss after Jobmanager(AM) has shutdown.
> > >     My flink version is 1.5.3.
> > >     Anyone could help me to resolve this issue, thanks!
> > >
> > > devin.
> > >
> > >
> > >
> > > ________________________________
> > >
> > >
> >
>

Reply via email to