Thanks guys for all the help. I think the late-passthrough solution should
work for my use case.

On Wed, Jan 13, 2021 at 10:27 AM Reza Ardeshir Rokni <raro...@gmail.com>
wrote:

> Hi,
>
> Late data, in general, can be problematic for the looping timer pattern,
> as well as for other use cases where the arrival of data in the
> @ProcessElement function creates an EventTime-domain timer. The solution
> you have, which essentially passes late data through, is a nice option.
> Another solution would be to deal with the late data in a branch upstream
> of the DoFn where the timer work is happening (via a multi-output ParDo).
> The former is good if your downstream transforms are not expecting
> specific behaviours from the looping timer. The latter is often easier to
> use when you want to write your downstream transforms without having to
> think about dealing with late data.
>
> Cheers
>
> Reza
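Reza's second option, routing late data to its own branch before the timer DoFn, can be modeled in plain Kotlin. This is a minimal sketch, not Beam code: `Timing`, `Pane`, and `splitLate` are hypothetical stand-ins for `PaneInfo.Timing`, a grouped windowed element, and what would presumably be a multi-output `ParDo` in a real pipeline.

```kotlin
// Hypothetical stand-in for Beam's PaneInfo.Timing values.
enum class Timing { EARLY, ON_TIME, LATE, UNKNOWN }

// Hypothetical stand-in for a grouped, windowed element and its pane timing.
data class Pane(val key: String, val timestampMillis: Long, val timing: Timing)

// Splits the input into (timerBranch, lateBranch): LATE panes go to their own
// branch so the looping-timer logic downstream only ever sees the rest.
fun splitLate(panes: List<Pane>): Pair<List<Pane>, List<Pane>> =
    panes.partition { it.timing != Timing.LATE }
```

Whatever consumes the late branch can then handle late data explicitly, while the transforms after the timer never have to special-case it.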
>
> On Wed, 13 Jan 2021 at 23:12, Jan Lukavský <je...@seznam.cz> wrote:
>
>> I think there could still be problems in some corner cases. The problem
>> is that elements considered 'late' by the timestamp combiner (timestamp
>> behind the GBK's output watermark) are defined differently from panes
>> marked LATE in PaneInfo. So you can hit a corner case where PaneInfo is
>> ON_TIME, but the timestamp is still shifted to the end of the window. This
>> probably won't happen often, but it can happen. If that is fine for your
>> use case, then this could work.
>>
>> Jan
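The two notions of "late" Jan contrasts can be put side by side in a simplified model. This is a sketch under assumptions, not Beam's implementation: combiner lateness is taken to mean the element's timestamp is behind the GroupByKey's output watermark (as in Jan's earlier message), and pane timing is reduced to comparing the input watermark with the window's max timestamp. `Timing`, `isBehindOutputWatermark`, and `paneTiming` are hypothetical names; all times are epoch millis.

```kotlin
// Hypothetical stand-in for Beam's PaneInfo.Timing values.
enum class Timing { EARLY, ON_TIME, LATE }

// Timestamp-combiner notion of "late": the element's timestamp is already
// behind the GroupByKey's output watermark, so it cannot hold that watermark
// and its timestamp ends up shifted to the end of the window.
fun isBehindOutputWatermark(elementTs: Long, outputWatermark: Long): Boolean =
    elementTs < outputWatermark

// Simplified PaneInfo notion: EARLY until the input watermark passes the
// window's max timestamp, then ON_TIME for the first pane after that and
// LATE for any later pane.
fun paneTiming(inputWatermarkAtFiring: Long, windowMaxTs: Long, onTimeAlreadyFired: Boolean): Timing =
    when {
        inputWatermarkAtFiring <= windowMaxTs -> Timing.EARLY
        !onTimeAlreadyFired -> Timing.ON_TIME
        else -> Timing.LATE
    }
```

For window [07:30, 07:35) (max timestamp 07:34:59.999, i.e. 27,299,999 ms since the epoch), an element stamped 07:31 that arrives once the output watermark has reached 07:32 is late for the combiner, yet the pane that fires when the input watermark reaches 07:35 is ON_TIME. That is the corner case where checking only `PaneInfo.Timing.LATE` lets a shifted timestamp through.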
>> On 1/13/21 3:59 PM, Raman Gupta wrote:
>>
>> Hmm, I think I've found a simple solution... adding this to the beginning
>> of my looping timer @ProcessElement function:
>>
>> // late elements don't need to affect our looping timer,
>> // pass them through without modification
>> // this is kind of a work-around for
>> // https://issues.apache.org/jira/browse/BEAM-2262
>> // but I think makes sense in general for looping timers when
>> // there is no need to trigger timers after the window is done
>> if (paneInfo.timing == PaneInfo.Timing.LATE) {
>>   receiver.output(element)
>>   return
>> }
>>
>> At least all my unit tests are passing... is there any problem with this
>> approach?
>>
>> Thanks,
>> Raman
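The effect of the pass-through guard can be modeled without Beam. In this sketch, `Timing`, `Pane`, and `LoopingTimerModel` are hypothetical; the candidate timer time (element timestamp plus one interval) is an assumption for illustration, and the model only records timers and passed-through panes rather than reproducing the full looping timer.

```kotlin
// Hypothetical stand-in for Beam's PaneInfo.Timing values.
enum class Timing { EARLY, ON_TIME, LATE, UNKNOWN }

data class Pane(val key: String, val timestampMillis: Long, val timing: Timing)

// Toy model of a looping-timer DoFn with the late pass-through guard.
class LoopingTimerModel(private val intervalMillis: Long) {
    // Per-key event-time timer, as the stateful DoFn would keep in state.
    val nextTimer = mutableMapOf<String, Long>()
    // Late panes that bypassed the timer logic, unchanged.
    val passedThrough = mutableListOf<Pane>()

    fun processElement(pane: Pane) {
        // Late panes are emitted as-is and never touch the timer.
        if (pane.timing == Timing.LATE) {
            passedThrough += pane
            return
        }
        // Otherwise keep the earliest candidate timer for this key.
        val candidate = pane.timestampMillis + intervalMillis
        nextTimer[pane.key] = minOf(nextTimer[pane.key] ?: candidate, candidate)
    }
}
```

A late pane with an earlier timestamp therefore can neither pull an existing timer backwards nor start a new one.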
>>
>>
>> On Wed, Jan 13, 2021 at 9:42 AM Raman Gupta <rocketra...@gmail.com>
>> wrote:
>>
>>> (Replying to Reza) Yes, I am using TestStream for my unit test. Other
>>> replies below.
>>>
>>> On Wed, Jan 13, 2021 at 3:40 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi,
>>>>
>>>> Yes, there is a possible source of non-determinism related to the
>>>> timestamp combiner. Timestamp combiners combine only elements that are
>>>> not 'late' ([1]), meaning that their timestamp does not precede the
>>>> output watermark of the GBK. Looking at the pipeline code, I suppose
>>>> that could be the cause.
>>>>
>>>
>>> Yes, the test stream in this test case does indeed send the element in
>>> question "late". Here is the setup:
>>>
>>> val base = Instant.EPOCH + 6.hours
>>> val xStream: TestStream<X> = TestStream.create(coder)
>>>   .addElements(x["1"]) // this just initializes the looping timer
>>>   // advance watermark past end of window that would normally process x2
>>>   .advanceWatermarkTo((base + 3.hours + 1.minutes).asJoda())
>>>   .addElements(x["2"]) // now we see the element
>>>   .advanceWatermarkToInfinity()
>>>
>>> Here late element x["2"] has a timestamp of 1970-01-01T07:30:00.000Z and
>>> the watermark at the time x["2"] is added is 1970-01-01T09:00:01.000Z.
>>>
>>> So I get your point that the timestamp combiner is not used for late
>>> elements, but if late elements are singly emitted as in this pipeline, why
>>> do any timestamp modification at all? I would expect them to arrive with
>>> their original timestamp, not one changed from 1970-01-01T07:30:00.000Z
>>> to 1970-01-01T07:34:59.999Z (this is the part that seems
>>> non-deterministic). What is the logic / reason behind the pipeline setting
>>> this element's timestamp to 1970-01-01T07:34:59.999Z?
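The 07:34:59.999Z value is the max timestamp of the 5-minute fixed window [07:30, 07:35) that contains 07:30:00.000Z, i.e. the window end minus 1 ms, which matches Jan's explanation that such an element's timestamp gets shifted to the end of the window. The arithmetic can be checked with `java.time`; `windowStart` and `windowMaxTimestamp` are hypothetical helpers that assume a zero window offset.

```kotlin
import java.time.Duration
import java.time.Instant

// Start of the fixed window containing ts, with windows aligned to the epoch
// (zero offset), as for FixedWindows.of(size).
fun windowStart(ts: Instant, size: Duration): Instant {
    val ms = ts.toEpochMilli()
    return Instant.ofEpochMilli(ms - Math.floorMod(ms, size.toMillis()))
}

// Max timestamp of that window: its end minus one millisecond.
fun windowMaxTimestamp(ts: Instant, size: Duration): Instant =
    windowStart(ts, size).plus(size).minusMillis(1)
```

With the TestStream arguments as written, `base + 3.hours + 1.minutes` works out to 09:01:00Z, while the message reports a watermark of 09:00:01Z; either way the watermark is well past 07:34:59.999Z when x["2"] arrives, so the element is behind it.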
>>>
>>>
>>>> You can make the pipeline deterministic by using
>>>> TimestampCombiner.END_OF_WINDOW (default).
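Why END_OF_WINDOW removes the variation can be shown with a toy model of a pane holding a single element. It assumes the BEAM-2262 behavior described in this thread: under EARLIEST, an element already behind the GroupByKey's output watermark cannot hold it, so its pane is stamped with the window's max timestamp. `Combiner` and `paneTimestamp` are hypothetical, not Beam APIs.

```kotlin
// Hypothetical stand-in for the two TimestampCombiner choices discussed.
enum class Combiner { EARLIEST, END_OF_WINDOW }

// Simplified output timestamp (epoch millis) for a pane containing one element.
fun paneTimestamp(
    combiner: Combiner,
    elementTs: Long,
    windowMaxTs: Long,
    outputWatermarkOnArrival: Long
): Long = when (combiner) {
    // Never depends on when the element arrives.
    Combiner.END_OF_WINDOW -> windowMaxTs
    // Depends on where the output watermark happened to be on arrival.
    Combiner.EARLIEST ->
        if (elementTs < outputWatermarkOnArrival) windowMaxTs else elementTs
}
```

With EARLIEST, the same element at 07:30:00.000 comes out as either 07:30:00.000 or 07:34:59.999 depending only on the output watermark at arrival; with END_OF_WINDOW both cases give 07:34:59.999.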
>>>>
>>>
>>> It's definitely not ideal for this use case, but I'll consider it.
>>>
>>>
>>>> If you *need* to use TimestampCombiner.EARLIEST, you can probably do
>>>> that by tweaking the looping-timer stateful DoFn and fixing the
>>>> timestamps there (using the timer output timestamp).
>>>>
>>>
>>> I had already tried that, but the pipeline throws an error saying that the
>>> emitted timestamp cannot be earlier than the current element's timestamp.
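The error is consistent with a rule that outputs (and, judging by this attempt, timer output timestamps) may not be earlier than the current input timestamp minus the DoFn's allowed skew, which defaults to zero. A model of that check, using the hypothetical name `checkOutputTimestamp`, shows why rewinding from 07:34:59.999 to 07:30:00.000 fails.

```kotlin
// Model of the check: output must not precede the input timestamp minus the
// allowed skew (0 by default). Throws IllegalArgumentException otherwise.
fun checkOutputTimestamp(inputTs: Long, outputTs: Long, allowedSkewMillis: Long = 0L) {
    require(outputTs >= inputTs - allowedSkewMillis) {
        "output timestamp $outputTs is earlier than input timestamp $inputTs " +
            "minus allowed skew $allowedSkewMillis ms"
    }
}
```

Moving 07:34:59.999 back to 07:30:00.000 is a 299,999 ms step, so it would only pass with at least that much allowed skew; with a skew of zero the timestamp can be kept or moved forward, never back.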
>>>
>>> Thanks,
>>> Raman
>>>
>>>
>>>
>>>>   Jan
>>>>
>>>> [1] https://issues.apache.org/jira/browse/BEAM-2262
>>>> On 1/12/21 5:26 PM, Raman Gupta wrote:
>>>>
>>>> Your reply made me realize I removed the condition from my local copy
>>>> of the looping timer that brings the timer forward if it encounters an
>>>> earlier element later in the stream:
>>>>
>>>> if (currentTimerValue == null ||
>>>>     currentTimerValue > nextTimerTimeBasedOnCurrentElement.getMillis()) {
>>>>
>>>>
>>>> Restoring that condition fixes that issue.
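Why the restored condition matters for determinism (Jan's point that the looping timer should not depend on element order) can be checked with a small model. `nextTimerValue` mirrors the quoted condition; `firstSeenOnly` is a hypothetical version without the `>` comparison; `finalTimer` folds candidate timer times (epoch millis) in arrival order.

```kotlin
// Mirrors the quoted condition: set the timer if none is set yet, or move it
// earlier if the current element implies an earlier timer.
fun nextTimerValue(currentTimer: Long?, candidate: Long): Long =
    if (currentTimer == null || currentTimer > candidate) candidate else currentTimer

// Hypothetical variant without the comparison: the first element seen wins.
fun firstSeenOnly(currentTimer: Long?, candidate: Long): Long =
    currentTimer ?: candidate

// Applies a rule to candidate timer times in arrival order.
fun finalTimer(candidates: List<Long>, rule: (Long?, Long) -> Long): Long? =
    candidates.fold<Long, Long?>(null) { acc, c -> rule(acc, c) }
```

With the condition, both arrival orders end at the earlier timer; without it, the outcome depends on which element the runner happens to deliver first.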
>>>>
>>>> However, the reason I removed that condition in the first place was
>>>> that it was making a unit test non-deterministic -- sometimes the
>>>> element timestamps arriving at the looping timer didn't match what the
>>>> EARLIEST timestamp combiner should have produced, causing the timer to
>>>> fire an additional time.
>>>>
>>>> The pipeline:
>>>>
>>>> input
>>>>   // withAllowedTimestampSkew is deprecated, but as of now, there is no replacement
>>>>   // https://issues.apache.org/jira/browse/BEAM-644
>>>>   .apply("XTimestamps", WithTimestamps
>>>>     .of<X> { it.enteredAt.asJoda() }
>>>>     .withAllowedTimestampSkew(Duration.INFINITE.asJoda())
>>>>   )
>>>>   .apply("FixedTickWindows",
>>>>     Window.into<X>(FixedWindows.of(5.minutes.asJoda()))
>>>>       .triggering(
>>>>         AfterWatermark.pastEndOfWindow()
>>>>           .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
>>>>           .withLateFirings(AfterPane.elementCountAtLeast(1))
>>>>       )
>>>>       .withAllowedLateness(3.days.asJoda(), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
>>>>       .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_ALWAYS)
>>>>       .discardingFiredPanes()
>>>>       .withTimestampCombiner(TimestampCombiner.EARLIEST)
>>>>   )
>>>>   .apply("KeyByUser", WithKeys.of { it.userId })
>>>>   .apply("GroupByUser", GroupByKey.create())
>>>>   .apply("GlobalWindowsLoopingStatefulTimer",
>>>>     Window.into<KV<String, Iterable<X>>>(GlobalWindows())
>>>>       .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
>>>>       .discardingFiredPanes()
>>>>       .withTimestampCombiner(TimestampCombiner.EARLIEST)
>>>>   )
>>>>   .apply("LoopingStatefulTimer",
>>>>     ParDo.of(LoopingStatefulTimer(5.minutes, (options.timerTimeoutDays ?: 30).days)))
>>>>
>>>>
>>>> In its process function, the looping timer receives an @Timestamp
>>>> value of:
>>>>
>>>> 1970-01-01T07:34:59.999Z
>>>>
>>>> but the earliest timestamp of the (single) element in the elements
>>>> iterable is:
>>>>
>>>> 1970-01-01T07:30:00.000Z
>>>>
>>>> Given the timestamp combiners on my windows, I would have thought the
>>>> timestamp should have been 07:30:00.000Z. Is there something wrong in
>>>> my pipeline that is causing this non-deterministic behavior?
>>>>
>>>> Thanks,
>>>> Raman
>>>>
>>>> On Tue, Jan 12, 2021 at 9:47 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> Hi Raman,
>>>>>
>>>>> can you share the details of the pipeline? How exactly are you using
>>>>> the looping timer? A timer as described in the linked blog post should
>>>>> be deterministic even when the order of the input elements is undefined.
>>>>> Does your logic depend on element ordering?
>>>>>
>>>>>   Jan
>>>>>
>>>>> On 1/12/21 3:18 PM, Raman Gupta wrote:
>>>>> > Hello, I am building and testing a pipeline with the direct runner.
>>>>> > The pipeline includes a looping timer -
>>>>> > https://beam.apache.org/blog/looping-timers/.
>>>>> >
>>>>> > For now, I am using JdbcIO to obtain my input data, though when put
>>>>> > into production the pipeline will use PubSubIO.
>>>>> >
>>>>> > I am finding that the looping timer begins producing outputs at a
>>>>> > random event time, which makes some sense given the randomization of
>>>>> > inputs in the direct runner. However, this makes the results of
>>>>> > executing my pipeline with the direct runner completely
>>>>> > non-deterministic.
>>>>> >
>>>>> > So:
>>>>> >
>>>>> > 1) Is there a way to turn off this non-deterministic behavior, but
>>>>> > just for the GlobalWindow / LoopingTimer?
>>>>> >
>>>>> > 2) Perhaps alternatively, is there a way to "initialize" the looping
>>>>> > timer when the pipeline starts, rather than when it first sees an
>>>>> > element? Perhaps a side input?
>>>>> >
>>>>> > 3) Am I right in assuming that when I move this pipeline to PubsubIO
>>>>> > and operate it in streaming mode, this issue will go away?
>>>>> >
>>>>> > Thanks!
>>>>> > Raman
>>>>> >
>>>>>
>>>>
