I agree.

@Marton:
The idea with the extra thread does not work, because the method
JobClient.submitJobAndWait(...) does not return normally if
ForkableFlinkMiniCluster.shutdown() is called -- instead, an exception
occurs:

> Exception in thread "Thread-8" java.lang.RuntimeException: org.apache.flink.runtime.client.JobTimeoutException: Lost connection to job manager.
>       at org.apache.flink.streaming.util.TestStreamEnvironment$1.run(TestStreamEnvironment.java:119)
> Caused by: org.apache.flink.runtime.client.JobTimeoutException: Lost connection to job manager.
>       at org.apache.flink.runtime.client.JobClient$.submitJobAndWait(JobClient.scala:228)
>       at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.scala)
>       at org.apache.flink.streaming.util.TestStreamEnvironment$1.run(TestStreamEnvironment.java:117)
> Caused by: akka.pattern.AskTimeoutException: Recipient[Actor[akka://flink/user/jobclient#-596117797]] had already been terminated.
>       at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:132)
>       at akka.pattern.AskableActorRef$.$qmark$extension(AskSupport.scala:144)
>       at org.apache.flink.runtime.client.JobClient$.submitJobAndWait(JobClient.scala:222)
>       ... 2 more


Thus, I cannot get a JobExecutionResult this way, either.
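
For reference, a minimal sketch of the pattern I tried (simplified; "env"
is the TestStreamEnvironment backed by the ForkableFlinkMiniCluster
"cluster", and the setup code is omitted):

    Thread submitter = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                env.execute(); // internally calls JobClient.submitJobAndWait(...)
            } catch (Exception e) {
                // this is where the JobTimeoutException above surfaces
                throw new RuntimeException(e);
            }
        }
    });
    submitter.start();
    // ... let the infinite job run for a while ...
    cluster.shutdown(); // terminates the job client actor
    submitter.join();   // joins a thread that died with the exception above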


-Matthias


On 04/01/2015 02:36 PM, Stephan Ewen wrote:
> As a follow-up: I think it would be a good thing to add a way to gracefully
> stop a streaming job.
> 
> Something that sends "close" to the sources, and they quit.
> 
> We can use this for graceful shutdown when re-partitioning / scaling in or
> out, ...
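> 
> Just to sketch the idea (not a concrete API proposal; the names are made
> up):
> 
>     interface Stoppable {
>         // called on a graceful stop: the source leaves its run() loop,
>         // so the job finishes as if its input were finite
>         void stop();
>     }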
> 
> On Wed, Apr 1, 2015 at 1:29 PM, Matthias J. Sax <
> mj...@informatik.hu-berlin.de> wrote:
> 
>> Hi,
>>
>> I will pull the fix and try it out.
>>
>> Thanks for the hint with the extra thread. That should work for me. But
>> you are actually right; my setup is Storm-inspired. I think it's a very
>> natural way to deploy and stop an infinite streaming job. Maybe you want
>> to adopt it.
>>
>> The ITCase I am writing is based on StreamingProgramTestBase, so I need
>> the JobExecutionResult because the test fails without it.
>>
>>
>> -Matthias
>>
>>
>>
>> On 04/01/2015 11:09 AM, Márton Balassi wrote:
>>> Hey Matthias,
>>>
>>> Thanks for reporting the exception thrown; we had not prepared for this
>>> use case yet. Gyula and I fixed it, and he is pushing the fix right now:
>>> when the job is cancelled (for example, due to shutting down the executor
>>> underneath), you should not see that InterruptedException as soon as this
>>> commit is in. [1]
>>>
>>> As for getting the streaming JobExecutionResult back from a detached
>>> job, my current best practice is what you can see in
>>> the ProcessFailureRecoveryTestBase and its streaming implementation:
>>> starting an executor in a separate thread and then joining it with the
>>> main one, as sketched below. Would you prefer a more Storm-example-ish
>>> solution? [2]
>>>
>>> [1] https://github.com/mbalassi/flink/commit/5db06d6d
>>> [2] https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java#L99-104
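>>>
>>> In a nutshell, the pattern looks like this (a sketch; how the result
>>> gets back to the main thread is left out):
>>>
>>>     Thread executor = new Thread(new Runnable() {
>>>         @Override
>>>         public void run() {
>>>             try {
>>>                 env.execute(); // returns the JobExecutionResult
>>>             } catch (Exception e) {
>>>                 throw new RuntimeException(e);
>>>             }
>>>         }
>>>     });
>>>     executor.start();
>>>     // ... exercise / verify the running job ...
>>>     executor.join();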
>>>
>>> On Tue, Mar 31, 2015 at 2:54 PM, Matthias J. Sax <
>>> mj...@informatik.hu-berlin.de> wrote:
>>>
>>>> Hi Robert,
>>>>
>>>> thanks for your answer.
>>>>
>>>> I get an InterruptedException when I call shutdown():
>>>>
>>>> java.lang.InterruptedException
>>>>         at java.lang.Object.wait(Native Method)
>>>>         at java.lang.Thread.join(Thread.java:1225)
>>>>         at java.lang.Thread.join(Thread.java:1278)
>>>>         at org.apache.flink.streaming.io.StreamRecordWriter.close(StreamRecordWriter.java:55)
>>>>         at org.apache.flink.streaming.api.collector.StreamOutput.close(StreamOutput.java:77)
>>>>         at org.apache.flink.streaming.api.streamvertex.OutputHandler.flushOutputs(OutputHandler.java:204)
>>>>         at org.apache.flink.streaming.api.streamvertex.StreamVertex.invoke(StreamVertex.java:195)
>>>>         at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
>>>>         at java.lang.Thread.run(Thread.java:701)
>>>>
>>>>
>>>> About the JobExecutionResult:
>>>>
>>>> I added a new method to the API that calls
>>>> JobClient.submitJobDetached(...) instead of
>>>> JobClient.submitJobAndWait(...). The "detached" version has no return
>>>> value, while the blocking one returns a JobExecutionResult that is in
>>>> turn returned by execute(). So I cannot get a JobExecutionResult
>>>> right now.
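>>>>
>>>> In code, the difference is roughly this (a sketch; the argument lists
>>>> are omitted):
>>>>
>>>>     // blocking: hands back a JobExecutionResult when the job finishes
>>>>     JobExecutionResult result = JobClient.submitJobAndWait(jobGraph, ...);
>>>>
>>>>     // detached: returns void, so my execute() has nothing to return
>>>>     JobClient.submitJobDetached(jobGraph, ...);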
>>>>
>>>> It would be nice to get the JobExecutionResult when stopping the running
>>>> program via a "stop-execution" call (is there any way to do this?).
>>>> Right now, I sleep for a certain time after calling
>>>> submitJobDetached(...) and call stop() and shutdown() later on (from
>>>> ForkableMiniCluster). The stop() call does not seem to do anything...
>>>> shutdown() works (except for the exception I get, as described above).
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 03/30/2015 09:08 PM, Robert Metzger wrote:
>>>>> Hi Matthias,
>>>>>
>>>>> the streaming folks can probably answer the questions better. But I'll
>>>>> write something to bring this message back to their attention ;)
>>>>>
>>>>> 1) Which exceptions are you seeing? Flink should be able to shut down
>>>>> cleanly.
>>>>> 2) As far as I saw, the execute() method (of the Streaming API) got a
>>>>> JobExecutionResult return type in the latest master. That contains
>>>>> accumulator results.
>>>>> 3) I think the cancel() method is there for exactly that purpose. If
>>>>> the job is shutting down before the cancel() method is called, that is
>>>>> probably a bug.
>>>>>
>>>>>
>>>>> Robert
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Mar 27, 2015 at 10:07 PM, Matthias J. Sax <
>>>>> mj...@informatik.hu-berlin.de> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am trying to run an infinite streaming job (i.e., one that does not
>>>>>> terminate because it generates output data randomly on the fly). I
>>>>>> kill this job with the .stop() or .shutdown() method of
>>>>>> ForkableFlinkMiniCluster.
>>>>>>
>>>>>> I did not find any example using a similar setup. In the provided
>>>>>> examples, each job terminates automatically, because only a finite
>>>>>> input is processed and the source returns after all data is emitted.
>>>>>>
>>>>>>
>>>>>> I have multiple questions about my setup:
>>>>>>
>>>>>>  1) The job never terminates "cleanly", i.e., I get some exceptions.
>>>>>> Is this behavior desired?
>>>>>>
>>>>>>  2) Is it possible to get a result back, similar to
>>>>>> JobClient.submitJobAndWait(...)?
>>>>>>
>>>>>>  3) Is it somehow possible to send a signal to the running job such
>>>>>> that the source can terminate normally, as if finite input were
>>>>>> processed? Right now, I use a while(running) loop and set 'running'
>>>>>> to false in the .cancel() method, as sketched below.
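>>>>>>
>>>>>> The source pattern, roughly (signatures simplified, and the record
>>>>>> generator is just a placeholder):
>>>>>>
>>>>>>     private volatile boolean running = true;
>>>>>>
>>>>>>     @Override
>>>>>>     public void run(Collector<Long> collector) throws Exception {
>>>>>>         while (running) {
>>>>>>             collector.collect(nextRandomValue()); // placeholder
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public void cancel() {
>>>>>>         running = false;
>>>>>>     }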
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thanks for your help!
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
> 
