Hmm, I think I've found a simple solution... adding this to the beginning
of my looping timer's @ProcessElement function:

// Late elements don't need to affect our looping timer,
// so pass them through without modification.
// This is kind of a work-around for
// https://issues.apache.org/jira/browse/BEAM-2262
// but I think it makes sense in general for looping timers when
// there is no need to trigger timers after the window is done.
if (paneInfo.timing == PaneInfo.Timing.LATE) {
  receiver.output(element)
  return
}

At least all my unit tests are passing... is there any problem with this
approach?
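To check the guard outside Beam, here is a plain-Kotlin model. All names are hypothetical: `Timing` mirrors the four values of Beam's `PaneInfo.Timing`, `LoopingTimerModel` stands in for the stateful DoFn, and the timer is assumed to fire at the end of the element's 5-minute interval. It only illustrates the intended effect of the guard, namely that a late element is forwarded unchanged and cannot pull the timer back to an earlier interval; it does not model how Beam itself assigns pane timing.

```kotlin
import java.time.Duration
import java.time.Instant

// Hypothetical mirror of the values of Beam's PaneInfo.Timing.
enum class Timing { EARLY, ON_TIME, LATE, UNKNOWN }

// Hypothetical single-key model of the looping-timer DoFn state.
class LoopingTimerModel(private val interval: Duration) {
    var timer: Instant? = null
        private set
    val passedThrough = mutableListOf<Instant>()

    fun processElement(timestamp: Instant, timing: Timing) {
        if (timing == Timing.LATE) {
            // Late panes are forwarded as-is and never move the timer.
            passedThrough.add(timestamp)
            return
        }
        // Assumption: the timer fires at the end of the element's interval.
        val ms = interval.toMillis()
        val next = Instant.ofEpochMilli(Math.floorDiv(timestamp.toEpochMilli(), ms) * ms + ms)
        val current = timer
        // Same condition as quoted in this thread: set if unset, or bring it forward.
        if (current == null || current.isAfter(next)) {
            timer = next
        }
    }
}

fun main() {
    val model = LoopingTimerModel(Duration.ofMinutes(5))
    model.processElement(Instant.parse("1970-01-01T09:00:00Z"), Timing.ON_TIME)
    model.processElement(Instant.parse("1970-01-01T07:30:00Z"), Timing.LATE)
    println(model.timer)         // 1970-01-01T09:05:00Z
    println(model.passedThrough) // [1970-01-01T07:30:00Z]
}
```

Without the `LATE` check, the 07:30 element would move the model's timer back to 07:35, which is already behind the watermark.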

Thanks,
Raman


On Wed, Jan 13, 2021 at 9:42 AM Raman Gupta <rocketra...@gmail.com> wrote:

> (Replying to Reza) Yes, I am using TestStream for my unit test. Other
> replies below.
>
> On Wed, Jan 13, 2021 at 3:40 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi,
>>
>> yes, there is a possible source of non-determinism, related to the
>> timestamp combiner. Timestamp combiners combine only elements that are not
>> 'late' ([1]), i.e. elements whose timestamp does not precede the output
>> watermark of the GBK. Looking at the pipeline code, I suppose that could be
>> the cause.
>>
>
> Yes, the test stream in this test case does indeed send the element in
> question "late". Here is the setup:
>
> val base = Instant.EPOCH + 6.hours
> val xStream: TestStream<X> = TestStream.create(coder)
>   .addElements(x["1"]) // this just initializes the looping timer
>   // advance watermark past end of window that would normally process x2
>   .advanceWatermarkTo((base + 3.hours + 1.minutes).asJoda())
>   .addElements(x["2"]) // now we see the element
>   .advanceWatermarkToInfinity()
>
> Here late element x["2"] has a timestamp of 1970-01-01T07:30:00.000Z and
> the watermark at the time x["2"] is added is 1970-01-01T09:00:01.000Z.
>
> So I get your point that the timestamp combiner is not used for late
> elements, but if late elements are emitted individually, as in this
> pipeline, why modify their timestamps at all? I would expect them to arrive
> with their original timestamp, not one changed from 1970-01-01T07:30:00.000Z
> to 1970-01-01T07:34:59.999Z (this is the part that seems
> non-deterministic). What is the reasoning behind the pipeline setting
> this element's timestamp to 1970-01-01T07:34:59.999Z?
>
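A small worked check of the timestamps in this test. It assumes fixed windows start at multiples of the window size since the epoch (zero offset) and that a window's max timestamp is its exclusive end minus 1 ms, as Beam's `IntervalWindow.maxTimestamp()` reports. Under those assumptions, 07:30:00.000Z falls in the window [07:30, 07:35), whose max timestamp is exactly the 07:34:59.999Z observed, and by the lateness rule Jan describes the element precedes the 09:00:01.000Z watermark. That suggests the late pane was stamped with the end-of-window time rather than the EARLIEST-combined element time; the sketch models only the arithmetic, not Beam's internals, and the function names are hypothetical.

```kotlin
import java.time.Duration
import java.time.Instant

// Start of the fixed window containing `ts`, assuming zero offset
// (windows start at multiples of `size` since the epoch).
fun fixedWindowStart(ts: Instant, size: Duration): Instant {
    val ms = size.toMillis()
    return Instant.ofEpochMilli(Math.floorDiv(ts.toEpochMilli(), ms) * ms)
}

// Max timestamp of that window: the exclusive end minus 1 ms.
fun fixedWindowMaxTimestamp(ts: Instant, size: Duration): Instant =
    fixedWindowStart(ts, size).plus(size).minusMillis(1)

// Lateness in the sense described above: the element's timestamp
// precedes the (output) watermark when it is processed.
fun isLate(ts: Instant, watermark: Instant): Boolean = ts.isBefore(watermark)

fun main() {
    val size = Duration.ofMinutes(5)
    val x2 = Instant.parse("1970-01-01T07:30:00Z")
    val watermark = Instant.parse("1970-01-01T09:00:01Z")

    println(fixedWindowStart(x2, size))        // 1970-01-01T07:30:00Z
    println(fixedWindowMaxTimestamp(x2, size)) // 1970-01-01T07:34:59.999Z
    println(isLate(x2, watermark))             // true
}
```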
>
>> You can make the pipeline deterministic by using
>> TimestampCombiner.END_OF_WINDOW (default).
>>
>
> It's definitely not ideal for this use case, but I'll consider it.
>
>
>> If you *need* to use TimestampCombiner.EARLIEST, you can probably do
>> that by tweaking the looping-timer stateful DoFn and fixing timestamps there
>> (using the timer output timestamp).
>>
>
> I had already tried that, but the pipeline throws an error saying that the
> emitted timestamp cannot be earlier than the current element's timestamp.
>
> Thanks,
> Raman
>
>
>
>>   Jan
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-2262
>> On 1/12/21 5:26 PM, Raman Gupta wrote:
>>
>> Your reply made me realize that, in my local copy of the looping timer, I
>> had removed the condition that brings the timer forward when an earlier
>> element arrives later in the stream:
>>
>> if (currentTimerValue == null ||
>>     currentTimerValue > nextTimerTimeBasedOnCurrentElement.getMillis()) {
>>
>>
>> Restoring that condition fixes that issue.
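A plain-Kotlin sketch of why the condition matters, which is relevant because the direct runner randomizes input order. Names are hypothetical; the timer is assumed to fire at the end of the 5-minute interval containing the element, and "without the condition" is assumed to mean that only the first element sets the timer. Feeding the same two elements in both orders shows that, given stable element timestamps, the condition makes the final timer independent of arrival order, whereas the first-element-only variant depends on which element arrives first.

```kotlin
// Timestamps are epoch milliseconds; the interval is 5 minutes.
const val INTERVAL_MS = 5 * 60 * 1000L

// Assumption: the timer fires at the end of the interval containing the element.
fun nextTimerFor(elementMs: Long): Long =
    Math.floorDiv(elementMs, INTERVAL_MS) * INTERVAL_MS + INTERVAL_MS

// With the condition: set the timer if unset, or bring it forward.
fun withCondition(current: Long?, elementMs: Long): Long {
    val next = nextTimerFor(elementMs)
    return if (current == null || current > next) next else current
}

// Assumed behavior without the condition: only the first element sets the timer.
fun firstElementOnly(current: Long?, elementMs: Long): Long =
    current ?: nextTimerFor(elementMs)

// Feed elements in the given arrival order and return the final timer value.
fun finalTimer(arrivals: List<Long>, update: (Long?, Long) -> Long): Long? =
    arrivals.fold(null as Long?) { timer, ts -> update(timer, ts) }

fun main() {
    val t0730 = 27_000_000L // 1970-01-01T07:30:00Z
    val t0740 = 27_600_000L // 1970-01-01T07:40:00Z
    val inOrder = listOf(t0730, t0740)
    val outOfOrder = listOf(t0740, t0730)

    println(finalTimer(inOrder, ::withCondition) == finalTimer(outOfOrder, ::withCondition))       // true
    println(finalTimer(inOrder, ::firstElementOnly) == finalTimer(outOfOrder, ::firstElementOnly)) // false
}
```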
>>
>> However, I removed that condition in the first place because it was
>> making a unit test non-deterministic: sometimes the element timestamps
>> arriving at the looping timer didn't seem to match the timestamps I expected
>> from the EARLIEST timestamp combiner I had defined, causing the timer to
>> fire an additional time.
>>
>> The pipeline:
>>
>> input
>>   // withAllowedTimestampSkew is deprecated, but as of now, there is no
>>   // replacement: https://issues.apache.org/jira/browse/BEAM-644
>>   .apply("XTimestamps", WithTimestamps
>>     .of<X> { it.enteredAt.asJoda() }
>>     .withAllowedTimestampSkew(Duration.INFINITE.asJoda())
>>   )
>>   .apply("FixedTickWindows",
>>     Window.into<X>(FixedWindows.of(5.minutes.asJoda()))
>>       .triggering(
>>         AfterWatermark.pastEndOfWindow()
>>           .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
>>           .withLateFirings(AfterPane.elementCountAtLeast(1))
>>       )
>>       .withAllowedLateness(3.days.asJoda(),
>>         Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
>>       .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_ALWAYS)
>>       .discardingFiredPanes()
>>       .withTimestampCombiner(TimestampCombiner.EARLIEST)
>>   )
>>   .apply("KeyByUser", WithKeys.of { it.userId })
>>   .apply("GroupByUser", GroupByKey.create())
>>   .apply("GlobalWindowsLoopingStatefulTimer",
>>     Window.into<KV<String, Iterable<X>>>(GlobalWindows())
>>       .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
>>       .discardingFiredPanes()
>>       .withTimestampCombiner(TimestampCombiner.EARLIEST)
>>   )
>>   .apply("LoopingStatefulTimer",
>>     ParDo.of(LoopingStatefulTimer(5.minutes, (options.timerTimeoutDays ?: 30).days)))
>>
>>
>> In its @ProcessElement function, the looping timer receives an @Timestamp value of:
>>
>> 1970-01-01T07:34:59.999Z
>>
>> but the earliest timestamp of the (single) element in the elements
>> iterable is:
>>
>> 1970-01-01T07:30:00.000Z
>>
>> Given the timestamp combiners on my windows, I would have expected the
>> timestamp to be 07:30:00.000Z. Is there something wrong in my pipeline
>> that is causing this non-deterministic behavior?
>>
>> Thanks,
>> Raman
>>
>> On Tue, Jan 12, 2021 at 9:47 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi Raman,
>>>
>>> can you share the details of the pipeline? How exactly are you using the
>>> looping timer? The timer as described in the linked blog post should be
>>> deterministic even when the order of the input elements is undefined.
>>> Does your logic depend on element ordering?
>>>
>>>   Jan
>>>
>>> On 1/12/21 3:18 PM, Raman Gupta wrote:
>>> > Hello, I am building and testing a pipeline with the direct runner.
>>> > The pipeline includes a looping timer -
>>> > https://beam.apache.org/blog/looping-timers/.
>>> >
>>> > For now, I am using JdbcIO to obtain my input data, though when put
>>> > into production the pipeline will use PubSubIO.
>>> >
>>> > I am finding that the looping timer begins producing outputs at a
>>> > random event time, which makes some sense given the randomization of
>>> > inputs in the direct runner. However, this makes the results of
>>> > executing my pipeline with the direct runner completely
>>> > non-deterministic.
>>> >
>>> > So:
>>> >
>>> > 1) Is there a way to turn off this non-deterministic behavior, but
>>> > just for the GlobalWindow / LoopingTimer?
>>> >
>>> > 2) Perhaps alternatively, is there a way to "initialize" the looping
>>> > timer when the pipeline starts, rather than when it first sees an
>>> > element? Perhaps a side input?
>>> >
>>> > 3) Am I right in assuming that when I move this pipeline to pub/sub io
>>> > and operate it in streaming mode, this issue will go away?
>>> >
>>> > Thanks!
>>> > Raman
>>> >
>>>
>>
