Hi Ashish,

how do you make sure that all of your data is not consumed within a
fraction of the 2 seconds? For this it would be better to use event time
which allows you to control how time passes. If you want to test a specific
operator you could try out the One/TwoInputStreamOperatorTestHarness.

Cheers,
Till

On Fri, Sep 14, 2018 at 3:36 PM ashish pok <ashish...@yahoo.com> wrote:

> All,
>
> Hopefully a quick one. I feel like I have seen this answered before a few
> times before but can't find an appropriate example. I am trying to run few
> tests where registered timeouts are invoked (snippet below). Simple example
> as show in documentation for integration test (using flink-test-utils)
> seems to complete even though Timers are registered and have not been
> invoked.
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(1);
>         CollectSink.values.clear();
>         // create a stream of custom elements and apply transformations
>         env.fromCollection(t.getTestInputs())
> .process(new TupleProcessFn())
> .keyBy(FactTuple::getKey)
> .process(new NormalizeDataProcessFn(2))
> .addSink(getSink())
>
>         env.execute();
>
> I have a 2 second processing timer registered. If I put a breakpoint in
> first TupleProcessFn() after a few Tuples are collected I can see onTimer
> being invoked. So what is the trick here? I went as far as putting in a
> MapFunction after second process function that has a sleep to no avail.
>
> Thanks,
>
> Ashish
>

Reply via email to