Re: Looping timer, global windows, and direct runner

2021-01-13 Thread Raman Gupta
Thanks guys for all the help. I think the late-passthrough solution should work for my use case.
On Wed, Jan 13, 2021 at 10:27 AM Reza Ardeshir Rokni wrote:
> Hi,
>
> Late data, in general, can be problematic for the looping timer pattern,
> as well as other use cases, where the arrival of data

Re: Looping timer, global windows, and direct runner

2021-01-13 Thread Reza Ardeshir Rokni
Hi, Late data, in general, can be problematic for the looping timer pattern, as well as other use cases, where the arrival of data in the @process function creates an EventTime domain Timer. The solution you have, which essentially passes it through, is a nice option; another solution would be to d

Re: Looping timer, global windows, and direct runner

2021-01-13 Thread Jan Lukavský
I think there could still be problems in some corner cases. The problem is that elements considered 'late' by the timestamp combiner are defined differently from what is marked as late in PaneInfo. So you can have a corner case where PaneInfo would be ON_TIME, but the timestamp would still be sh

Re: Looping timer, global windows, and direct runner

2021-01-13 Thread Jan Lukavský
> What is the logic / reason behind the pipeline setting this element's timestamp to 1970-01-01T07:34:59.999Z?
There are reasons (which I cannot recall right now :)) why late elements should not simply be added to the combiner. If there are only late elements in the pane, the combiner actually get

Re: Looping timer, global windows, and direct runner

2021-01-13 Thread Raman Gupta
Hmm, I think I've found a simple solution... adding this to the beginning of my looping timer @ProcessElement function:
// late elements don't need to affect our looping timer,
// pass them through without modification
// this is kind of a work-around for https://issues.apache.org/jira/browse/BEAM
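For readers following the thread, the late-passthrough idea can be modeled in plain Java without a Beam dependency. This is only a sketch under assumptions: the snippet above is truncated, so how the original code detects lateness is not shown. The `Timing` enum, the `LatePassthroughSketch` class, and its fields are hypothetical stand-ins for the pane timing, the stateful DoFn, and its timer state; it also assumes that on-time elements are emitted unchanged.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of a looping-timer process function.
// It is NOT Beam code; it only mimics the control flow discussed in the thread.
final class LatePassthroughSketch {
    // Hypothetical stand-in for a pane timing flag.
    enum Timing { EARLY, ON_TIME, LATE }

    // Models the stored timer value; null means no timer has been set yet.
    private Long timerMillis = null;
    // Models the elements emitted downstream.
    private final List<String> output = new ArrayList<>();

    void processElement(String element, Timing timing, long nextTimerMillis) {
        // Late elements do not affect the looping timer:
        // pass them through without modification and stop.
        if (timing == Timing.LATE) {
            output.add(element);
            return;
        }
        // Set the timer if none exists, or pull it forward if this
        // element implies an earlier firing time than the stored one.
        if (timerMillis == null || timerMillis > nextTimerMillis) {
            timerMillis = nextTimerMillis;
        }
        // Assumption: on-time elements are also emitted unchanged.
        output.add(element);
    }

    Long timerMillis() {
        return timerMillis;
    }

    List<String> output() {
        return output;
    }
}
```

In this model a late element appears in the output but leaves the stored timer value untouched, which is the behavior the comments above describe.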

Re: Looping timer, global windows, and direct runner

2021-01-13 Thread Raman Gupta
(Replying to Reza) Yes, I am using TestStream for my unit test. Other replies below.
On Wed, Jan 13, 2021 at 3:40 AM Jan Lukavský wrote:
> Hi,
>
> yes, there is a possible non-determinism, that is related to the timestamp
> combiner. Timestamp combiners combine only elements, that are not 'late'

Re: Looping timer, global windows, and direct runner

2021-01-13 Thread Jan Lukavský
Hi, yes, there is possible non-determinism related to the timestamp combiner. Timestamp combiners combine only elements that are not 'late' ([1]), meaning that their timestamp does not precede the output watermark of the GBK. Looking at the pipeline code, I suppose that could be the ca

Re: Looping timer, global windows, and direct runner

2021-01-12 Thread Reza Ardeshir Rokni
Are you making use of TestStream for your unit test?
On Wed, 13 Jan 2021 at 00:27, Raman Gupta wrote:
> Your reply made me realize I removed the condition from my local copy of
> the looping timer that brings the timer forward if it encounters an earlier
> element later in the stream:
>
> if (cu

Re: Looping timer, global windows, and direct runner

2021-01-12 Thread Raman Gupta
Your reply made me realize I removed the condition from my local copy of the looping timer that brings the timer forward if it encounters an earlier element later in the stream:
if (currentTimerValue == null || currentTimerValue > nextTimerTimeBasedOnCurrentElement.getMillis()) {
Restoring that
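To illustrate why the quoted condition matters when elements arrive out of order, here is a minimal plain-Java sketch of the guard. It does not use Beam: the `LoopingTimerGuard` class and its method names are hypothetical, and the stored timer value is modeled as a nullable `Long` of epoch milliseconds, mirroring the `currentTimerValue` name from the message.

```java
// Hypothetical plain-Java model of the "bring the timer forward" guard.
final class LoopingTimerGuard {
    // True when no timer is stored yet, or when the current element
    // implies an earlier firing time than the stored timer.
    static boolean shouldSetTimer(Long currentTimerValue, long nextTimerMillis) {
        return currentTimerValue == null || currentTimerValue > nextTimerMillis;
    }

    // Returns the timer value that would be stored after seeing an element.
    static long updatedTimer(Long currentTimerValue, long nextTimerMillis) {
        return shouldSetTimer(currentTimerValue, nextTimerMillis)
                ? nextTimerMillis
                : currentTimerValue;
    }
}
```

Because the stored value only ever moves to the minimum seen so far, the result in this model does not depend on the order in which the elements arrive.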

Re: Looping timer, global windows, and direct runner

2021-01-12 Thread Jan Lukavský
Hi Raman, can you share the details of the pipeline? How exactly are you using the looping timer? The timer as described in the linked blog post should be deterministic even when the order of the input elements is undefined. Does your logic depend on element ordering?
Jan
On 1/12/21 3:18 PM, Ra