Hi Robert,

thanks for your answer.

I get an InterruptedException when I call shutdown():

java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)
    at java.lang.Thread.join(Thread.java:1225)
    at java.lang.Thread.join(Thread.java:1278)
    at org.apache.flink.streaming.io.StreamRecordWriter.close(StreamRecordWriter.java:55)
    at org.apache.flink.streaming.api.collector.StreamOutput.close(StreamOutput.java:77)
    at org.apache.flink.streaming.api.streamvertex.OutputHandler.flushOutputs(OutputHandler.java:204)
    at org.apache.flink.streaming.api.streamvertex.StreamVertex.invoke(StreamVertex.java:195)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
    at java.lang.Thread.run(Thread.java:701)

About the JobExecutionResult:

I added a new method to the API that calls JobClient.submitJobDetached(...) instead of JobClient.submitJobAndWait(...). The "detached" version has no return value, while the blocking one returns a JobExecutionResult that is in turn returned by execute(). So I cannot get a JobExecutionResult right now.

It would be nice to get the JobExecutionResult when stopping the running program via a "stop-execution" call (is there any way to do this?). Right now, I sleep for a certain time after calling submitJobDetached(...) and then call stop() and shutdown() (from ForkableFlinkMiniCluster). The stop() call does not seem to do anything. shutdown() works, except for the exception described above.

-Matthias

On 03/30/2015 09:08 PM, Robert Metzger wrote:
> Hi Matthias,
>
> the streaming folks can probably answer the questions better. But I'll
> write something to bring this message back to their attention ;)
>
> 1) Which exceptions are you seeing? Flink should be able to cleanly shut
> down.
> 2) As far as I saw it, the execute() method (of the Streaming API) got a
> JobExecutionResult return type in the latest master. That contains
> accumulator results.
> 3) I think the cancel() method is there for exactly that purpose. If the
> job is shutting down before the cancel method, that is probably a bug.
>
>
> Robert
>
>
>
> On Fri, Mar 27, 2015 at 10:07 PM, Matthias J. Sax <
> mj...@informatik.hu-berlin.de> wrote:
>
>> Hi,
>>
>> I am trying to run an infinite streaming job (i.e., one that does not
>> terminate because it is generating output data randomly on the fly). I
>> kill this job with the .stop() or .shutdown() method of
>> ForkableFlinkMiniCluster.
>>
>> I did not find any example using a similar setup. In the provided
>> examples, each job terminates automatically, because only a finite input
>> is processed and the source returns after all data is emitted.
>>
>>
>> I have multiple questions about my setup:
>>
>> 1) The job never terminates "cleanly", i.e., I get some exceptions. Is this
>> behavior desired?
>>
>> 2) Is it possible to get a result back? Similar to
>> JobClient.submitJobAndWait(...)?
>>
>> 3) Is it somehow possible to send a signal to the running job such
>> that the source can terminate regularly, as if finite input were being
>> processed? Right now, I use a while(running) loop and set 'running' to
>> false in the .cancel() method.
>>
>>
>>
>> Thanks for your help!
>>
>> -Matthias
>>
>>
>>
>
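
For reference, the while(running) pattern from question 3 can be sketched in plain Java roughly as follows. This is only a minimal sketch under assumptions: CancellableSource, run() and cancel() are hypothetical names chosen for illustration, not Flink classes, and the actual source interface of the streaming API may look different.

```java
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntConsumer;

/** Hypothetical stand-in for an infinite streaming source; not a Flink class. */
final class CancellableSource {
    // volatile so that a cancel() from another thread becomes visible to the loop in run()
    private volatile boolean running = true;
    private final Random random = new Random();

    /** Emits random values in [0, 100) until cancel() is called, then returns normally. */
    void run(IntConsumer out) {
        while (running) {
            out.accept(random.nextInt(100));
        }
    }

    /** Asks run() to leave its loop once the current record has been emitted. */
    void cancel() {
        running = false;
    }

    public static void main(String[] args) throws InterruptedException {
        CancellableSource source = new CancellableSource();
        AtomicLong emitted = new AtomicLong();

        // Run the "infinite" source on its own thread and count the emitted records.
        Thread worker = new Thread(() -> source.run(v -> emitted.incrementAndGet()));
        worker.start();

        Thread.sleep(100);  // let the source produce data for a while
        source.cancel();    // signal the source instead of interrupting its thread
        worker.join();      // returns once run() has left its loop

        System.out.println("emitted " + emitted.get() + " records before cancel()");
    }
}
```

The flag is declared volatile because cancel() is typically called from a different thread than the one executing run(); without it, the loop is not guaranteed to observe the update.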