Hi Robert,

Thanks for your answer.

I get an InterruptedException when I call shutdown():

java.lang.InterruptedException
        at java.lang.Object.wait(Native Method)
        at java.lang.Thread.join(Thread.java:1225)
        at java.lang.Thread.join(Thread.java:1278)
        at org.apache.flink.streaming.io.StreamRecordWriter.close(StreamRecordWriter.java:55)
        at org.apache.flink.streaming.api.collector.StreamOutput.close(StreamOutput.java:77)
        at org.apache.flink.streaming.api.streamvertex.OutputHandler.flushOutputs(OutputHandler.java:204)
        at org.apache.flink.streaming.api.streamvertex.StreamVertex.invoke(StreamVertex.java:195)
        at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
        at java.lang.Thread.run(Thread.java:701)


About the JobExecutionResult:

I added a new method to the API that calls
JobClient.submitJobDetached(...) instead of
JobClient.submitJobAndWait(...). The "detached" version has no return
value, while the blocking one returns a JobExecutionResult, which
execute() then passes on to the caller. So I cannot get a
JobExecutionResult right now.
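
To make the difference concrete, here is a minimal sketch of the two
submission modes. The interface below is hypothetical and only mirrors
the semantics; the real JobClient methods take additional arguments
that I omit here.

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.runtime.jobgraph.JobGraph;

// Hypothetical illustration only, not the real JobClient API.
public interface JobSubmitter {

    // Blocking submission: waits until the job finishes and returns the
    // JobExecutionResult, which execute() can pass on to the caller.
    JobExecutionResult submitJobAndWait(JobGraph jobGraph) throws Exception;

    // Detached submission: returns as soon as the job is submitted,
    // so there is no JobExecutionResult to return.
    void submitJobDetached(JobGraph jobGraph) throws Exception;
}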

It would be nice to get the JobExecutionResult when stopping the
running program via a "stop-execution" call (is there any way to do
this?). Right now, I sleep for a certain time after calling
submitJobDetached(...) and call stop() and shutdown() later on (from
ForkableFlinkMiniCluster), as sketched below. The stop() call does not
seem to do anything... shutdown() works (except for the exception
described above).
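
This is schematically what my test does right now. Names are
simplified: submitDetached() stands in for my new API method, and the
cluster setup is omitted.

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;

public class DetachedJobTest {

    void runAndStop(ForkableFlinkMiniCluster cluster, JobGraph jobGraph)
            throws Exception {
        submitDetached(cluster, jobGraph); // returns immediately, no result

        Thread.sleep(5000); // let the infinite job run for a while

        cluster.stop();     // seems to have no effect
        cluster.shutdown(); // takes the cluster down, but triggers the
                            // InterruptedException shown above
    }

    // Stand-in for my new API method; internally it ends up calling
    // JobClient.submitJobDetached(...) (plumbing omitted).
    void submitDetached(ForkableFlinkMiniCluster cluster, JobGraph jobGraph) {
        // omitted
    }
}

The source itself follows the while(running)/cancel() pattern from my
first mail. A simplified shape (my actual source implements the
streaming SourceFunction interface; the emission details differ):

import java.util.Random;

import org.apache.flink.util.Collector;

// Simplified sketch of an infinite source that terminates regularly
// once cancel() is called.
public class InfiniteSource {

    private volatile boolean running = true;

    public void run(Collector<Long> out) throws Exception {
        Random rnd = new Random();
        while (running) {                // loop until cancel() flips the flag
            out.collect(rnd.nextLong()); // emit randomly generated data
        }
    }

    public void cancel() {
        running = false; // lets run() return as if the input were finite
    }
}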


-Matthias


On 03/30/2015 09:08 PM, Robert Metzger wrote:
> Hi Matthias,
> 
> the streaming folks can probably answer the questions better. But I'll
> write something to bring this message back to their attention ;)
> 
> 1) Which exceptions are you seeing? Flink should be able to cleanly shut
> down.
> 2) As far as I saw, the execute() method (of the Streaming API) got a
> JobExecutionResult return type in the latest master. That contains
> accumulator results.
> 3) I think the cancel() method is there for exactly that purpose. If the
> job is shutting down before the cancel method is called, that is probably
> a bug.
> 
> 
> Robert
> 
> 
> 
> On Fri, Mar 27, 2015 at 10:07 PM, Matthias J. Sax <
> mj...@informatik.hu-berlin.de> wrote:
> 
>> Hi,
>>
>> I am trying to run an infinite streaming job (i.e., one that does not
>> terminate because it generates output data randomly on the fly). I
>> kill this job with the .stop() or .shutdown() method of
>> ForkableFlinkMiniCluster.
>>
>> I did not find any example using a similar setup. In the provided
>> examples, each job terminates automatically, because only a finite input
>> is processed and the source returns after all data is emitted.
>>
>>
>> I have multiple questions about my setup:
>>
>>  1) The job never terminates "cleanly", i.e., I get some exceptions. Is
>> this behavior desired?
>>
>>  2) Is it possible to get a result back? Similar to
>> JobClient.submitJobAndWait(...)?
>>
>>  3) Is it somehow possible to send a signal to the running job such
>> that the source can terminate regularly, as if a finite input had been
>> processed? Right now, I use a while(running) loop and set 'running' to
>> false in the .cancel() method.
>>
>>
>>
>> Thanks for your help!
>>
>> -Matthias
>>
>>
>>
> 
