Are you using TestStream for your unit test?

On Wed, 13 Jan 2021 at 00:27, Raman Gupta <rocketra...@gmail.com> wrote:

> Your reply made me realize I removed the condition from my local copy of
> the looping timer that brings the timer forward if it encounters an earlier
> element later in the stream:
>
> if (currentTimerValue == null || currentTimerValue > nextTimerTimeBasedOnCurrentElement.getMillis()) {
>
>
> Restoring that condition fixes that issue.
>
> However, the reason I removed that condition in the first place was
> because it was making a unit test non-deterministic -- sometimes the
> element timestamps into the looping timer didn't seem to match the element
> timestamps according to the EARLIEST timestamp combiner defined, causing
> the timer to execute an additional time.
>
> The pipeline:
>
> input
>   // withAllowedTimestampSkew is deprecated, but as of now, there is no replacement
>   // https://issues.apache.org/jira/browse/BEAM-644
>   .apply("XTimestamps", WithTimestamps
>     .of<X> { it.enteredAt.asJoda() }
>     .withAllowedTimestampSkew(Duration.INFINITE.asJoda())
>   )
>   .apply("FixedTickWindows",
>     Window.into<X>(FixedWindows.of(5.minutes.asJoda()))
>       .triggering(
>         AfterWatermark.pastEndOfWindow()
>           .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
>           .withLateFirings(AfterPane.elementCountAtLeast(1))
>       )
>       .withAllowedLateness(3.days.asJoda(), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
>       .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_ALWAYS)
>       .discardingFiredPanes()
>       .withTimestampCombiner(TimestampCombiner.EARLIEST)
>   )
>   .apply("KeyByUser", WithKeys.of { it.userId })
>   .apply("GroupByUser", GroupByKey.create())
>   .apply("GlobalWindowsLoopingStatefulTimer",
>     Window.into<KV<String, Iterable<X>>>(GlobalWindows())
>       .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
>       .discardingFiredPanes()
>       .withTimestampCombiner(TimestampCombiner.EARLIEST)
>   )
>   .apply("LoopingStatefulTimer",
>     ParDo.of(LoopingStatefulTimer(5.minutes, (options.timerTimeoutDays ?: 30).days)))
>
>
> The looping timer receives an @Timestamp value in the process function of:
>
> 1970-01-01T07:34:59.999Z
>
> but the earliest timestamp of the (single) element in the elements
> iterable is:
>
> 1970-01-01T07:30:00.000Z
>
> I would have thought given my timestamp combiners on my windows that the
> timestamp should have been 07:30:00.000Z. Is there something wrong in my
> pipeline that is causing this non-deterministic behavior?
>
> Thanks,
> Raman
>
> On Tue, Jan 12, 2021 at 9:47 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Raman,
>>
>> can you share the details of the pipeline? How exactly are you using the
>> looping timer? The timer as described in the linked blog post should be
>> deterministic even when the order of the input elements is undefined.
>> Does your logic depend on element ordering?
>>
>>   Jan
>>
>> On 1/12/21 3:18 PM, Raman Gupta wrote:
>> > Hello, I am building and testing a pipeline with the direct runner.
>> > The pipeline includes a looping timer -
>> > https://beam.apache.org/blog/looping-timers/.
>> >
>> > For now, I am using JdbcIO to obtain my input data, though when put
>> > into production the pipeline will use PubSubIO.
>> >
>> > I am finding that the looping timer begins producing outputs at a
>> > random event time, which makes some sense given the randomization of
>> > inputs in the direct runner. However, this makes the results of
>> > executing my pipeline with the direct runner completely
>> > non-deterministic.
>> >
>> > So:
>> >
>> > 1) Is there a way to turn off this non-deterministic behavior, but
>> > just for the GlobalWindow / LoopingTimer?
>> >
>> > 2) Perhaps alternatively, is there a way to "initialize" the looping
>> > timer when the pipeline starts, rather than when it first sees an
>> > element? Perhaps a side input?
>> >
>> > 3) Am I right in assuming that when I move this pipeline to pub/sub io
>> > and operate it in streaming mode, this issue will go away?
>> >
>> > Thanks!
>> > Raman
>> >
>>
>

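For reference, here is a minimal sketch of why the restored condition quoted above makes the timer independent of arrival order. It is plain Kotlin with no Beam dependency. The helper names (nextTimerMillis, updateTimer, finalTimer) are hypothetical, and it assumes the next timer time is simply the element timestamp plus a fixed interval, which may not match the actual DoFn.

```kotlin
// Assumption: the next timer time is the element timestamp plus a fixed interval.
fun nextTimerMillis(elementTimestampMillis: Long, intervalMillis: Long): Long =
    elementTimestampMillis + intervalMillis

// Mirrors the quoted condition: set the timer if it is unset, or bring it
// forward if this element would fire earlier than the stored timer value.
fun updateTimer(currentTimerValue: Long?, elementTimestampMillis: Long, intervalMillis: Long): Long {
    val candidate = nextTimerMillis(elementTimestampMillis, intervalMillis)
    return if (currentTimerValue == null || currentTimerValue > candidate) candidate else currentTimerValue
}

// Applies updateTimer to every element in arrival order; null if there are no elements.
fun finalTimer(timestampsMillis: List<Long>, intervalMillis: Long): Long? =
    timestampsMillis.fold<Long, Long?>(null) { timer, ts -> updateTimer(timer, ts, intervalMillis) }

fun main() {
    val interval = 5 * 60 * 1000L // 5 minutes
    // 27_000_000 ms after the epoch is 1970-01-01T07:30:00.000Z.
    val inOrder = listOf(27_000_000L, 27_300_000L, 27_600_000L)
    val shuffled = listOf(27_600_000L, 27_000_000L, 27_300_000L)
    println(finalTimer(inOrder, interval) == finalTimer(shuffled, interval)) // prints "true"
}
```

With the condition in place, the stored value always converges to the earliest element timestamp plus the interval, whatever order the elements arrive in. If the timer were set only from the first element seen, the result would depend on arrival order.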