Thanks Niels, I had actually also created one :) We will fix this on master and for the 1.1.2 release.
On Thu, Aug 25, 2016 at 5:14 PM, Niels Basjes <ni...@basjes.nl> wrote:
> I have this with a pretty recent version of the source (not a release).
>
> Would be great if you see a way to fix this.
> I consider it fine if this requires an extra call to the system indicating
> that this is a 'multiple job' situation.
>
> I created https://issues.apache.org/jira/browse/FLINK-4495 for you
>
> Niels Basjes
>
> On Thu, Aug 25, 2016 at 3:34 PM, Maximilian Michels <m...@apache.org> wrote:
>>
>> Hi Niels,
>>
>> This is with 1.1.1? We could fix this in the upcoming 1.1.2 release by
>> only using automatic shutdown for detached jobs. In all other cases
>> we should be able to shut down from the client side after running all
>> jobs. The only downside I see is that Flink clusters may actually
>> never be shut down if the CLI somehow crashes or gets a network
>> partition.
>>
>> Best,
>> Max
>>
>> On Thu, Aug 25, 2016 at 12:04 PM, Niels Basjes <ni...@basjes.nl> wrote:
>> > Hi,
>> >
>> > I created a small application that needs to run multiple (batch) jobs on
>> > Yarn and then terminate.
>> > In this case I'm exporting data from a list of HBase tables.
>> >
>> > Essentially I do the following right now:
>> >
>> >     flink run -m yarn-cluster -yn 10 bla.jar ...
>> >
>> > And in my main I do
>> >
>> >     foreach thing I need to do {
>> >         ExecutionEnvironment env =
>> >             ExecutionEnvironment.getExecutionEnvironment();
>> >         env. ... define the batch job.
>> >         env.execute();
>> >     }
>> >
>> > For the second job I submit I get an exception:
>> >
>> > java.lang.RuntimeException: Unable to tell application master to stop once
>> > the specified job has been finised
>> >     at org.apache.flink.yarn.YarnClusterClient.stopAfterJob(YarnClusterClient.java:184)
>> >     at org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:202)
>> >     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
>> >     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:376)
>> >     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:61)
>> >     at com.bol.tools.hbase.export.Main.runSingleTable(Main.java:220)
>> >     at com.bol.tools.hbase.export.Main.run(Main.java:81)
>> >     at com.bol.tools.hbase.export.Main.main(Main.java:42)
>> >     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >     at java.lang.reflect.Method.invoke(Method.java:497)
>> >     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
>> >     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>> >     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
>> >     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:775)
>> >     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:251)
>> >     at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:995)
>> >     at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:992)
>> >     at org.apache.flink.runtime.security.SecurityUtils$1.run(SecurityUtils.java:56)
>> >     at java.security.AccessController.doPrivileged(Native Method)
>> >     at javax.security.auth.Subject.doAs(Subject.java:422)
>> >     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
>> >     at org.apache.flink.runtime.security.SecurityUtils.runSecured(SecurityUtils.java:53)
>> >     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:992)
>> >     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1046)
>> > Caused by: java.util.concurrent.TimeoutException: Futures timed out after
>> > [10000 milliseconds]
>> >     at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>> >     at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>> >     at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>> >     at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>> >     at scala.concurrent.Await$.result(package.scala:107)
>> >     at scala.concurrent.Await.result(package.scala)
>> >     at org.apache.flink.yarn.YarnClusterClient.stopAfterJob(YarnClusterClient.java:182)
>> >     ... 25 more
>> >
>> > How do I (without using yarn-session) tell the YarnClusterClient to simply
>> > 'keep running because there will be more jobs'?
>> >
>> > If I run this same code in a yarn-session it works, but then I have the
>> > trouble of starting a (detached) yarn-session AND terminating that thing
>> > again after my jobs have run.
>> >
>> > --
>> > Best regards / Met vriendelijke groeten,
>> >
>> > Niels Basjes
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
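[Editor's note: the detached yarn-session workaround discussed above can be sketched roughly as follows. This is an illustration only, not the fix discussed in the thread; `<applicationId>` is a placeholder for the YARN application id printed when the session starts, and exact flags may differ between Flink versions.]

```shell
# Start a detached YARN session with 10 TaskManagers; it keeps
# running in the background until it is explicitly stopped.
./bin/yarn-session.sh -n 10 -d

# Submit the batch jobs one after another. Without '-m yarn-cluster'
# the client attaches to the already running session, so the cluster
# is not torn down between jobs.
./bin/flink run bla.jar ...

# When all jobs are done, shut the session down again, e.g. by
# killing the YARN application.
yarn application -kill <applicationId>
```

This avoids the per-job cluster shutdown that triggers the stopAfterJob exception, at the cost of having to manage the session's lifecycle yourself, which is exactly the inconvenience the thread describes.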