Hi Sen,

I took a look at your CLI logs again and saw that the CLI uses the "default" Flink namespace in ZooKeeper:

2019-02-28 11:18:05,255 INFO org.apache.flink.runtime.util.ZooKeeperUtils - Using '/flink/default' as Zookeeper namespace.

However, since you are using YARN, the Flink namespace in ZooKeeper should
include the YARN applicationId. Normally, the CLI tries to resolve the
applicationId from a local "YARN properties" file [1], which is generated
after a successful submission of a session cluster (using Flink's
bin/yarn-session.sh) [2]. In your case that file does not exist – maybe
because it got deleted, or because the host from which you are submitting the
job is not the one from which the session cluster was started.

If you submit the job with -yid <yourApplicationId> or
--yarnapplicationId <yourApplicationId>, the CLI should use the correct
namespace in ZooKeeper [3].
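For example (the application id below is a placeholder; you can look up the
real one with "yarn application -list"):

    ./bin/flink run -yid application_1550000000000_0001 my.jar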
Just submit the job normally without removing the ZooKeeper configuration from
flink-conf.yaml, and without specifying host:port manually with the "-m"
option.

Let me know if this works for you.

Best,
Gary

[1] https://github.com/apache/flink/blob/e2579e39602ab7d3e906a185353dd413aca58317/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L236
[2] https://github.com/apache/flink/blob/e2579e39602ab7d3e906a185353dd413aca58317/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L622-L625
[3] https://github.com/apache/flink/blob/e2579e39602ab7d3e906a185353dd413aca58317/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L603-L606

On Wed, Mar 6, 2019 at 3:58 AM 孙森 <senny...@163.com> wrote:

> Hi Gary:
>
> Thanks very much! I have tried it the way you said, and it works.
> I hope the bug can be fixed as soon as possible.
>
> Best!
> Sen
>
> On Mar 5, 2019, at 3:15 PM, Gary Yao <g...@ververica.com> wrote:
>
> Hi Sen,
>
> In that email I meant that you should disable the ZooKeeper configuration in
> the CLI because the CLI had trouble resolving the leader from ZooKeeper. What
> you should have done is:
>
> 1. Start the cluster normally with ZooKeeper enabled
> 2. Edit flink-conf.yaml to remove the ZooKeeper config
> 3. Submit the job to your cluster with the -m flag
>
> Best,
> Gary
>
> On Tue, Mar 5, 2019 at 8:08 AM 孙森 <senny...@163.com> wrote:
>
>> Hi Gary:
>>
>> The ZooKeeper configuration is missing because the job submission fails
>> with it:
>> <Screenshot 2019-03-05 3.07.21 PM.png>
>>
>> Best
>> Sen
>>
>> On Mar 5, 2019, at 3:02 PM, Gary Yao <g...@ververica.com> wrote:
>>
>> Hi Sen,
>>
>> I don't see
>>
>> high-availability: zookeeper
>>
>> in your Flink configuration. However, this is mandatory for an HA setup. By
>> default "none" is used, and the ZK configuration is ignored. The log also
>> hints that you are using StandaloneLeaderElectionService instead of the
>> ZooKeeper implementation (note that the leaderSessionID consists only of 0s
>> [1][2]):
>>
>> 2019-03-05 11:23:53,883 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - http://hdp3:60179 was granted leadership with leaderSessionID=00000000-0000-0000-0000-000000000000
>>
>> Did you accidentally delete the "high-availability" config from your
>> flink-conf.yaml?
>>
>> You probably also want to increase the number of yarn.application-attempts
>> [3].
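>>
>> For reference, a minimal flink-conf.yaml sketch with these settings (the
>> storageDir and quorum are the values from your first mail; the number of
>> attempts is only an example):
>>
>>     high-availability: zookeeper
>>     high-availability.storageDir: hdfs:///flink/ha/
>>     high-availability.zookeeper.quorum: hdp1:2181,hdp2:2181,hdp3:2181
>>     yarn.application-attempts: 10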
>>
>> Best,
>> Gary
>>
>> [1] https://github.com/apache/flink/blob/dcd8c74b504046802cebf278b718e4753928a260/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java#L48
>> [2] https://github.com/apache/flink/blob/dcd8c74b504046802cebf278b718e4753928a260/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java#L52-L57
>> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/yarn_setup.html#recovery-behavior-of-flink-on-yarn
>>
>> On Tue, Mar 5, 2019 at 7:41 AM 孙森 <senny...@163.com> wrote:
>>
>>> Hi Gary:
>>>
>>> I used FsStateBackend.
>>>
>>> The JM log is here:
>>>
>>> After the restart, the log is:
>>>
>>> Best!
>>> Sen
>>>
>>> On Mar 4, 2019, at 10:50 PM, Gary Yao <g...@ververica.com> wrote:
>>>
>>> Hi Sen,
>>>
>>> Are you using the default MemoryStateBackend [1]? As far as I know, it does
>>> not support JobManager failover. If you are already using FsStateBackend or
>>> RocksDBStateBackend, please send the JM logs.
>>>
>>> Best,
>>> Gary
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/state/state_backends.html#available-state-backends
>>>
>>> On Mon, Mar 4, 2019 at 10:01 AM 孙森 <senny...@163.com> wrote:
>>>
>>>> Hi Gary:
>>>>
>>>> Yes, I enabled checkpoints in my program.
>>>>
>>>> On Mar 4, 2019, at 3:03 AM, Gary Yao <g...@ververica.com> wrote:
>>>>
>>>> Hi Sen,
>>>>
>>>> Did you set a restart strategy [1]? If you enabled checkpoints [2], the
>>>> fixed-delay strategy will be used by default.
>>>>
>>>> Best,
>>>> Gary
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/restart_strategies.html
>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/state/checkpoints.html
>>>>
>>>> On Fri, Mar 1, 2019 at 7:27 AM 孙森 <senny...@163.com> wrote:
>>>>
>>>>> Hi Gary:
>>>>>
>>>>> I checked the znode; the address of the leader was there.
>>>>>
>>>>> <Screenshot 2019-03-01 10.45.45 AM 1.png>
>>>>>
>>>>> When I removed the ZooKeeper configuration in the client's
>>>>> flink-conf.yaml, the job was submitted successfully.
>>>>> Then I tried to test whether HA works. I killed the JobManager and it
>>>>> restarted, but the job did not restart when the JobManager came back.
>>>>>
>>>>> Best!
>>>>> Sen
>>>>>
>>>>> On Feb 28, 2019, at 6:59 PM, Gary Yao <g...@ververica.com> wrote:
>>>>>
>>>>> Hi Sen,
>>>>>
>>>>> I took a look at the CLI code again, and found out that -m is ignored if
>>>>> high-availability: ZOOKEEPER is configured in your flink-conf.yaml. This
>>>>> does not seem right and should at least be documented [1].
>>>>>
>>>>> Judging from the client logs that you provided, I think the problem is
>>>>> that the client cannot resolve the leading JobManager from ZooKeeper
>>>>> [2][3]. You can try the following things for debugging:
>>>>>
>>>>> * Check the contents of the znode
>>>>>   /flink/[...]/leader/rest_server_lock using the ZK CLI (see the example
>>>>>   below). It should contain the address of the leader. If not, I would
>>>>>   check the jobmanager logs for related errors.
>>>>>
>>>>> * Submit the job with the -m parameter but without ZooKeeper
>>>>>   configuration in the client's flink-conf.yaml
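>>>>>
>>>>> For instance, with the ZooKeeper CLI (replace [...] with your cluster's
>>>>> namespace; hdp1:2181 is one of the quorum hosts from your mail):
>>>>>
>>>>>     ./zkCli.sh -server hdp1:2181
>>>>>     get /flink/[...]/leader/rest_server_lock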
>>>>>
>>>>> Best,
>>>>> Gary
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-11779
>>>>> [2] https://github.com/apache/flink/blob/release-1.5.1/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java#L170
>>>>> [3] https://github.com/apache/flink/blob/release-1.5.1/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java#L746-L750
>>>>>
>>>>> On Thu, Feb 28, 2019 at 4:34 AM 孙森 <senny...@163.com> wrote:
>>>>>
>>>>>> Hi Gary:
>>>>>>
>>>>>> Actually, I have several Flink clusters on YARN, one for each project.
>>>>>> Each project can only submit jobs to its own cluster. I've already
>>>>>> enabled logging on DEBUG level.
>>>>>>
>>>>>> "How did you determine "jmhost" and "port"?"
>>>>>>
>>>>>> We do this by requesting the REST API:
>>>>>> http://activeRm/proxy/appId/jars
>>>>>>
>>>>>> The full client log is in the mail attachment.
>>>>>>
>>>>>> On Feb 27, 2019, at 9:30 PM, Gary Yao <g...@ververica.com> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> How did you determine "jmhost" and "port"? Actually you do not need to
>>>>>> specify these manually. If the client is using the same configuration as
>>>>>> your cluster, the client will look up the leading JM from ZooKeeper.
>>>>>>
>>>>>> If you have already tried omitting the "-m" parameter, you can check in
>>>>>> the client logs which host is used for the job submission [1]. Note that
>>>>>> you need to enable logging on DEBUG level.
>>>>>>
>>>>>> The root cause in your stacktrace is a TimeoutException. I would debug
>>>>>> this by checking if you can establish a TCP connection – from the
>>>>>> machine you are submitting the job from, to the target host/port [2].
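>>>>>>
>>>>>> For example (jmhost and 8081 are placeholders for the host/port you
>>>>>> pass to -m):
>>>>>>
>>>>>>     nc -z -v jmhost 8081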
>>>>>>
>>>>>> Moreover, you are using a quite dated Flink version. The newest version
>>>>>> in the 1.5 major release is 1.5.6 – so consider upgrading to that, or
>>>>>> even to 1.7.
>>>>>>
>>>>>> Best,
>>>>>> Gary
>>>>>>
>>>>>> [1] https://github.com/apache/flink/blob/3488f8b144a2127497c39b8ed5a48a65b551c57d/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java#L185
>>>>>> [2] https://stackoverflow.com/questions/4922943/test-from-shell-script-if-remote-tcp-port-is-open
>>>>>>
>>>>>> On Wed, Feb 27, 2019 at 8:09 AM 孙森 <senny...@163.com> wrote:
>>>>>>
>>>>>>> Hi all:
>>>>>>>
>>>>>>> I run Flink (1.5.1 with Hadoop 2.7) on YARN, and submit the job with
>>>>>>> "/usr/local/flink/bin/flink run -m jmhost:port my.jar", but the
>>>>>>> submission fails.
>>>>>>> The HA configuration is:
>>>>>>>
>>>>>>> - high-availability: zookeeper
>>>>>>> - high-availability.storageDir: hdfs:///flink/ha/
>>>>>>> - high-availability.zookeeper.quorum: hdp1:2181,hdp2:2181,hdp3:2181
>>>>>>> - yarn.application-attempts: 2
>>>>>>>
>>>>>>> The info shown in the client log:
>>>>>>>
>>>>>>> 2019-02-27 11:48:38,651 INFO org.apache.flink.runtime.rest.RestClient - Shutting down rest endpoint.
>>>>>>> 2019-02-27 11:48:38,659 INFO org.apache.flink.runtime.rest.RestClient - Rest endpoint shutdown complete.
>>>>>>> 2019-02-27 11:48:38,662 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
>>>>>>> 2019-02-27 11:48:38,665 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
>>>>>>> 2019-02-27 11:48:38,670 INFO org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl - backgroundOperationsLoop exiting
>>>>>>> 2019-02-27 11:48:38,689 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Session: 0x2679c52880c00ee closed
>>>>>>> 2019-02-27 11:48:38,689 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - EventThread shut down for session: 0x2679c52880c00ee
>>>>>>> 2019-02-27 11:48:38,690 ERROR org.apache.flink.client.cli.CliFrontend - Error while running the command.
>>>>>>> org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result.
>>>>>>> at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:257)
>>>>>>> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
>>>>>>> at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>>>>>>> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>>> at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>>>>>>> at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>>>>>>> at scala.App$$anonfun$main$1.apply(App.scala:76)
>>>>>>> at scala.App$$anonfun$main$1.apply(App.scala:76)
>>>>>>> at scala.collection.immutable.List.foreach(List.scala:381)
>>>>>>> at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>>>>>>> at scala.App$class.main(App.scala:76)
>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>>>>>>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
>>>>>>> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
>>>>>>> at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:785)
>>>>>>> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:279)
>>>>>>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
>>>>>>> at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
>>>>>>> at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101)
>>>>>>> at java.security.AccessController.doPrivileged(Native Method)
>>>>>>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>>> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>>>>>>> at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>>>>>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1101)
>>>>>>> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
>>>>>>> at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:370)
>>>>>>> at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>>>>>>> at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>>>>>>> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>>>>>> at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>>>>>> at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:214)
>>>>>>> at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>>>>>>> at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>>>>>>> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>>>>>> at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>>>>>> at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:834)
>>>>>>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>>>>> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>> Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Exception is not retryable.
>>>>>>> at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
>>>>>>> at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
>>>>>>> at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
>>>>>>> at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
>>>>>>> ... 15 more
>>>>>>> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Exception is not retryable.
>>>>>>> ... 13 more
>>>>>>> Caused by: java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException
>>>>>>> at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>>>>>>> at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>>>>>>> at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
>>>>>>> at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>>>>>>> ... 10 more
>>>>>>> Caused by: java.util.concurrent.TimeoutException
>>>>>>> ... 8 more
>>>>>>>
>>>>>>> ------------------------------------------------------------
>>>>>>> The program finished with the following exception:
>>>>>>>
>>>>>>> org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result.
>>>>>>> at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:257)
>>>>>>> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
>>>>>>> at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>>>>>>> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>>> at edp.wormhole.flinkx.eventflow.WormholeFlinkMainProcess.process(WormholeFlinkMainProcess.scala:114)
>>>>>>> at edp.wormhole.flinkx.eventflow.WormholeFlinkxStarter$.delayedEndpoint$edp$wormhole$flinkx$eventflow$WormholeFlinkxStarter$1(WormholeFlinkxStarter.scala:40)
>>>>>>> at edp.wormhole.flinkx.eventflow.WormholeFlinkxStarter$delayedInit$body.apply(WormholeFlinkxStarter.scala:29)
>>>>>>> at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>>>>>>> at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>>>>>>> at scala.App$$anonfun$main$1.apply(App.scala:76)
>>>>>>> at scala.App$$anonfun$main$1.apply(App.scala:76)
>>>>>>> at scala.collection.immutable.List.foreach(List.scala:381)
>>>>>>> at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>>>>>>> at scala.App$class.main(App.scala:76)
>>>>>>> at edp.wormhole.flinkx.eventflow.WormholeFlinkxStarter$.main(WormholeFlinkxStarter.scala:29)
>>>>>>> at edp.wormhole.flinkx.eventflow.WormholeFlinkxStarter.main(WormholeFlinkxStarter.scala)
>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>>>>>>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
>>>>>>> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
>>>>>>> at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:785)
>>>>>>> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:279)
>>>>>>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
>>>>>>> at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
>>>>>>> at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101)
>>>>>>> at java.security.AccessController.doPrivileged(Native Method)
>>>>>>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>>> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>>>>>>> at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>>>>>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1101)
>>>>>>> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
>>>>>>> at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:370)
>>>>>>> at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>>>>>>> at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>>>>>>> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>>>>>> at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>>>>>> at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:214)
>>>>>>> at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>>>>>>> at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>>>>>>> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>>>>>> at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>>>>>> at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:834)
>>>>>>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>>>>> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>> Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Exception is not retryable.
>>>>>>> at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
>>>>>>> at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
>>>>>>> at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
>>>>>>> at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
>>>>>>> ... 15 more
>>>>>>> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Exception is not retryable.
>>>>>>> ... 13 more
>>>>>>> Caused by: java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException
>>>>>>> at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>>>>>>> at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>>>>>>> at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
>>>>>>> at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>>>>>>> ... 10 more
>>>>>>> Caused by: java.util.concurrent.TimeoutException
>>>>>>> ... 8 more