Hi Ashish,

I think you are right. In the current master, the system should wait until all timers have completed before terminating. Could you check whether this is the case? If not, then this might indicate a problem. Which version of Flink are you using? I guess it would also be helpful to have access to a working example where the problem is visible.
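For the working example: something small and self-contained along the lines of the sketch below would already help (this is just an illustration I put together, not code from your pipeline; the class name and the elements are made up). If env.execute() returns before the "timer fired" lines are printed, that would show the problem:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class TimerShutdownExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.fromElements(1, 2, 3)
            .keyBy(new KeySelector<Integer, Integer>() {
                @Override
                public Integer getKey(Integer value) {
                    return value % 2;
                }
            })
            .process(new KeyedProcessFunction<Integer, Integer, String>() {
                @Override
                public void processElement(Integer value, Context ctx, Collector<String> out) {
                    // register a processing-time timer 2 seconds in the future
                    ctx.timerService().registerProcessingTimeTimer(
                            ctx.timerService().currentProcessingTime() + 2000L);
                }

                @Override
                public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
                    // if the job terminates before the timer fires, this is never emitted
                    out.collect("timer fired for key " + ctx.getCurrentKey());
                }
            })
            .print();

        // if pending processing-time timers are dropped on shutdown, this returns
        // almost immediately and no "timer fired" lines show up
        env.execute();
    }
}

I also put a rough sketch below the quoted thread of how the operator test harness lets you control processing time without sleeping, in case you want to revisit that approach.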
Cheers,
Till

On Fri, Sep 14, 2018 at 7:57 PM ashish pok <ashish...@yahoo.com> wrote:

> Hi Till,
>
> To answer your first question, I currently don't (and honestly not sure how,
> other than of course in the IDE where I can use a breakpoint, or if something
> like Mockito can do it). So did I interpret it correctly that it sounds like
> an execution env started using flink-test-utils will essentially tear down
> once it consumes the last data point (i.e. the end of the collection I am
> passing) even though there could be active timers registered?
>
> Further, most of our pipelines are using low-level process functions - we
> toyed around with other windowing and session functions, but process
> functions gave the most flexibility (at least at this point, until we can
> re-visit), and we generate keys for aggregation/windowing somewhere upstream
> (say in a map, flatMap or another process function). Meaning some pipelines
> are event / processing time agnostic in a sense, although technically within
> the process functions we will have timers registered etc. This helped us with
> unbounded keys and sensor data that could potentially be backfilled (i.e.
> watermarks have passed way back etc). I wouldn't doubt a bit there are
> probably better solutions :)
>
> With that background, I am sort of not following your second note about event
> time and how we can leverage that for testing. Our intent is to create
> sampled input from results and compare the output from tests to those results
> (i.e. end-to-end integration tests) as part of our CI/CD. The normal flow
> seems to work well; just getting "negative" test cases of timeouts seems to
> be a mystery right now :) So single-operator harnesses don't sound like the
> right approach - let me know otherwise.
>
> Thanks,
>
>
> On Friday, September 14, 2018, 11:42:17 AM EDT, Till Rohrmann <
> trohrm...@apache.org> wrote:
>
>
> Hi Ashish,
>
> how do you make sure that all of your data is not consumed within a fraction
> of the 2 seconds? For this it would be better to use event time, which allows
> you to control how time passes. If you want to test a specific operator you
> could try out the One/TwoInputStreamOperatorTestHarness.
>
> Cheers,
> Till
>
> On Fri, Sep 14, 2018 at 3:36 PM ashish pok <ashish...@yahoo.com> wrote:
>
> All,
>
> Hopefully a quick one. I feel like I have seen this answered a few times
> before but can't find an appropriate example. I am trying to run a few tests
> where registered timeouts are invoked (snippet below). A simple example as
> shown in the documentation for integration tests (using flink-test-utils)
> seems to complete even though timers are registered and have not been
> invoked.
>
> StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> CollectSink.values.clear();
>
> // create a stream of custom elements and apply transformations
> env.fromCollection(t.getTestInputs())
>     .process(new TupleProcessFn())
>     .keyBy(FactTuple::getKey)
>     .process(new NormalizeDataProcessFn(2))
>     .addSink(getSink());
>
> env.execute();
>
> I have a 2 second processing timer registered. If I put a breakpoint in the
> first TupleProcessFn() after a few tuples are collected, I can see onTimer
> being invoked. So what is the trick here? I went as far as putting a
> MapFunction after the second process function that has a sleep, to no avail.
>
> Thanks,
>
> Ashish
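PS: Regarding the single-operator harnesses not sounding like the right approach - for the "negative"/timeout cases they are still worth a look, because they let you advance processing time manually instead of waiting on the wall clock. A rough sketch below, assuming NormalizeDataProcessFn is a KeyedProcessFunction<String, FactTuple, FactTuple> keyed by a String (adjust the generic types to your code; the testTuple() helper is made up, and the harness classes come from the flink-streaming-java test-jar):

import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.junit.Test;

public class NormalizeDataProcessFnTest {

    @Test
    public void timerShouldFireAfterTimeout() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<String, FactTuple, FactTuple> harness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        new KeyedProcessOperator<>(new NormalizeDataProcessFn(2)),
                        FactTuple::getKey,
                        BasicTypeInfo.STRING_TYPE_INFO);

        harness.open();

        // feed one element; this should register the 2 second processing-time timer
        harness.processElement(testTuple(), 0L);

        // advance the harness' mock processing time past the timeout,
        // which fires the registered timer and runs onTimer()
        harness.setProcessingTime(2500L);

        // contains everything the operator emitted, including the onTimer() output
        ConcurrentLinkedQueue<Object> output = harness.getOutput();
        // ... assert on the timeout records here ...

        harness.close();
    }
}

The setProcessingTime() call is what makes the timeout deterministic - no sleeps and no dependence on how fast the bounded input is consumed.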