Hi Niels,

This is with 1.1.1? We could fix this in the upcoming 1.1.2 release by only using automatic shutdown for detached jobs. In all other cases we should be able to shut down from the client side after running all jobs. The only downside I see is that Flink clusters may never be shut down if the CLI crashes or hits a network partition.
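Until a release with that fix is out, one interim workaround is to drive the loop from the shell instead of from main(), submitting each export as its own per-job YARN cluster so every submission shuts its cluster down after its single job. Just a sketch — `tables.txt` and the `--table` flag are placeholders for however bla.jar selects a table; drop the leading `echo` to actually submit:

```shell
# Dry run: print one per-job yarn-cluster submission per HBase table.
# tables.txt (one table name per line) and --table are assumptions here.
printf '%s\n' customers orders > tables.txt
while read -r table; do
  echo flink run -m yarn-cluster -yn 10 bla.jar --table "$table"
done < tables.txt
```

This trades one shared cluster for N short-lived ones, but each shutdown is then handled by the normal single-job path.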
Best,
Max

On Thu, Aug 25, 2016 at 12:04 PM, Niels Basjes <ni...@basjes.nl> wrote:
> Hi,
>
> I created a small application that needs to run multiple (batch) jobs on
> Yarn and then terminate.
> In this case I'm exporting data from a list of HBase tables.
>
> I essentially do right now the following:
>
>     flink run -m yarn-cluster -yn 10 bla.jar ...
>
> And in my main I do:
>
>     foreach thing I need to do {
>         ExecutionEnvironment env =
>             ExecutionEnvironment.getExecutionEnvironment();
>         env. ... define the batch job.
>         env.execute
>     }
>
> In the second job I submit I get an exception:
>
> java.lang.RuntimeException: Unable to tell application master to stop once
> the specified job has been finised
>     at org.apache.flink.yarn.YarnClusterClient.stopAfterJob(YarnClusterClient.java:184)
>     at org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:202)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:376)
>     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:61)
>     at com.bol.tools.hbase.export.Main.runSingleTable(Main.java:220)
>     at com.bol.tools.hbase.export.Main.run(Main.java:81)
>     at com.bol.tools.hbase.export.Main.main(Main.java:42)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:775)
>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:251)
>     at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:995)
>     at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:992)
>     at org.apache.flink.runtime.security.SecurityUtils$1.run(SecurityUtils.java:56)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
>     at org.apache.flink.runtime.security.SecurityUtils.runSecured(SecurityUtils.java:53)
>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:992)
>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1046)
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
> [10000 milliseconds]
>     at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>     at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>     at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>     at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>     at scala.concurrent.Await$.result(package.scala:107)
>     at scala.concurrent.Await.result(package.scala)
>     at org.apache.flink.yarn.YarnClusterClient.stopAfterJob(YarnClusterClient.java:182)
>     ... 25 more
>
> How do I (without using yarn-session) tell the YarnClusterClient to simply
> 'keep running because there will be more jobs'?
>
> If I run this same code in a yarn-session it works, but then I have the
> trouble of starting a (detached) yarn-session AND terminating that thing
> again after my jobs have run.
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes