Hey Matthias,

Thanks for reporting the exception; we had not prepared for this use case yet. Gyula and I have fixed it, and he is pushing the fix right now. Once this commit [1] is in, you should no longer see that InterruptedException when the job is cancelled (for example, because the executor underneath is shut down).

As for getting the streaming JobExecutionResult back from a detached job, my current best practice is what you can see in the ProcessFailureRecoveryTestBase and its streaming implementation: start the executor in a separate thread and then join that thread from the main one. Would you prefer a solution closer to the Storm example? [2]

[1] https://github.com/mbalassi/flink/commit/5db06d6d
[2] https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java#L99-104

On Tue, Mar 31, 2015 at 2:54 PM, Matthias J. Sax <
mj...@informatik.hu-berlin.de> wrote:

> Hi Robert,
>
> thanks for your answer.
>
> I get an InterruptedException when I call shutdown():
>
> java.lang.InterruptedException
>     at java.lang.Object.wait(Native Method)
>     at java.lang.Thread.join(Thread.java:1225)
>     at java.lang.Thread.join(Thread.java:1278)
>     at org.apache.flink.streaming.io.StreamRecordWriter.close(StreamRecordWriter.java:55)
>     at org.apache.flink.streaming.api.collector.StreamOutput.close(StreamOutput.java:77)
>     at org.apache.flink.streaming.api.streamvertex.OutputHandler.flushOutputs(OutputHandler.java:204)
>     at org.apache.flink.streaming.api.streamvertex.StreamVertex.invoke(StreamVertex.java:195)
>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
>     at java.lang.Thread.run(Thread.java:701)
>
>
> About the JobExecutionResult:
>
> I added a new method to the API, that calls
> JobClient.submitJobDetached(...) instead of
> JobClient.submitJobAndWait(...). The "detached" version has no return
> value, while the blocking one returns a JobExecutionResult that is
> further returned by execute(). So I cannot get a JobExecutionResult
> right now.
>
> It would be nice to get the JobExecutionResult when stopping the running
> program via a "stop-execution"-call (is there any way to do this?).
> Right now, I sleep for a certain time after calling
> submitJobDetached(...)
> and call stop() and shutdown() later on (from
> ForkableMiniCluster). The stop() call does not seem to do anything...
> shutdown() works (except for the Exception I get -- as described above).
>
>
> -Matthias
>
>
> On 03/30/2015 09:08 PM, Robert Metzger wrote:
> > Hi Matthias,
> >
> > the streaming folks can probably answer the questions better. But I'll
> > write something to bring this message back to their attention ;)
> >
> > 1) Which exceptions are you seeing? Flink should be able to cleanly shut
> > down.
> > 2) As far as I saw it, the execute() method (of the Streaming API) got a
> > JobExecutionResult return type in the latest master. That contains
> > accumulator results.
> > 3) I think the cancel() method is there for exactly that purpose. If the
> > job is shutting down before the cancel method, that is probably a bug.
> >
> >
> > Robert
> >
> >
> > On Fri, Mar 27, 2015 at 10:07 PM, Matthias J. Sax <
> > mj...@informatik.hu-berlin.de> wrote:
> >
> >> Hi,
> >>
> >> I am trying to run an infinite streaming job (i.e., one that does not
> >> terminate because it is generating output data randomly on the fly). I
> >> kill this job with the .stop() or .shutdown() method of
> >> ForkableFlinkMiniCluster.
> >>
> >> I did not find any example using a similar setup. In the provided
> >> examples, each job terminates automatically, because only a finite input
> >> is processed and the source returns after all data is emitted.
> >>
> >>
> >> I have multiple questions about my setup:
> >>
> >> 1) The job never terminates "clean", i.e., I get some exceptions. Is this
> >> behavior desired?
> >>
> >> 2) Is it possible to get a result back? Similar to
> >> JobClient.submitJobAndWait(...)?
> >>
> >> 3) Is it somehow possible to send a signal to the running job such
> >> that the source can terminate regularly as if finite input would be
> >> processed? Right now, I use a while(running) loop and set 'running' to
> >> false in the .cancel() method.
> >>
> >> Thanks for your help!
> >>
> >> -Matthias
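
For reference, the thread-and-join approach mentioned at the top of this message can be sketched without any Flink dependency. This is only a sketch under stated assumptions: DetachedJobSketch, RandomSource and JobRunner are hypothetical names, and RandomSource.run() merely stands in for a blocking execute() call that returns a result. It is not the code in ProcessFailureRecoveryTestBase. The source uses the while(running) loop with a cancel() method described in the original question.

```java
// Hypothetical, Flink-free sketch: run a blocking "job" in its own thread,
// signal it to stop, then join the thread and read the result.
class DetachedJobSketch {

    /** Stand-in for an infinite source; loops until cancel() is called. */
    static class RandomSource {
        // volatile so the write in cancel() is visible to the running thread
        private volatile boolean running = true;
        private long emitted = 0;

        /** Blocks until cancelled, then returns the number of emitted records. */
        long run() throws InterruptedException {
            while (running) {
                emitted++;          // pretend to emit one record
                Thread.sleep(1);    // avoid a busy loop in this sketch
            }
            return emitted;
        }

        void cancel() {
            running = false;
        }
    }

    /** Runs the blocking call in a separate thread and keeps its outcome. */
    static class JobRunner extends Thread {
        private final RandomSource source;
        private volatile Long result;
        private volatile Throwable error;

        JobRunner(RandomSource source) {
            this.source = source;
        }

        @Override
        public void run() {
            try {
                result = source.run();   // assumption: stands in for execute()
            } catch (Throwable t) {
                error = t;               // keep the failure for the caller
            }
        }

        Long getResult() {
            return result;
        }

        Throwable getError() {
            return error;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        RandomSource source = new RandomSource();
        JobRunner runner = new JobRunner(source);

        runner.start();       // the main thread is not blocked by the job
        Thread.sleep(100);    // let the job run for a while
        source.cancel();      // ask the source to finish regularly
        runner.join();        // wait for the job thread to terminate

        if (runner.getError() != null) {
            System.out.println("job failed: " + runner.getError());
        } else {
            System.out.println("records emitted: " + runner.getResult());
        }
    }
}
```

The point of the design is that the result only becomes available after join() returns, so the caller decides when to stop the job and still gets the return value of the blocking call.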