Hi Konstantin,
I think this is not possible with the current API but I've been thinking
about similar stuff this week. Let me quickly outline what I was thinking
and then you can tell me whether that would also be helpful for you.

The basic problem is this: I want to be able to write ITCases that test a
(theoretically) infinite streaming job but I still have some conditions for
termination that I can check in some operator/sink of the program. The
problem is that the test has no way of knowing whether the final
condition was reached and no way of canceling the job once it is.

The solution I came up with is this:
 - Enhance StreamExecutionEnvironment with a new method executeDetached()
(or some such name) that returns a JobSubmissionResult that has methods to
query the state of the running job, to cancel the running job and to (and
this is important) query the accumulators of the job (live updated).
 - Have accumulators with special names in an operator that are used to
signal a "finished" condition, i.e. something like "condition-1-success" or
"condition-1-failure".
 - Start the test job in detached mode and periodically check the
accumulators. Once you have seen all required signals, cancel the job and
report success; if you see a failure accumulator, cancel the job and report
test failure.
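
To make the polling idea a bit more concrete, here is a rough sketch in
Java. Everything in it is an assumption for illustration: JobHandle,
getAccumulators(), cancel() and FakeJob are placeholder names, not anything
that exists in the current API, and I simply treat any accumulator whose
name ends in "-failure" as a failure signal. I also added a timeout so a
test cannot hang forever.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class DetachedJobPolling {

    /** Hypothetical handle to a detached job; a placeholder, not an existing Flink API. */
    interface JobHandle {
        Map<String, Object> getAccumulators(); // assumed to reflect live updates
        void cancel();
    }

    enum Outcome { SUCCESS, FAILURE, TIMEOUT }

    /**
     * Polls the accumulators until all required signals are present, a failure
     * signal shows up, or the timeout expires. The job is cancelled in every case.
     */
    static Outcome awaitSignals(JobHandle job, Set<String> required,
                                long timeoutMillis, long pollMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        try {
            while (System.currentTimeMillis() < deadline) {
                Set<String> seen = job.getAccumulators().keySet();
                for (String name : seen) {
                    if (name.endsWith("-failure")) {
                        return Outcome.FAILURE;
                    }
                }
                if (seen.containsAll(required)) {
                    return Outcome.SUCCESS;
                }
                Thread.sleep(pollMillis);
            }
            return Outcome.TIMEOUT;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
            return Outcome.TIMEOUT;
        } finally {
            job.cancel();
        }
    }

    /** In-memory stand-in for a running job, only here to make the sketch runnable. */
    static class FakeJob implements JobHandle {
        final Map<String, Object> accumulators = new ConcurrentHashMap<>();
        volatile boolean cancelled;

        @Override public Map<String, Object> getAccumulators() { return accumulators; }
        @Override public void cancel() { cancelled = true; }
    }

    public static void main(String[] args) {
        FakeJob job = new FakeJob();
        // Simulates an operator registering its "finished" accumulator after a while.
        new Thread(() -> {
            try { Thread.sleep(100); } catch (InterruptedException ignored) { }
            job.accumulators.put("condition-1-success", 1);
        }).start();

        Outcome outcome = awaitSignals(
                job, Collections.singleton("condition-1-success"), 5_000, 20);
        System.out.println(outcome + ", cancelled=" + job.cancelled);
    }
}
```

In a real ITCase the FakeJob would be replaced by whatever executeDetached()
ends up returning, and the test would assert on the returned Outcome.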

What do you think?

I'm directly looping in Max and Stephan. Max had something like a detached
mode client floating around a while back, I think.

Cheers,
Aljoscha

On Mon, 29 Aug 2016 at 16:20 Konstantin Knauf <konstantin.kn...@tngtech.com>
wrote:

> Hi everyone,
>
> I have an integration test for which I use a LocalStreamEnvironment.
> Currently, the Flink job is started in a separate thread, which I
> interrupt after some time before doing some assertions.
>
> In this situation, is there a better way to stop/cancel a running job in
> LocalStreamEnvironment programmatically? Side info: the job is reading
> from a Kafka cluster, which is programmatically started for each test run.
>
> Cheers,
>
> Konstantin
>
> --
> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>
>
