Hi Till,
A quick update. I added a MockFlatMap function with the following logic:
        public MockStreamPause(int pauseSeconds) {
            this.pauseSeconds = pauseSeconds;
        }

        @Override
        public void flatMap(PlatformEvent event, Collector<PlatformEvent> out) throws Exception {
            if (event.getSrc().startsWith(EventTupleGeneratorUtil.PAUSE_EVENT_SRC_PREFIX)) {
                if (pauseSeconds > 0) {
                    try {
                        Thread.sleep(pauseSeconds * 1000);
                    } catch (InterruptedException intEx) {
                        logger.info("Mock pause interrupted", intEx);
                    }
                }
            } else {
                out.collect(event);
            }
        }
This seems to let me move forward with testing. Let me know whether you would 
recommend trying the latest test utils against the 1.4.2 core as a test. 
Thanks, Ashish
    On Monday, September 17, 2018, 9:33:56 AM EDT, ashish pok 
<ashish...@yahoo.com> wrote:  
  Hi Till,
I am still on version 1.4.2 and will need some time before we can get a later 
version certified in our Prod env. Timers are definitely not completing in my 
tests with the 1.4.2 utils, although I can see them being registered in the 
debugger. Having said that, should I pull only the latest test utils and try 
that out? Creating a simple example shouldn't take long; I can put one together 
sometime this week.
Thanks, Ashish
    On Monday, September 17, 2018, 3:53:59 AM EDT, Till Rohrmann 
<trohrm...@apache.org> wrote:  
 Hi Ashish,
I think you are right. In the current master, the system should wait until all 
timers have completed before terminating. Could you check whether this is the 
case? If not, then this might indicate a problem. Which version of Flink are 
you using? I guess it would also be helpful to have access to a working example 
where the problem is visible.
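
The difference between tearing down immediately and waiting for pending timers can be sketched with plain java.util.concurrent. This is only an analogy, not Flink's internals: the class and method names (TimerShutdownDemo, timersFired) are made up for illustration, and the 200 ms delay stands in for the 2 second timer discussed in this thread.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative only: models "input finished" with and without waiting for timers. */
class TimerShutdownDemo {

    /** Registers one timer, ends the "job" right away, and returns how many timers fired. */
    static int timersFired(boolean waitForPendingTimers) {
        ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger fired = new AtomicInteger();
        Runnable onTimer = fired::incrementAndGet;
        timerService.schedule(onTimer, 200, TimeUnit.MILLISECONDS);

        if (waitForPendingTimers) {
            // Orderly shutdown: already scheduled delayed tasks still run by default.
            timerService.shutdown();
            try {
                timerService.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } else {
            // Immediate teardown: pending delayed tasks are cancelled and never run.
            timerService.shutdownNow();
        }
        return fired.get();
    }

    public static void main(String[] args) {
        System.out.println(timersFired(false)); // prints 0
        System.out.println(timersFired(true));  // prints 1
    }
}
```

The first call corresponds to the behaviour reported with the 1.4.2 test utils (timers registered but never invoked); the second corresponds to a system that waits for all timers before terminating.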
On Fri, Sep 14, 2018 at 7:57 PM ashish pok <ashish...@yahoo.com> wrote:

 Hi Till,
To answer your first question, I currently don't (and honestly I'm not sure 
how, other than using a breakpoint in the IDE, or perhaps something like 
Mockito can do it). So did I interpret it correctly that an execution env 
started using flink-test-utils will essentially tear down once it consumes the 
last data point (i.e. the end of the collection I am passing), even though 
there could be active Timers registered? 
Further, most of our pipelines use low-level process functions. We toyed 
around with other windowing and session functions, but process functions gave 
us the most flexibility (at least at this point, until we can revisit), 
and we generate keys for aggregation/windowing somewhere upstream (say a map, 
flatMap or another process function). This means some pipelines are event / 
processing time agnostic in a sense, although technically we will have timers 
registered within the process functions. This helped us with unbounded 
keys and sensor data that could potentially be backfilled (i.e. watermarks have 
passed way back etc). I don't doubt there are probably better 
solutions :)
With that background, I am not quite following your second note about event 
time and how we can leverage that for testing. Our intent is to create sampled 
input from results and compare the output from tests to those results (i.e. end 
to end integration tests) as part of our CICD. The normal flow seems to work 
well; just getting the "negative" test cases of timeouts to run seems to be a 
mystery right now :) So single-operator harnesses don't sound like the right 
approach. Let me know.

    On Friday, September 14, 2018, 11:42:17 AM EDT, Till Rohrmann 
<trohrm...@apache.org> wrote:  
 Hi Ashish,
How do you make sure that all of your data is not consumed within a fraction of 
the 2 seconds? For this it would be better to use event time, which allows you 
to control how time passes. If you want to test a specific operator, you could 
try out the One/TwoInputStreamOperatorTestHarness.
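
The idea of a test controlling how time passes can be sketched in plain Java. This is a minimal model, not Flink's API: ManualTimerService and ManualTimeDemo are hypothetical names, and the 2000 ms timeout mirrors the 2 second timer from the original question. The point is that timers fire when the test advances the clock, not when wall-clock time elapses, so a fast, bounded input cannot outrun them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for a timer service driven by the test; not a Flink class. */
class ManualTimerService {
    private long currentTime = 0L;
    // Timers ordered by firing timestamp; several callbacks may share one timestamp.
    private final TreeMap<Long, List<Runnable>> timers = new TreeMap<>();

    long currentTime() {
        return currentTime;
    }

    void registerTimer(long timestamp, Runnable callback) {
        timers.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(callback);
    }

    /** Moves the clock forward and fires every timer due at or before newTime, in order. */
    void advanceTo(long newTime) {
        while (!timers.isEmpty() && timers.firstKey() <= newTime) {
            Map.Entry<Long, List<Runnable>> due = timers.pollFirstEntry();
            currentTime = due.getKey();
            due.getValue().forEach(Runnable::run);
        }
        currentTime = newTime;
    }
}

class ManualTimeDemo {
    /** Feeds all events at time 0, each with a 2000 ms timeout, then advances the clock. */
    static List<String> run(List<String> events, long endTime) {
        ManualTimerService time = new ManualTimerService();
        List<String> output = new ArrayList<>();
        for (String event : events) {
            output.add("seen:" + event);
            time.registerTimer(time.currentTime() + 2000L, () -> output.add("timeout:" + event));
        }
        time.advanceTo(endTime);
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "b"), 1999L)); // prints [seen:a, seen:b]
        System.out.println(run(Arrays.asList("a", "b"), 2000L)); // prints [seen:a, seen:b, timeout:a, timeout:b]
    }
}
```

With event time in a real job, advancing watermarks plays the role of advanceTo here; the operator test harnesses offer comparable control for a single operator.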
On Fri, Sep 14, 2018 at 3:36 PM ashish pok <ashish...@yahoo.com> wrote:

Hopefully a quick one. I feel like I have seen this answered a few times 
before but can't find an appropriate example. I am trying to run a few tests 
where registered timeouts are invoked (snippet below). The simple example shown 
in the documentation for integration tests (using flink-test-utils) seems to 
complete even though Timers are registered and have not been invoked. 
        StreamExecutionEnvironment env = 
        env.setParallelism(1);
        CollectSink.values.clear();
        // create a stream of custom elements and apply transformations
        env.fromCollection(t.getTestInputs())
            .process(new TupleProcessFn())
            .keyBy(FactTuple::getKey)
            .process(new NormalizeDataProcessFn(2)) 

I have a 2 second processing timer registered. If I put a breakpoint in the 
first TupleProcessFn() after a few Tuples are collected, I can see onTimer being 
invoked. So what is the trick here? I went as far as putting a MapFunction with 
a sleep after the second process function, to no avail.
