Thanks guys for all the help. I think the late-passthrough solution should
work for my use case.
On Wed, Jan 13, 2021 at 10:27 AM Reza Ardeshir Rokni
wrote:
> Hi,
>
> Late data, in general, can be problematic for the looping timer pattern,
> as well as other use cases, where the arrival of data
Hi,
Late data, in general, can be problematic for the looping timer pattern, as
well as other use cases, where the arrival of data in the @process function
creates an EventTime domain Timer. The solution you have, which
essentially passes it through, is a nice option; another solution would be
to d
I think there could still be problems in some corner cases. The problem
is that elements considered 'late' by the timestamp combiner are defined
differently from what is marked as late in PaneInfo. So you can have a
corner case where PaneInfo would be ON_TIME, but the timestamp would
still be sh
> What is the logic / reason behind the pipeline setting this element's
timestamp to 1970-01-01T07:34:59.999Z?
There are reasons (which I cannot recall right now :)) why late elements
should not simply be added to the combiner. If there are only late
elements in a pane, the combiner actually get
Hmm, I think I've found a simple solution... adding this to the beginning
of my looping timer @ProcessElement function:
// late elements don't need to affect our looping timer,
// pass them through without modification
// this is kind of a work-around for
https://issues.apache.org/jira/browse/BEAM
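The pass-through idea in the snippet above can be modelled in plain Java,
without the Beam SDK. This is only a sketch: the excerpt does not show how
the original code detects a late element, so the Timing enum, the class
name, the field names and the one-minute interval below are all
hypothetical stand-ins, not the poster's code or Beam's API.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of "pass late elements through"; no Beam dependency. */
final class LatePassThroughSketch {

    /** Hypothetical stand-in for a pane-timing flag on the element. */
    enum Timing { EARLY, ON_TIME, LATE }

    /** Hypothetical loop interval of one minute, in milliseconds. */
    static final long INTERVAL_MILLIS = 60_000L;

    /** Elements emitted downstream, in arrival order. */
    final List<String> output = new ArrayList<>();

    /** Epoch millis of the pending timer, or null when none is set. */
    Long timerMillis = null;

    void processElement(String element, long timestampMillis, Timing timing) {
        // Late elements must not affect the looping timer:
        // emit them unchanged and return before any timer logic runs.
        if (timing == Timing.LATE) {
            output.add(element);
            return;
        }
        // Non-late elements may set the timer, or bring it forward.
        long candidate = timestampMillis + INTERVAL_MILLIS;
        if (timerMillis == null || timerMillis > candidate) {
            timerMillis = candidate;
        }
        output.add(element);
    }

    public static void main(String[] args) {
        LatePassThroughSketch fn = new LatePassThroughSketch();
        fn.processElement("on-time", 120_000L, Timing.ON_TIME);
        fn.processElement("late", 0L, Timing.LATE);
        System.out.println(fn.output + " timer=" + fn.timerMillis);
    }
}
```

The early return is the whole point: the late element still reaches the
output, but the timer state is the same as if it had never arrived.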
(Replying to Reza) Yes, I am using TestStream for my unit test. Other
replies below.
On Wed, Jan 13, 2021 at 3:40 AM Jan Lukavský wrote:
> Hi,
>
> yes, there is a possible non-determinism that is related to the timestamp
> combiner. Timestamp combiners combine only elements that are not 'late'
Hi,
yes, there is a possible non-determinism that is related to the
timestamp combiner. Timestamp combiners combine only elements that are
not 'late' ([1]), meaning that their timestamp does not precede the output
watermark of the GBK. Looking at the pipeline code I suppose that could
be the ca
Are you making use of TestStream for your unit test?
On Wed, 13 Jan 2021 at 00:27, Raman Gupta wrote:
> Your reply made me realize I removed the condition from my local copy of
> the looping timer that brings the timer forward if it encounters an earlier
> element later in the stream:
>
> if (cu
Your reply made me realize I removed the condition from my local copy of
the looping timer that brings the timer forward if it encounters an earlier
element later in the stream:
if (currentTimerValue == null || currentTimerValue >
nextTimerTimeBasedOnCurrentElement.getMillis()) {
Restoring that
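For readers following along, the condition quoted above can be exercised
on its own. This is a minimal sketch in plain Java, not the actual DoFn:
the class and method names are made up for illustration, and
java.time.Instant with toEpochMilli() stands in for the Instant type and
getMillis() call used in the original snippet.

```java
import java.time.Instant;

/** Plain-Java model of the "bring the timer forward" check; no Beam dependency. */
final class LoopingTimerCheck {

    /**
     * Returns true when the timer should be set to the candidate time:
     * either no timer is recorded yet, or the recorded timer would fire
     * later than the time derived from the current element.
     *
     * @param currentTimerValue epoch millis of the stored timer, or null if none
     * @param nextTimerTimeBasedOnCurrentElement candidate firing time
     */
    static boolean shouldSetTimer(Long currentTimerValue,
                                  Instant nextTimerTimeBasedOnCurrentElement) {
        return currentTimerValue == null
            || currentTimerValue > nextTimerTimeBasedOnCurrentElement.toEpochMilli();
    }

    public static void main(String[] args) {
        Instant candidate = Instant.parse("2021-01-13T00:01:00Z");
        long candidateMillis = candidate.toEpochMilli();

        // No timer stored yet: set it.
        System.out.println(shouldSetTimer(null, candidate));                     // true
        // Stored timer fires later than the candidate: bring it forward.
        System.out.println(shouldSetTimer(candidateMillis + 60_000L, candidate)); // true
        // Stored timer already fires earlier: leave it alone.
        System.out.println(shouldSetTimer(candidateMillis - 60_000L, candidate)); // false
    }
}
```

With this check in place, an element that arrives out of order with an
earlier timestamp moves the timer forward, while elements with later
timestamps leave it untouched.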
Hi Raman,
can you share the details of the pipeline? How exactly are you using the
looping timer? The timer as described in the linked blog post should be
deterministic even when the order of the input elements is undefined.
Does your logic depend on element ordering?
Jan
On 1/12/21 3:18 PM, Ra