Thanks guys for all the help. I think the late-passthrough solution should work for my use case.
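Reza describes two ways to keep late data away from a looping timer. One is to pass late elements straight through inside the timer DoFn, which is the approach Raman adopted by checking `PaneInfo.Timing.LATE`. The other is to split them off upstream with a multi-output DoFn. The sketch below models both in plain Kotlin so the control flow can be run without a pipeline.

`Timing`, `Element`, `LoopingTimerModel` and `splitLate` are hypothetical names. In a real pipeline the timing would come from Beam's `PaneInfo`, the timer from timer/state declarations, and the split from tagged outputs. The rule "next firing = element timestamp + interval" is also an assumption.

```kotlin
import java.time.Duration
import java.time.Instant

// Hypothetical stand-in for Beam's PaneInfo.Timing values.
enum class Timing { EARLY, ON_TIME, LATE, UNKNOWN }

// Hypothetical element type: an id, its event timestamp and its pane timing.
data class Element(val id: String, val timestamp: Instant, val timing: Timing)

// Option 1: late elements bypass the timer logic inside the timer "DoFn".
class LoopingTimerModel(private val interval: Duration) {
    var timer: Instant? = null
        private set
    val passedThrough = mutableListOf<Element>()

    fun process(element: Element) {
        if (element.timing == Timing.LATE) {
            // Emit late data unchanged; it never moves the timer.
            passedThrough.add(element)
            return
        }
        // Assumption: the next firing is the element timestamp plus the interval.
        val next = element.timestamp.plus(interval)
        val current = timer
        // Set the timer, or bring it forward for an earlier element.
        if (current == null || current.isAfter(next)) {
            timer = next
        }
        // A real looping timer would also buffer the element in state (omitted).
    }
}

// Option 2: route late elements to a separate output upstream of the timer,
// like a multi-output DoFn. First = non-late (main output), second = late.
fun splitLate(elements: List<Element>): Pair<List<Element>, List<Element>> =
    elements.partition { it.timing != Timing.LATE }
```

With an ON_TIME element at 07:40 and a 5-minute interval, the timer sits at 07:45. A LATE element stamped 07:30 is then passed through and leaves the timer alone. Without the early return, it would have pulled the timer forward to 07:35.

This check only keys on pane timing. Per Jan's caveat, it does not catch an ON_TIME pane whose timestamp has already been shifted by the timestamp combiner.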
On Wed, Jan 13, 2021 at 10:27 AM Reza Ardeshir Rokni <raro...@gmail.com> wrote:

> Hi,
>
> Late data, in general, can be problematic for the looping timer pattern,
> as well as for other use cases where the arrival of data in the @ProcessElement
> function creates an EventTime domain timer. The solution you have, which
> essentially passes late data through, is a nice option. Another solution would be
> to deal with the late data in a branch upstream of the DoFn where the timer
> work is happening (via a multi-output DoFn). The former is good if your
> downstream transforms are not expecting specific behaviours from the
> looping timer characteristics. The latter is often easier to use when you
> want to write your downstream transforms without having to think about
> dealing with late data.
>
> Cheers
>
> Reza
>
> On Wed, 13 Jan 2021 at 23:12, Jan Lukavský <je...@seznam.cz> wrote:
>
>> I think there could still be problems in some corner cases. The problem
>> is that elements considered 'late' by the timestamp combiner have a different
>> definition than what is marked as late in PaneInfo. So you can have a
>> corner case where PaneInfo would be ON_TIME, but the timestamp would still
>> be shifted to the end of the window. This would probably not happen too often,
>> but it can happen. If that is fine for your use case, then this could work.
>>
>> Jan
>>
>> On 1/13/21 3:59 PM, Raman Gupta wrote:
>>
>> Hmm, I think I've found a simple solution... adding this to the beginning
>> of my looping timer @ProcessElement function:
>>
>>   // late elements don't need to affect our looping timer,
>>   // pass them through without modification
>>   // this is kind of a work-around for
>>   // https://issues.apache.org/jira/browse/BEAM-2262
>>   // but I think makes sense in general for looping timers when
>>   // there is no need to trigger timers after the window is done
>>   if (paneInfo.timing == PaneInfo.Timing.LATE) {
>>     receiver.output(element)
>>     return
>>   }
>>
>> At least all my unit tests are passing... is there any problem with this
>> approach?
>>
>> Thanks,
>> Raman
>>
>>
>> On Wed, Jan 13, 2021 at 9:42 AM Raman Gupta <rocketra...@gmail.com>
>> wrote:
>>
>>> (Replying to Reza) Yes, I am using TestStream for my unit test. Other
>>> replies below.
>>>
>>> On Wed, Jan 13, 2021 at 3:40 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi,
>>>>
>>>> yes, there is a possible source of non-determinism, related to the
>>>> timestamp combiner. Timestamp combiners combine only elements that are not
>>>> 'late' ([1]), meaning that their timestamp does not precede the output
>>>> watermark of the GBK. Looking at the pipeline code, I suppose that could be
>>>> the cause.
>>>>
>>>
>>> Yes, the test stream in this test case does indeed send the element in
>>> question "late". Here is the setup:
>>>
>>>   val base = Instant.EPOCH + 6.hours
>>>   val xStream: TestStream<X> = TestStream.create(coder)
>>>     .addElements(x["1"]) // this just initializes the looping timer
>>>     // advance watermark past end of window that would normally process x2
>>>     .advanceWatermarkTo((base + 3.hours + 1.minutes).asJoda())
>>>     .addElements(x["2"]) // now we see the element
>>>     .advanceWatermarkToInfinity()
>>>
>>> Here late element x["2"] has a timestamp of 1970-01-01T07:30:00.000Z and
>>> the watermark at the time x["2"] is added is 1970-01-01T09:00:01.000Z.
>>>
>>> So I get your point that the timestamp combiner is not used for late
>>> elements, but if late elements are emitted singly as in this pipeline, why
>>> do any timestamp modification at all? I would expect them to arrive with
>>> their original timestamp, not one changed from 1970-01-01T07:30:00.000Z
>>> to 1970-01-01T07:34:59.999Z (this is the part that seems
>>> non-deterministic). What is the logic / reason behind the pipeline setting
>>> this element's timestamp to 1970-01-01T07:34:59.999Z?
>>>
>>>
>>>> You can make the pipeline deterministic by using
>>>> TimestampCombiner.END_OF_WINDOW (default).
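Jan's explanation and the numbers from Raman's test can be checked against a simplified model of fixed-window assignment and the EARLIEST combiner. The model has these parts:

- Elements whose timestamp is behind the output watermark are treated as late and left out of the combine.
- When nothing is left to combine, the result falls back to the window's max timestamp (end minus 1 ms). This fallback is inferred from the 1970-01-01T07:34:59.999Z value in the thread and Jan's "shifted to the end of the window". It is not taken from Beam's watermark-hold code.
- Using the TestStream watermark as the GBK's output watermark is a further simplification.

`Window`, `assignFixedWindow` and `earliestOutputTimestamp` are hypothetical helpers.

```kotlin
import java.time.Duration
import java.time.Instant

// A window [start, end); maxTimestamp mirrors Beam's IntervalWindow (end - 1 ms).
data class Window(val start: Instant, val end: Instant) {
    val maxTimestamp: Instant get() = end.minusMillis(1)
}

// Fixed windows with zero offset, aligned to the epoch.
fun assignFixedWindow(ts: Instant, size: Duration): Window {
    val millis = ts.toEpochMilli()
    val start = Instant.ofEpochMilli(millis - Math.floorMod(millis, size.toMillis()))
    return Window(start, start.plus(size))
}

// Simplified EARLIEST combiner as described in the thread: only elements whose
// timestamp is not behind the output watermark are combined. If none qualify,
// fall back to the end of the window (inferred from the observed 07:34:59.999Z).
fun earliestOutputTimestamp(timestamps: List<Instant>, window: Window, outputWatermark: Instant): Instant {
    val notLate = timestamps.filter { !it.isBefore(outputWatermark) }
    return notLate.minOrNull() ?: window.maxTimestamp
}
```

x["2"] at 07:30:00 falls into the 5-minute window [07:30, 07:35). With the watermark at 09:00:01 it counts as late, so the model yields 07:34:59.999, the value Raman saw. With a watermark still at 07:00, the element's own 07:30:00 survives.

Under END_OF_WINDOW every pane would get the max timestamp regardless of lateness, which is why Jan suggests it for determinism. The model's lateness test looks only at the output watermark and knows nothing about PaneInfo. That is the gap behind Jan's ON_TIME-but-shifted corner case.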
>>>>
>>>
>>> It's definitely not ideal for this use case, but I'll consider it.
>>>
>>>
>>>> If you *need* to use TimestampCombiner.EARLIEST, you can probably
>>>> do that by tweaking the looping timer stateful DoFn and fixing timestamps
>>>> there (using the timer output timestamp).
>>>>
>>>
>>> I had already tried that, but the pipeline throws an error that the
>>> timestamp emitted cannot be earlier than the current element timestamp.
>>>
>>> Thanks,
>>> Raman
>>>
>>>
>>>
>>>> Jan
>>>>
>>>> [1] https://issues.apache.org/jira/browse/BEAM-2262
>>>>
>>>> On 1/12/21 5:26 PM, Raman Gupta wrote:
>>>>
>>>> Your reply made me realize I removed the condition from my local copy
>>>> of the looping timer that brings the timer forward if it encounters an
>>>> earlier element later in the stream:
>>>>
>>>>   if (currentTimerValue == null || currentTimerValue >
>>>>       nextTimerTimeBasedOnCurrentElement.getMillis()) {
>>>>
>>>> Restoring that condition fixes that issue.
>>>>
>>>> However, the reason I removed that condition in the first place was
>>>> that it was making a unit test non-deterministic: sometimes the
>>>> element timestamps into the looping timer didn't seem to match the element
>>>> timestamps according to the EARLIEST timestamp combiner defined, causing
>>>> the timer to execute an additional time.
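The condition Raman restored does two things. It sets the timer when none is set, and it brings the timer forward when a later-arriving element would need an earlier firing. Without it, whichever element the runner happens to deliver first fixes the start time. That is consistent with the random start times Raman originally saw under the direct runner's input shuffling.

The model below ignores watermarks and firings and only computes the timer value after a batch of elements. It assumes each element's candidate firing is its timestamp plus the loop interval. The 5-minute figure is taken from Raman's `LoopingStatefulTimer(5.minutes, ...)` on the assumption that it is the interval. `initialTimer` is a hypothetical helper.

```kotlin
import java.time.Duration
import java.time.Instant

// Timer value a looping timer holds after seeing `timestamps` in the given
// order, ignoring watermarks and firings. Assumption: the candidate firing
// for each element is its timestamp plus `interval`.
// bringForward = true mirrors the restored condition
//   currentTimerValue == null || currentTimerValue > nextTimerTimeBasedOnCurrentElement
// bringForward = false only sets the timer for the first element seen.
fun initialTimer(timestamps: List<Instant>, interval: Duration, bringForward: Boolean): Instant? {
    var current: Instant? = null
    for (ts in timestamps) {
        val next = ts.plus(interval)
        val existing = current
        if (existing == null || (bringForward && existing.isAfter(next))) {
            current = next
        }
    }
    return current
}
```

With the condition, the result is the minimum candidate over all elements, so any delivery order gives the same timer (06:05 for elements at 06:00 and 07:30). Without it, delivering 07:30 first leaves the timer at 07:35. This order dependence is the non-determinism Jan asked about.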
>>>>
>>>> The pipeline:
>>>>
>>>>   input
>>>>     // withAllowedTimestampSkew is deprecated, but as of now, there is no replacement
>>>>     // https://issues.apache.org/jira/browse/BEAM-644
>>>>     .apply("XTimestamps", WithTimestamps
>>>>       .of<X> { it.enteredAt.asJoda() }
>>>>       .withAllowedTimestampSkew(Duration.INFINITE.asJoda())
>>>>     )
>>>>     .apply("FixedTickWindows",
>>>>       Window.into<X>(FixedWindows.of(5.minutes.asJoda()))
>>>>         .triggering(
>>>>           AfterWatermark.pastEndOfWindow()
>>>>             .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
>>>>             .withLateFirings(AfterPane.elementCountAtLeast(1))
>>>>         )
>>>>         .withAllowedLateness(3.days.asJoda(), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
>>>>         .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_ALWAYS)
>>>>         .discardingFiredPanes()
>>>>         .withTimestampCombiner(TimestampCombiner.EARLIEST)
>>>>     )
>>>>     .apply("KeyByUser", WithKeys.of { it.userId })
>>>>     .apply("GroupByUser", GroupByKey.create())
>>>>     .apply("GlobalWindowsLoopingStatefulTimer",
>>>>       Window.into<KV<String, Iterable<X>>>(GlobalWindows())
>>>>         .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
>>>>         .discardingFiredPanes()
>>>>         .withTimestampCombiner(TimestampCombiner.EARLIEST)
>>>>     )
>>>>     .apply("LoopingStatefulTimer",
>>>>       ParDo.of(LoopingStatefulTimer(5.minutes, (options.timerTimeoutDays ?: 30).days)))
>>>>
>>>> The looping timer receives an @Timestamp value in the process function
>>>> of:
>>>>
>>>>   1970-01-01T07:34:59.999Z
>>>>
>>>> but the earliest timestamp of the (single) element in the elements
>>>> iterable is:
>>>>
>>>>   1970-01-01T07:30:00.000Z
>>>>
>>>> I would have thought, given my timestamp combiners on my windows, that
>>>> the timestamp should have been 07:30:00.000Z. Is there something wrong in
>>>> my pipeline that is causing this non-deterministic behavior?
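The deprecated `withAllowedTimestampSkew` in this pipeline relaxes a rule Beam applies to output timestamps. An output timestamp may not be earlier than the current input's timestamp minus an allowed skew. As far as I can tell, the same rule is behind the error Raman hit when trying Jan's suggestion of fixing timestamps in the looping-timer DoFn. The element already arrives stamped 07:34:59.999Z, so moving it back to 07:30:00.000Z fails with the default zero skew.

The sketch below is a plain-Kotlin model of that check, not Beam code. `checkOutputTimestamp` and `isAllowed` are hypothetical helpers. The exact message, and every place Beam applies the check (element outputs, timer output timestamps), should be verified against the Beam version in use. Beam's deprecated `DoFn.getAllowedTimestampSkew()` plays the skew role for a DoFn.

```kotlin
import java.time.Duration
import java.time.Instant

// Simplified model of the rule: an output timestamp may not be earlier than
// the current input timestamp minus the allowed skew (zero by default).
fun checkOutputTimestamp(output: Instant, input: Instant, allowedSkew: Duration = Duration.ZERO) {
    val earliestAllowed = input.minus(allowedSkew)
    require(!output.isBefore(earliestAllowed)) {
        "Output timestamp $output is earlier than input timestamp $input minus allowed skew $allowedSkew"
    }
}

// Convenience wrapper for the model: true when the check passes.
fun isAllowed(output: Instant, input: Instant, allowedSkew: Duration = Duration.ZERO): Boolean =
    runCatching { checkOutputTimestamp(output, input, allowedSkew) }.isSuccess
```

With zero skew, 07:30:00 is rejected for an input at 07:34:59.999. A skew of 5 minutes would allow it, since the earliest permitted timestamp becomes 07:29:59.999. That is also why relying on skew is a blunt fix: it widens the check for every element, not just the shifted ones.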
>>>>
>>>> Thanks,
>>>> Raman
>>>>
>>>> On Tue, Jan 12, 2021 at 9:47 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> Hi Raman,
>>>>>
>>>>> can you share the details of the pipeline? How exactly are you using
>>>>> the looping timer? A timer as described in the linked blog post should be
>>>>> deterministic even when the order of the input elements is undefined.
>>>>> Does your logic depend on element ordering?
>>>>>
>>>>> Jan
>>>>>
>>>>> On 1/12/21 3:18 PM, Raman Gupta wrote:
>>>>> > Hello, I am building and testing a pipeline with the direct runner.
>>>>> > The pipeline includes a looping timer -
>>>>> > https://beam.apache.org/blog/looping-timers/.
>>>>> >
>>>>> > For now, I am using JdbcIO to obtain my input data, though when put
>>>>> > into production the pipeline will use PubSubIO.
>>>>> >
>>>>> > I am finding that the looping timer begins producing outputs at a
>>>>> > random event time, which makes some sense given the randomization of
>>>>> > inputs in the direct runner. However, this makes the results of
>>>>> > executing my pipeline with the direct runner completely
>>>>> > non-deterministic.
>>>>> >
>>>>> > So:
>>>>> >
>>>>> > 1) Is there a way to turn off this non-deterministic behavior, but
>>>>> > just for the GlobalWindow / LoopingTimer?
>>>>> >
>>>>> > 2) Perhaps alternatively, is there a way to "initialize" the looping
>>>>> > timer when the pipeline starts, rather than when it first sees an
>>>>> > element? Perhaps a side input?
>>>>> >
>>>>> > 3) Am I right in assuming that when I move this pipeline to Pub/Sub IO
>>>>> > and operate it in streaming mode, this issue will go away?
>>>>> >
>>>>> > Thanks!
>>>>> > Raman