Hi, I will pull the fix and try it out.
Thanks for the hint with the extra Thread. That should work for me. But
you are actually right; my setup is Storm inspired. I think it's a very
natural way to deploy and stop an infinite streaming job. Maybe you want
to adopt it.

The ITCase I am writing is based on StreamingProgramTestBase, so I need
the JobExecutionResult because the test fails without it.

-Matthias

On 04/01/2015 11:09 AM, Márton Balassi wrote:
> Hey Matthias,
>
> Thanks for reporting the Exception thrown, we were not preparing for this
> use case yet. We fixed it with Gyula, he is pushing a fix for it right now:
> When the job is cancelled (for example due to shutting down the executor
> underneath) you should not see that InterruptedException as soon as this
> commit is in. [1]
>
> As for getting the streaming JobExecutionResult back from a detached job my
> current best practice is what you can see in
> the ProcessFailureRecoveryTestBase and its streaming implementation:
> starting an executor in a separate thread and then joining it with the main
> one. Would you prefer a more Storm example-ish solution? [2]
>
> [1] https://github.com/mbalassi/flink/commit/5db06d6d
> [2]
> https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java#L99-104
>
> On Tue, Mar 31, 2015 at 2:54 PM, Matthias J. Sax <
> mj...@informatik.hu-berlin.de> wrote:
>
>> Hi Robert,
>>
>> thanks for your answer.
>>
>> I get an InterruptedException when I call shutdown():
>>
>> java.lang.InterruptedException
>>     at java.lang.Object.wait(Native Method)
>>     at java.lang.Thread.join(Thread.java:1225)
>>     at java.lang.Thread.join(Thread.java:1278)
>>     at org.apache.flink.streaming.io.StreamRecordWriter.close(StreamRecordWriter.java:55)
>>     at org.apache.flink.streaming.api.collector.StreamOutput.close(StreamOutput.java:77)
>>     at org.apache.flink.streaming.api.streamvertex.OutputHandler.flushOutputs(OutputHandler.java:204)
>>     at org.apache.flink.streaming.api.streamvertex.StreamVertex.invoke(StreamVertex.java:195)
>>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
>>     at java.lang.Thread.run(Thread.java:701)
>>
>>
>> About the JobExecutionResult:
>>
>> I added a new method to the API, that calls
>> JobClient.submitJobDetached(...) instead of
>> JobClient.submitJobAndWait(...). The "detached" version has no return
>> value, while the blocking one returns a JobExecutionResult that is
>> further returned by execute(). So I cannot get a JobExecutionResult
>> right now.
>>
>> It would be nice to get the JobExecutionResult when stopping the running
>> program via a "stop-execution"-call (is there any way to do this?).
>> Right now, I sleep for a certain time after calling
>> submitJobDetached(...) and call stop() and shutdown() later on (from
>> ForkableMiniCluster). The stop() call does not seem to do anything...
>> shutdown() works (except for the Exception I get -- as described above).
>>
>>
>> -Matthias
>>
>>
>> On 03/30/2015 09:08 PM, Robert Metzger wrote:
>>> Hi Matthias,
>>>
>>> the streaming folks can probably answer the questions better. But I'll
>>> write something to bring this message back to their attention ;)
>>>
>>> 1) Which exceptions are you seeing? Flink should be able to cleanly shut
>>> down.
>>> 2) As far as I saw it, the execute() method (of the Streaming API) got a
>>> JobExecutionResult return type in the latest master. That contains
>>> accumulator results.
>>> 3) I think the cancel() method is there for exactly that purpose. If the
>>> job is shutting down before the cancel method, that's probably a bug.
>>>
>>>
>>> Robert
>>>
>>>
>>>
>>> On Fri, Mar 27, 2015 at 10:07 PM, Matthias J. Sax <
>>> mj...@informatik.hu-berlin.de> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to run an infinite streaming job (ie, one that does not
>>>> terminate because it is generating output data randomly on the fly). I
>>>> kill this job with the .stop() or .shutdown() method of
>>>> ForkableFlinkMiniCluster.
>>>>
>>>> I did not find any example using a similar setup. In the provided
>>>> examples, each job terminates automatically, because only a finite input
>>>> is processed and the source returns after all data is emitted.
>>>>
>>>>
>>>> I have multiple questions about my setup:
>>>>
>>>> 1) The job never terminates "clean", ie, I get some exceptions. Is this
>>>> behavior desired?
>>>>
>>>> 2) Is it possible to get a result back? Similar to
>>>> JobClient.submitJobAndWait(...)?
>>>>
>>>> 3) Is it somehow possible to send a signal to the running job such
>>>> that the source can terminate regularly as if finite input would be
>>>> processed? Right now, I use a while(running) loop and set 'running' to
>>>> false in the .cancel() method.
>>>>
>>>>
>>>>
>>>> Thanks for your help!
>>>>
>>>> -Matthias
>>>>
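
For reference, the two ideas discussed in this thread (a source that loops
on while(running) until cancel() is called, and running the blocking job in
a separate thread that the main thread later joins to obtain the result)
can be sketched in plain Java roughly as follows. This is only a minimal
illustration under stated assumptions: InfiniteSource, DetachedJobSketch and
runFor are hypothetical names, none of this is Flink API, and the returned
record count merely stands in for a JobExecutionResult.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for an infinite streaming source; not a Flink class.
class InfiniteSource {
    // volatile, so that a cancel() from another thread is seen by the loop
    private volatile boolean running = true;

    // "Emits" records until cancel() is called, then returns how many were emitted.
    long run() {
        long emitted = 0;
        while (running) {
            emitted++;        // placeholder for emitting one record
            Thread.yield();
        }
        return emitted;
    }

    void cancel() {
        running = false;
    }
}

// Hypothetical driver: runs the blocking job in its own thread and joins it.
class DetachedJobSketch {
    // Lets the source run for roughly `millis` ms, cancels it, returns the result.
    static long runFor(long millis) {
        InfiniteSource source = new InfiniteSource();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Long> job = source::run;
            Future<Long> result = executor.submit(job); // blocking call, other thread
            Thread.sleep(millis);                       // let the job run for a while
            source.cancel();                            // ask the source to finish
            return result.get();                        // join and fetch the result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("records emitted: " + runFor(200));
    }
}
```

Because the source leaves its loop on its own, the worker thread ends without
being interrupted, and the main thread gets the result from the Future rather
than from a detached submission that returns nothing.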