Hi Aljoscha, thanks for the answer. executeDetached() sounds super helpful for testing. You could basically return a Future for stopping, cancelling and so on.
In my current IT I don't have a special sink; I am checking the resulting files directly, but live access to accumulators sounds very helpful nonetheless.

Cheers,

Konstantin

On 31.08.2016 11:24, Aljoscha Krettek wrote:
> Hi Konstantin,
> I think this is not possible with the current API but I've been thinking
> about similar stuff this week. Let me quickly outline what I was
> thinking and then you can tell me whether that would also be helpful for
> you.
>
> The basic problem is this: I want to be able to write ITCases that test
> a (theoretically) infinite streaming job but I still have some
> conditions for termination that I can check in some operator/sink of the
> program. The problem is now that I have no way of knowing if I reached
> the final condition and I have no way of canceling the job once that is
> reached.
>
> The solution I came up with is this:
>  - Enhance StreamExecutionEnvironment with a new method
> executeDetached() (or some such name) that returns a JobSubmissionResult
> that has methods to query the state of the running job, to cancel the
> running job and to (and this is important) query the accumulators of the
> job (live updated).
>  - Have accumulators with special names in an operator that are used to
> signal a "finished" condition, i.e. something like "condition-1-success"
> or "condition-1-failure".
>  - Start the test job in detached mode, periodically check the
> accumulators, once you have seen all required signals cancel the job and
> report success, if you see a failure accumulator cancel the job and
> report test failure.
>
> What do you think?
>
> I'm directly looping in Max and Stephan, Max had something like a
> detached mode client floating around a while back, I think.
>
> Cheers,
> Aljoscha
>
> On Mon, 29 Aug 2016 at 16:20 Konstantin Knauf
> <konstantin.kn...@tngtech.com> wrote:
>
> > Hi everyone,
> >
> > I have an integration test for which I use a LocalStreamEnvironment.
> > Currently, the Flink Job is started in a separate thread, which I
> > interrupt after some time and then do some assertions.
> >
> > In this situation, is there a better way to stop/cancel a running job in
> > LocalStreamEnvironment programmatically? Side-Info: The job is reading
> > from a Kafka Cluster, which is programmatically started for each
> > test run.
> >
> > Cheers,
> >
> > Konstantin
> >
> > --
> > Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
> > TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> > Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> > Sitz: Unterföhring * Amtsgericht München * HRB 135082
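For illustration, the test loop from the proposal above (start detached, poll the accumulators, cancel on success or failure) could look roughly like the sketch below. Everything in it is an assumption: DetachedJob is a hypothetical stand-in for whatever executeDetached() might return, FakeJob only simulates a running job, and none of this is existing Flink API. Only the accumulator names are taken from the proposal.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class DetachedJobSketch {

    /** Hypothetical handle; stands in for the result of the proposed executeDetached(). */
    interface DetachedJob {
        Map<String, Object> getAccumulators(); // assumed to be a live snapshot
        void cancel();
    }

    enum Outcome { SUCCESS, FAILURE, TIMEOUT }

    /**
     * Polls the accumulators until all success signals are present, any failure
     * signal shows up, or the timeout expires. The job is cancelled in every case.
     */
    static Outcome awaitSignals(DetachedJob job,
                                Set<String> successSignals,
                                Set<String> failureSignals,
                                long timeoutMillis,
                                long pollMillis) throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        try {
            while (true) {
                Set<String> seen = job.getAccumulators().keySet();
                for (String failure : failureSignals) {
                    if (seen.contains(failure)) {
                        return Outcome.FAILURE;
                    }
                }
                if (seen.containsAll(successSignals)) {
                    return Outcome.SUCCESS;
                }
                if (System.nanoTime() - deadline >= 0) {
                    return Outcome.TIMEOUT;
                }
                Thread.sleep(pollMillis);
            }
        } finally {
            job.cancel(); // an infinite streaming job never stops on its own
        }
    }

    /** Simulated job: the given accumulator appears after a fixed number of polls. */
    static final class FakeJob implements DetachedJob {
        private final int pollsUntilSignal;
        private final String signal;
        private int polls = 0;
        private boolean cancelled = false;

        FakeJob(int pollsUntilSignal, String signal) {
            this.pollsUntilSignal = pollsUntilSignal;
            this.signal = signal;
        }

        @Override
        public Map<String, Object> getAccumulators() {
            polls++;
            Map<String, Object> snapshot = new HashMap<>();
            if (polls >= pollsUntilSignal) {
                snapshot.put(signal, 1L);
            }
            return snapshot;
        }

        @Override
        public void cancel() {
            cancelled = true;
        }

        boolean isCancelled() {
            return cancelled;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        FakeJob job = new FakeJob(3, "condition-1-success");
        Outcome outcome = awaitSignals(job,
                Set.of("condition-1-success"),
                Set.of("condition-1-failure"),
                2000, 10);
        System.out.println(outcome + ", cancelled=" + job.isCancelled());
    }
}
```

In an ITCase the outcome would feed an assertion, and the timeout keeps the test from hanging if no signal ever arrives. The same loop could also sit behind a Future, as suggested at the top of this mail.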