Hi Ashish, how do you make sure that all of your data is not consumed within a fraction of the 2 seconds? For this it would be better to use event time which allows you to control how time passes. If you want to test a specific operator you could try out the One/TwoInputStreamOperatorTestHarness.
Cheers, Till On Fri, Sep 14, 2018 at 3:36 PM ashish pok <ashish...@yahoo.com> wrote: > All, > > Hopefully a quick one. I feel like I have seen this answered before a few > times before but can't find an appropriate example. I am trying to run few > tests where registered timeouts are invoked (snippet below). Simple example > as show in documentation for integration test (using flink-test-utils) > seems to complete even though Timers are registered and have not been > invoked. > > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setParallelism(1); > CollectSink.values.clear(); > // create a stream of custom elements and apply transformations > env.fromCollection(t.getTestInputs()) > .process(new TupleProcessFn()) > .keyBy(FactTuple::getKey) > .process(new NormalizeDataProcessFn(2)) > .addSink(getSink()) > > env.execute(); > > I have a 2 second processing timer registered. If I put a breakpoint in > first TupleProcessFn() after a few Tuples are collected I can see onTimer > being invoked. So what is the trick here? I went as far as putting in a > MapFunction after second process function that has a sleep to no avail. > > Thanks, > > Ashish >