I agree. @Marton: The idea with the extra thread does not work, because
JobClient.submitJobAndWait(...) does not return normally if
ForkableFlinkMiniCluster.shutdown() is called -- instead, the submitting
thread dies with the exception below.
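For reference, the pattern I tried looks roughly like this (heavily
simplified; submitAndWait(...) stands in for the real
JobClient.submitJobAndWait(...) call, which takes more arguments, and
jobGraph/cluster come from the test fixture):

  final JobExecutionResult[] result = new JobExecutionResult[1];

  Thread submitter = new Thread("job-submitter") {
      @Override
      public void run() {
          try {
              // blocking submission: only returns when the job ends
              result[0] = submitAndWait(jobGraph);
          } catch (Exception e) {
              // the JobTimeoutException below surfaces here
              throw new RuntimeException(e);
          }
      }
  };
  submitter.start();

  // ... let the (infinite) job run, then, from the main thread:
  cluster.shutdown();  // terminates the JobClient actor
  submitter.join();    // thread dies exceptionally; result[0] stays null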
Exception in thread "Thread-8" java.lang.RuntimeException:
org.apache.flink.runtime.client.JobTimeoutException: Lost connection to job manager.
        at org.apache.flink.streaming.util.TestStreamEnvironment$1.run(TestStreamEnvironment.java:119)
Caused by: org.apache.flink.runtime.client.JobTimeoutException: Lost connection to job manager.
        at org.apache.flink.runtime.client.JobClient$.submitJobAndWait(JobClient.scala:228)
        at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.scala)
        at org.apache.flink.streaming.util.TestStreamEnvironment$1.run(TestStreamEnvironment.java:117)
Caused by: akka.pattern.AskTimeoutException:
Recipient[Actor[akka://flink/user/jobclient#-596117797]] had already been terminated.
        at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:132)
        at akka.pattern.AskableActorRef$.$qmark$extension(AskSupport.scala:144)
        at org.apache.flink.runtime.client.JobClient$.submitJobAndWait(JobClient.scala:222)
        ... 2 more

Thus, I cannot get a JobExecutionResult this way, either.

-Matthias

On 04/01/2015 02:36 PM, Stephan Ewen wrote:
> As a follow-up: I think it would be a good thing to add a way to
> gracefully stop a streaming job.
>
> Something that sends "close" to the sources, and they quit.
>
> We can use this for graceful shutdown when re-partitioning / scaling
> in or out, ...
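> A rough sketch of what I mean -- the Stoppable interface is made up,
> nothing like this exists in the API yet:
>
>   interface Stoppable {
>       // Ask the source to finish gracefully: stop emitting and
>       // return from run() as if the input were finite.
>       void stop();
>   }
>
>   class RandomSource implements Stoppable {
>       private volatile boolean running = true;
>
>       public void run() {
>           while (running) {
>               // emit randomly generated records ...
>           }
>           // returning here ends the job cleanly, unlike cancel()
>       }
>
>       public void stop() {
>           running = false;
>       }
>   }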
> On Wed, Apr 1, 2015 at 1:29 PM, Matthias J. Sax <
> mj...@informatik.hu-berlin.de> wrote:
>
>> Hi,
>>
>> I will pull the fix and try it out.
>>
>> Thanks for the hint with the extra thread. That should work for me. But
>> you are actually right; my setup is Storm-inspired. I think it is a very
>> natural way to deploy and stop an infinite streaming job. Maybe you want
>> to adopt it.
>>
>> The ITCase I am writing is based on StreamingProgramTestBase, so I need
>> the JobExecutionResult because the test fails without it.
>>
>> -Matthias
>>
>> On 04/01/2015 11:09 AM, Márton Balassi wrote:
>>> Hey Matthias,
>>>
>>> Thanks for reporting the exception thrown; we had not prepared for
>>> this use case yet. Gyula and I fixed it, and he is pushing the fix
>>> right now: as soon as commit [1] is in, you should not see that
>>> InterruptedException anymore when the job is cancelled (for example
>>> due to shutting down the executor underneath).
>>>
>>> As for getting the streaming JobExecutionResult back from a detached
>>> job, my current best practice is what you can see in
>>> ProcessFailureRecoveryTestBase and its streaming implementation:
>>> starting an executor in a separate thread and then joining it with
>>> the main one. Would you prefer a more Storm-example-ish solution? [2]
>>>
>>> [1] https://github.com/mbalassi/flink/commit/5db06d6d
>>> [2] https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java#L99-104
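>>> Condensed, the pattern is roughly this (names simplified; 'env' is
>>> the streaming environment under test):
>>>
>>>   final Throwable[] error = new Throwable[1];
>>>
>>>   Thread executor = new Thread("executor") {
>>>       @Override
>>>       public void run() {
>>>           try {
>>>               env.execute();  // blocks until the job finishes
>>>           } catch (Throwable t) {
>>>               error[0] = t;
>>>           }
>>>       }
>>>   };
>>>
>>>   executor.start();
>>>   // drive the test scenario from the main thread here ...
>>>   executor.join();  // wait for the streaming job to end
>>>
>>>   if (error[0] != null) {
>>>       org.junit.Assert.fail("Job failed: " + error[0].getMessage());
>>>   }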
>>> On Tue, Mar 31, 2015 at 2:54 PM, Matthias J. Sax <
>>> mj...@informatik.hu-berlin.de> wrote:
>>>
>>>> Hi Robert,
>>>>
>>>> thanks for your answer.
>>>>
>>>> I get an InterruptedException when I call shutdown():
>>>>
>>>> java.lang.InterruptedException
>>>>         at java.lang.Object.wait(Native Method)
>>>>         at java.lang.Thread.join(Thread.java:1225)
>>>>         at java.lang.Thread.join(Thread.java:1278)
>>>>         at org.apache.flink.streaming.io.StreamRecordWriter.close(StreamRecordWriter.java:55)
>>>>         at org.apache.flink.streaming.api.collector.StreamOutput.close(StreamOutput.java:77)
>>>>         at org.apache.flink.streaming.api.streamvertex.OutputHandler.flushOutputs(OutputHandler.java:204)
>>>>         at org.apache.flink.streaming.api.streamvertex.StreamVertex.invoke(StreamVertex.java:195)
>>>>         at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
>>>>         at java.lang.Thread.run(Thread.java:701)
>>>>
>>>> About the JobExecutionResult:
>>>>
>>>> I added a new method to the API that calls
>>>> JobClient.submitJobDetached(...) instead of
>>>> JobClient.submitJobAndWait(...). The "detached" version has no return
>>>> value, while the blocking one returns a JobExecutionResult that is
>>>> then returned by execute(). So I cannot get a JobExecutionResult
>>>> right now.
>>>>
>>>> It would be nice to get the JobExecutionResult when stopping the
>>>> running program via a "stop-execution" call (is there any way to do
>>>> this?). Right now, I sleep for a certain time after calling
>>>> submitJobDetached(...) and call stop() and shutdown() later on (from
>>>> ForkableMiniCluster). The stop() call does not seem to do anything...
>>>> shutdown() works (except for the exception I get, as described above).
>>>>
>>>> -Matthias
>>>>
>>>> On 03/30/2015 09:08 PM, Robert Metzger wrote:
>>>>> Hi Matthias,
>>>>>
>>>>> the streaming folks can probably answer the questions better. But
>>>>> I'll write something to bring this message back to their attention ;)
>>>>>
>>>>> 1) Which exceptions are you seeing? Flink should be able to shut
>>>>> down cleanly.
>>>>> 2) As far as I saw, the execute() method (of the Streaming API) got
>>>>> a JobExecutionResult return type in the latest master. That contains
>>>>> accumulator results.
>>>>> 3) I think the cancel() method is there for exactly that purpose. If
>>>>> the job is shutting down before the cancel method is called, that is
>>>>> probably a bug.
>>>>>
>>>>> Robert
>>>>>
>>>>> On Fri, Mar 27, 2015 at 10:07 PM, Matthias J. Sax <
>>>>> mj...@informatik.hu-berlin.de> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am trying to run an infinite streaming job (i.e., one that does
>>>>>> not terminate by itself because it generates output data randomly
>>>>>> on the fly). I kill this job with the .stop() or .shutdown() method
>>>>>> of ForkableFlinkMiniCluster.
>>>>>>
>>>>>> I did not find any example using a similar setup. In the provided
>>>>>> examples, each job terminates automatically, because only a finite
>>>>>> input is processed and the source returns after all data is emitted.
>>>>>>
>>>>>> I have multiple questions about my setup:
>>>>>>
>>>>>> 1) The job never terminates "cleanly", i.e., I get some exceptions.
>>>>>> Is this behavior desired?
>>>>>>
>>>>>> 2) Is it possible to get a result back, similar to
>>>>>> JobClient.submitJobAndWait(...)?
>>>>>>
>>>>>> 3) Is it somehow possible to send a signal to the running job such
>>>>>> that the source can terminate regularly, as if finite input had
>>>>>> been processed? Right now, I use a while(running) loop and set
>>>>>> 'running' to false in the .cancel() method, as in the sketch below.
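>>>>>> Modulo the exact SourceFunction signature, the source boils down
>>>>>> to this (the Consumer parameter stands in for the real collector):
>>>>>>
>>>>>>   class RandomSource {
>>>>>>       private volatile boolean running = true;
>>>>>>       private final java.util.Random rnd = new java.util.Random();
>>>>>>
>>>>>>       // called by the runtime; loops until the job is cancelled
>>>>>>       public void run(java.util.function.Consumer<Long> out) {
>>>>>>           while (running) {
>>>>>>               out.accept(rnd.nextLong());  // data made up on the fly
>>>>>>           }
>>>>>>       }
>>>>>>
>>>>>>       // the only hook I have today: flips the flag so run() returns
>>>>>>       public void cancel() {
>>>>>>           running = false;
>>>>>>       }
>>>>>>   }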
>>>>>>
>>>>>> Thanks for your help!
>>>>>>
>>>>>> -Matthias