Are you making use of TestStream for your unit test?

On Wed, 13 Jan 2021 at 00:27, Raman Gupta <rocketra...@gmail.com> wrote:
> Your reply made me realize I removed the condition from my local copy of
> the looping timer that brings the timer forward if it encounters an earlier
> element later in the stream:
>
>     if (currentTimerValue == null || currentTimerValue >
>         nextTimerTimeBasedOnCurrentElement.getMillis()) {
>
> Restoring that condition fixes that issue.
>
> However, I removed that condition in the first place because it was
> making a unit test non-deterministic: sometimes the element timestamps
> arriving at the looping timer didn't seem to match the timestamps implied
> by the EARLIEST timestamp combiner I had defined, causing the timer to
> fire an additional time.
>
> The pipeline:
>
>     input
>       // withAllowedTimestampSkew is deprecated, but as of now, there is no replacement
>       // https://issues.apache.org/jira/browse/BEAM-644
>       .apply("XTimestamps", WithTimestamps
>         .of<X> { it.enteredAt.asJoda() }
>         .withAllowedTimestampSkew(Duration.INFINITE.asJoda())
>       )
>       .apply("FixedTickWindows",
>         Window.into<X>(FixedWindows.of(5.minutes.asJoda()))
>           .triggering(
>             AfterWatermark.pastEndOfWindow()
>               .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
>               .withLateFirings(AfterPane.elementCountAtLeast(1))
>           )
>           .withAllowedLateness(3.days.asJoda(), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
>           .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_ALWAYS)
>           .discardingFiredPanes()
>           .withTimestampCombiner(TimestampCombiner.EARLIEST)
>       )
>       .apply("KeyByUser", WithKeys.of { it.userId })
>       .apply("GroupByUser", GroupByKey.create())
>       .apply("GlobalWindowsLoopingStatefulTimer",
>         Window.into<KV<String, Iterable<X>>>(GlobalWindows())
>           .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
>           .discardingFiredPanes()
>           .withTimestampCombiner(TimestampCombiner.EARLIEST)
>       )
>       .apply("LoopingStatefulTimer",
>         ParDo.of(LoopingStatefulTimer(5.minutes, (options.timerTimeoutDays ?: 30).days)))
>
> In its process function, the looping timer receives an @Timestamp value of:
>
>     1970-01-01T07:34:59.999Z
>
> but the earliest timestamp of the (single) element in the elements
> iterable is:
>
>     1970-01-01T07:30:00.000Z
>
> Given the timestamp combiners on my windows, I would have expected the
> timestamp to be 07:30:00.000Z. Is there something wrong in my pipeline
> that is causing this non-deterministic behavior?
>
> Thanks,
> Raman
>
> On Tue, Jan 12, 2021 at 9:47 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Raman,
>>
>> Can you share the details of the pipeline? How exactly are you using the
>> looping timer? The timer described in the linked blog post should be
>> deterministic even when the order of the input elements is undefined.
>> Does your logic depend on element ordering?
>>
>> Jan
>>
>> On 1/12/21 3:18 PM, Raman Gupta wrote:
>> > Hello, I am building and testing a pipeline with the direct runner.
>> > The pipeline includes a looping timer
>> > (https://beam.apache.org/blog/looping-timers/).
>> >
>> > For now, I am using JdbcIO to obtain my input data, though when put
>> > into production the pipeline will use PubSubIO.
>> >
>> > I am finding that the looping timer begins producing outputs at a
>> > random event time, which makes some sense given the randomization of
>> > inputs in the direct runner. However, this makes the results of
>> > executing my pipeline with the direct runner completely non-deterministic.
>> >
>> > So:
>> >
>> > 1) Is there a way to turn off this non-deterministic behavior, but
>> > just for the GlobalWindow / LoopingTimer?
>> >
>> > 2) Perhaps alternatively, is there a way to "initialize" the looping
>> > timer when the pipeline starts, rather than when it first sees an
>> > element? Perhaps a side input?
>> >
>> > 3) Am I right in assuming that when I move this pipeline to PubSubIO
>> > and operate it in streaming mode, this issue will go away?
>> >
>> > Thanks!
>> > Raman
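The two timestamps in Raman's report can be related with a little arithmetic. The sketch below is a plain-Kotlin model, not Beam code: `FixedWindow`, `fixedWindowOf`, `earliest` and `iso` are hypothetical helpers. It shows that a 5-minute fixed window (zero offset) places 07:30:00.000Z in the window [07:30:00.000Z, 07:35:00.000Z), whose max timestamp (end minus 1 ms, which is how Beam's `IntervalWindow.maxTimestamp()` is defined) is exactly 07:34:59.999Z. That suggests the pane reaching the looping timer was stamped with the end of its window rather than with the EARLIEST combination of its element timestamps; the sketch does not explain why that happened.

```kotlin
import java.time.Instant

// Hypothetical model of a fixed window as the half-open interval
// [startMillis, endMillis), timestamps in epoch milliseconds.
data class FixedWindow(val startMillis: Long, val endMillis: Long) {
    // Mirrors IntervalWindow.maxTimestamp(): the window end minus one millisecond.
    val maxTimestampMillis: Long
        get() = endMillis - 1
}

// Assigns a timestamp to its fixed window, assuming zero offset: the start is
// the largest multiple of sizeMillis that is <= timestampMillis.
// floorMod keeps this correct for timestamps before the epoch as well.
fun fixedWindowOf(timestampMillis: Long, sizeMillis: Long): FixedWindow {
    val start = timestampMillis - Math.floorMod(timestampMillis, sizeMillis)
    return FixedWindow(start, start + sizeMillis)
}

// What an EARLIEST combiner would pick: the minimum input timestamp.
// Throws on an empty list, since an empty pane has no timestamps to combine.
fun earliest(timestampsMillis: List<Long>): Long =
    timestampsMillis.reduce { a, b -> minOf(a, b) }

// Renders epoch millis in ISO-8601, matching the format quoted in the thread.
fun iso(millis: Long): String = Instant.ofEpochMilli(millis).toString()
```

For example, `iso(fixedWindowOf(27_000_000L, 300_000L).maxTimestampMillis)` gives `1970-01-01T07:34:59.999Z`, while `iso(earliest(listOf(27_000_000L)))` gives `1970-01-01T07:30:00Z` (27,000,000 ms after the epoch is 07:30:00.000Z).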
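Jan's point that the looping timer should be deterministic regardless of input order, and Raman's finding that restoring the "bring forward" condition fixed the issue, can be illustrated with a small model. This is a plain-Kotlin sketch, not Beam's API: `withBringForward`, `firstElementOnly` and `finalTimer` are hypothetical names, and the candidate timer value is assumed to be the element timestamp plus the loop interval. With the condition, the final timer depends only on the earliest element; without it, whichever element arrives first decides, so a runner that randomizes input order (as Raman notes the direct runner does) can start the loop at a different event time on each run.

```kotlin
// Hypothetical model of the set-timer decision in the looping timer's
// process function. `current` stands for the stored timer value (null if no
// timer is running); `candidate` stands for nextTimerTimeBasedOnCurrentElement.

// Condition restored: set the timer if none is running, or bring it forward
// when an earlier element shows up later in the stream.
fun withBringForward(current: Long?, candidate: Long): Long =
    if (current == null || current > candidate) candidate else current

// Condition removed: only set the timer when none is running, so the first
// element to arrive wins and the result depends on arrival order.
fun firstElementOnly(current: Long?, candidate: Long): Long = current ?: candidate

// Replays element timestamps (epoch millis) in the given arrival order and
// returns the timer value that ends up set, or null if there were no elements.
// Assumption: candidate = element timestamp + loop interval.
fun finalTimer(
    arrivalOrder: List<Long>,
    intervalMillis: Long,
    rule: (Long?, Long) -> Long
): Long? = arrivalOrder.fold(null as Long?) { current, ts ->
    rule(current, ts + intervalMillis)
}
```

Replaying the same two elements in both orders yields the same timer with `::withBringForward`, but different timers with `::firstElementOnly`.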