Hmm, I think I've found a simple solution... adding this to the beginning of my looping timer @ProcessElement function:

    // late elements don't need to affect our looping timer,
    // pass them through without modification
    // this is kind of a work-around for https://issues.apache.org/jira/browse/BEAM-2262
    // but I think makes sense in general for looping timers when
    // there is no need to trigger timers after the window is done
    if (paneInfo.timing == PaneInfo.Timing.LATE) {
      receiver.output(element)
      return
    }

At least all my unit tests are passing... is there any problem with this approach?

Thanks,
Raman

On Wed, Jan 13, 2021 at 9:42 AM Raman Gupta <rocketra...@gmail.com> wrote:

> (Replying to Reza) Yes, I am using TestStream for my unit test. Other
> replies below.
>
> On Wed, Jan 13, 2021 at 3:40 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi,
>>
>> yes, there is a possible non-determinism, that is related to the
>> timestamp combiner. Timestamp combiners combine only elements, that are not
>> 'late' ([1]), meaning that their timestamp is not preceding output
>> watermark of the GBK. Looking at the pipeline code I suppose that could be
>> the cause.
>>
>
> Yes, the test stream in this test case does indeed send the element in
> question "late". Here is the setup:
>
> val base = Instant.EPOCH + 6.hours
> val xStream: TestStream<X> = TestStream.create(coder)
>   .addElements(x["1"]) // this just initializes the looping timer
>   // advance watermark past end of window that would normally process x2
>   .advanceWatermarkTo((base + 3.hours + 1.minutes).asJoda())
>   .addElements(x["2"]) // now we see the element
>   .advanceWatermarkToInfinity()
>
> Here late element x["2"] has a timestamp of 1970-01-01T07:30:00.000Z and
> the watermark at the time x["2"] is added is 1970-01-01T09:00:01.000Z.
>
> So I get your point that the timestamp combiner is not used for late
> elements, but if late elements are singly emitted as in this pipeline, why
> do any timestamp modification at all?
> I would expect them to arrive with
> their original timestamp, not one changed from 1970-01-01T07:30:00.000Z
> to 1970-01-01T07:34:59.999Z (this is the part that seems
> non-deterministic). What is the logic / reason behind the pipeline setting
> this element's timestamp to 1970-01-01T07:34:59.999Z?
>
>> You can make the pipeline deterministic by using
>> TimestampCombiner.END_OF_WINDOW (default).
>>
>
> It's definitely not ideal for this use case, but I'll consider it.
>
>> If you *need* to use the TimestampCombiner.EARLIEST, you can probably do
>> that by tweaking the looping timer stateful dofn and fix timestamps there
>> (using timer output timestamp).
>>
>
> I had already tried that but the pipeline throws an error that the
> timestamp emitted cannot be earlier than the current element timestamp.
>
> Thanks,
> Raman
>
>> Jan
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-2262
>>
>> On 1/12/21 5:26 PM, Raman Gupta wrote:
>>
>> Your reply made me realize I removed the condition from my local copy of
>> the looping timer that brings the timer forward if it encounters an earlier
>> element later in the stream:
>>
>> if (currentTimerValue == null ||
>>     currentTimerValue > nextTimerTimeBasedOnCurrentElement.getMillis()) {
>>
>> Restoring that condition fixes that issue.
>>
>> However, the reason I removed that condition in the first place was
>> because it was making a unit test non-deterministic -- sometimes the
>> element timestamps into the looping timer didn't seem to match the element
>> timestamps according to the EARLIEST timestamp combiner defined, causing
>> the timer to execute an additional time.
>>
>> The pipeline:
>>
>> input
>>   // withAllowedTimestampSkew is deprecated, but as of now, there is no replacement
>>   // https://issues.apache.org/jira/browse/BEAM-644
>>   .apply("XTimestamps", WithTimestamps
>>     .of<X> { it.enteredAt.asJoda() }
>>     .withAllowedTimestampSkew(Duration.INFINITE.asJoda())
>>   )
>>   .apply("FixedTickWindows",
>>     Window.into<X>(FixedWindows.of(5.minutes.asJoda()))
>>       .triggering(
>>         AfterWatermark.pastEndOfWindow()
>>           .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
>>           .withLateFirings(AfterPane.elementCountAtLeast(1))
>>       )
>>       .withAllowedLateness(3.days.asJoda(), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
>>       .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_ALWAYS)
>>       .discardingFiredPanes()
>>       .withTimestampCombiner(TimestampCombiner.EARLIEST)
>>   )
>>   .apply("KeyByUser", WithKeys.of { it.userId })
>>   .apply("GroupByUser", GroupByKey.create())
>>   .apply("GlobalWindowsLoopingStatefulTimer",
>>     Window.into<KV<String, Iterable<X>>>(GlobalWindows())
>>       .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
>>       .discardingFiredPanes()
>>       .withTimestampCombiner(TimestampCombiner.EARLIEST)
>>   )
>>   .apply("LoopingStatefulTimer",
>>     ParDo.of(LoopingStatefulTimer(5.minutes, (options.timerTimeoutDays ?: 30).days)))
>>
>> The looping timer receives an @Timestamp value in the process function of:
>>
>> 1970-01-01T07:34:59.999Z
>>
>> but the earliest timestamp of the (single) element in the elements
>> iterable is:
>>
>> 1970-01-01T07:30:00.000Z
>>
>> I would have thought given my timestamp combiners on my windows that the
>> timestamp should have been 07:30:00.000Z. Is there something wrong in my
>> pipeline that is causing this non-deterministic behavior?
>>
>> Thanks,
>> Raman
>>
>> On Tue, Jan 12, 2021 at 9:47 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi Raman,
>>>
>>> can you share the details of the pipeline? How exactly are you using the
>>> looping timer?
>>> Timer as described in the linked blog post should be
>>> deterministic even when the order of the input elements is undefined.
>>> Does your logic depend on element ordering?
>>>
>>> Jan
>>>
>>> On 1/12/21 3:18 PM, Raman Gupta wrote:
>>> > Hello, I am building and testing a pipeline with the direct runner.
>>> > The pipeline includes a looping timer -
>>> > https://beam.apache.org/blog/looping-timers/.
>>> >
>>> > For now, I am using JdbcIO to obtain my input data, though when put
>>> > into production the pipeline will use PubSubIO.
>>> >
>>> > I am finding that the looping timer begins producing outputs at a
>>> > random event time, which makes some sense given the randomization of
>>> > inputs in the direct runner. However, this makes the results of
>>> > executing my pipeline with the direct runner completely
>>> > non-deterministic.
>>> >
>>> > So:
>>> >
>>> > 1) Is there a way to turn off this non-deterministic behavior, but
>>> > just for the GlobalWindow / LoopingTimer?
>>> >
>>> > 2) Perhaps alternatively, is there a way to "initialize" the looping
>>> > timer when the pipeline starts, rather than when it first sees an
>>> > element? Perhaps a side input?
>>> >
>>> > 3) Am I right in assuming that when I move this pipeline to pub/sub io
>>> > and operate it in streaming mode, this issue will go away?
>>> >
>>> > Thanks!
>>> > Raman