1. The timestamp passed to testHarness.processElement should be the
timestamp that would have been extracted from the element by the
timestamp extractor in your watermark strategy.

2. Your tests should call testHarness.processWatermark and pass in the
watermark(s) you want to work with.

processBroadcastWatermark is used for testing the behavior of a
(Keyed)BroadcastProcessFunction when a watermark arrives on the
broadcast channel.

Your test might look something like this:

// send in some data
testHarness.processElement(6L, 10L);

// verify that a timer was created
assertThat(testHarness.numEventTimeTimers(), is(1));

// should cause the fire timer to fire
testHarness.processWatermark(new Watermark(20L));
assertThat(testHarness.numEventTimeTimers(), is(0));

// verify the results
assertThat(testHarness.getOutput(), containsInExactlyThisOrder(6L));

Best,
David


On Wed, Mar 22, 2023 at 6:09 AM Gabriel Angel Amarista Rodrigues
<gabriel.rodrig...@n26.com.invalid> wrote:
>
> Hello dear Flink team
>
> We are rather new to Flink and having problems understanding how unit
> testing works overall.
> We want to test the* onTimer()* method after registering a timer in the
> *processElement()* function. However, the *onTimer() *is never called.
>
> We were investigating the documentation, blog posts, books, anything we
> could find and collected a few questions/doubts that we would like your
> input on:
>
> 1. We are using event time and when calling *testHarness.processElement. *The
> function through TestHarness however requires a *timestamp*.
> Is this *timestamp *supposed to be set as the *processingTimestamp*?
> We are, however, interested in an *EventTimeFunction*, not a
> *ProcessingTimerFunction.* Would it make sense in our use case to call
> testHarness.processElement(event, event.getTimestamp())?
>
> 2. We have added a debug call to *ctx.currentWaterMark()* call in the
> original *processElement* function but this always returns *Long.MIN value*
> even though we call *processWaterMark* before the *processElement* call.
>
> Is the *processWatermark *constrained in tests somehow?
>
> We noticed there is also a *processBroadcastWatermark*. Is this necessary
> when we are working with* KeyedProcessingFunctions? *Is it analogous to
> *processBroadcastElement()*?
>
> We are really excited about working with Flink, it is a great tool.
> Great job!
>
> We will be waiting for your input.
> Thanks beforehand for your support.
>
> Kind regards,
> Gabriel Rodrigues

Reply via email to