> What is the logic / reason behind the pipeline setting this element's timestamp to 1970-01-01T07:34:59.999Z?

There are reasons (which I cannot recall right now :)) why late elements should not simply be added to the combiner. If a pane contains only late elements, the combiner actually receives no elements, and the output timestamp falls back to the end of the window (which is why TimestampCombiner.END_OF_WINDOW works well: it does that for all elements, regardless of whether they are late).
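To make the fallback concrete, here is a small plain-Kotlin model of how a pane's output timestamp can end up at 07:34:59.999 for a late element. It is a sketch of the behavior described above, not Beam's implementation: `CombinerModel`, `WindowModel`, `windowFor` and `paneOutputTimestamp` are hypothetical names, windows are assumed to be epoch-aligned 5-minute fixed windows, and "late" is taken to mean a timestamp before the GBK's output watermark.

```kotlin
import java.time.Duration
import java.time.Instant

// Simplified model of the behavior described above; NOT Beam's implementation.
enum class CombinerModel { EARLIEST, END_OF_WINDOW }

// Fixed window [start, end). As in Beam, its max timestamp is end minus 1 ms.
data class WindowModel(val start: Instant, val end: Instant) {
    val maxTimestamp: Instant get() = end.minusMillis(1)
}

// Epoch-aligned fixed window of the given size that contains ts.
fun windowFor(ts: Instant, size: Duration): WindowModel {
    val sizeMs = size.toMillis()
    val startMs = Math.floorDiv(ts.toEpochMilli(), sizeMs) * sizeMs
    return WindowModel(Instant.ofEpochMilli(startMs), Instant.ofEpochMilli(startMs + sizeMs))
}

fun paneOutputTimestamp(
    window: WindowModel,
    elements: List<Instant>,
    outputWatermark: Instant,
    combiner: CombinerModel
): Instant = when (combiner) {
    // Ignores element timestamps, so lateness cannot change the result.
    CombinerModel.END_OF_WINDOW -> window.maxTimestamp
    CombinerModel.EARLIEST ->
        // Late elements (timestamp before the output watermark) are not combined;
        // if nothing is left, the pane falls back to the end of the window.
        elements.filter { !it.isBefore(outputWatermark) }.minOrNull() ?: window.maxTimestamp
}

// x["2"] from the test case: 07:30:00, seen while the watermark is at 09:00:01.
val x2: Instant = Instant.parse("1970-01-01T07:30:00Z")
val x2Window = windowFor(x2, Duration.ofMinutes(5))
val asLate = paneOutputTimestamp(x2Window, listOf(x2), Instant.parse("1970-01-01T09:00:01Z"), CombinerModel.EARLIEST)
val asOnTime = paneOutputTimestamp(x2Window, listOf(x2), Instant.parse("1970-01-01T07:00:00Z"), CombinerModel.EARLIEST)
```

In this model `asLate` is 1970-01-01T07:34:59.999Z while `asOnTime` is the element's own 07:30:00Z, so the same element gets a different timestamp depending only on whether it arrived late. With `END_OF_WINDOW` both calls return the window's max timestamp, which is why that combiner behaves deterministically here.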

> I had already tried that but the pipeline throws an error that the timestamp emitted cannot be earlier than the current element timestamp.

Ah, right. The reason for that is that you cannot set an output timestamp that would precede the current output watermark (which is unknown to user code). In that case, the solution could be to replace the GBK (and the triggers and timestamp combiners) with a custom stateful ParDo that controls the output timestamp itself: it can set the timer's output timestamp to the current element's timestamp whenever that would be the new minimum.
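A rough sketch of the stateful ParDo idea, written as a plain-Kotlin model rather than a real Beam DoFn so that it runs without the SDK. `MinTimestampBuffer` and `requireOutputNotBeforeInput` are hypothetical names. In an actual pipeline the buffered elements and the current minimum would live in Beam state, and the minimum would be applied through the timer's output timestamp (`Timer.withOutputTimestamp` in recent Beam SDKs). The check function only approximates the runner's validation, assuming zero allowed skew.

```kotlin
import java.time.Instant

// Hypothetical, dependency-free model; not Beam code.

// Approximates the runner check that rejects output timestamps earlier than the current input's.
fun requireOutputNotBeforeInput(output: Instant, currentInput: Instant) {
    require(!output.isBefore(currentInput)) {
        "output timestamp $output is earlier than current input timestamp $currentInput"
    }
}

// Per-key buffer standing in for the GBK + triggers + timestamp combiner.
class MinTimestampBuffer<T> {
    private val buffered = mutableListOf<T>()

    // Output timestamp requested for the flush timer; null while nothing is buffered.
    var timerOutputTimestamp: Instant? = null
        private set

    fun process(element: T, elementTimestamp: Instant) {
        buffered += element
        val current = timerOutputTimestamp
        // Only move the hold when this element is the new minimum. Using the element's
        // own timestamp always passes the check, because it is the current input.
        if (current == null || elementTimestamp.isBefore(current)) {
            requireOutputNotBeforeInput(elementTimestamp, elementTimestamp)
            timerOutputTimestamp = elementTimestamp
        }
    }

    // Models the timer firing: emit the buffered elements at the held timestamp and reset.
    fun flush(): Pair<Instant, List<T>>? {
        val ts = timerOutputTimestamp ?: return null
        val out = buffered.toList()
        buffered.clear()
        timerOutputTimestamp = null
        return ts to out
    }
}

val flushed = MinTimestampBuffer<String>().run {
    process("a", Instant.parse("1970-01-01T07:33:00Z"))
    process("b", Instant.parse("1970-01-01T07:30:00Z")) // new minimum: hold moves to 07:30
    process("c", Instant.parse("1970-01-01T07:31:00Z")) // not a new minimum: hold unchanged
    flush()
}

// Downstream of the GBK the input already carries 07:34:59.999, so asking for 07:30 is rejected.
val rejectedAfterGbk = runCatching {
    requireOutputNotBeforeInput(Instant.parse("1970-01-01T07:30:00Z"), Instant.parse("1970-01-01T07:34:59.999Z"))
}.isFailure
```

The design point is that the minimum is captured while each element is still the current input, before anything has rewritten its timestamp; trying to restore it later, after the GBK, is what the timestamp check rejects.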

Jan

On 1/13/21 3:42 PM, Raman Gupta wrote:
(Replying to Reza) Yes, I am using TestStream for my unit test. Other replies below.

On Wed, Jan 13, 2021 at 3:40 AM Jan Lukavský <je...@seznam.cz> wrote:

    Hi,

    yes, there is a possible source of non-determinism related to the
    timestamp combiner. Timestamp combiners combine only elements
    that are not 'late' ([1]), meaning that their timestamp does not
    precede the output watermark of the GBK. Looking at the pipeline
    code, I suppose that could be the cause.


Yes, the test stream in this test case does indeed send the element in question "late". Here is the setup:

val base = Instant.EPOCH + 6.hours
val xStream: TestStream<X> = TestStream.create(coder)
  .addElements(x["1"]) // this just initializes the looping timer
  // advance watermark past end of window that would normally process x2
  .advanceWatermarkTo((base + 3.hours + 1.minutes).asJoda())
  .addElements(x["2"]) // now we see the element
  .advanceWatermarkToInfinity()

Here the late element x["2"] has a timestamp of 1970-01-01T07:30:00.000Z, and the watermark at the time x["2"] is added is 1970-01-01T09:00:01.000Z.

So I get your point that the timestamp combiner is not used for late elements, but if late elements are emitted individually, as in this pipeline, why modify the timestamp at all? I would expect them to arrive with their original timestamp, not one changed from 1970-01-01T07:30:00.000Z to 1970-01-01T07:34:59.999Z (this is the part that seems non-deterministic). What is the logic / reason behind the pipeline setting this element's timestamp to 1970-01-01T07:34:59.999Z?

    You can make the pipeline deterministic by using
    TimestampCombiner.END_OF_WINDOW (default).


It's definitely not ideal for this use case, but I'll consider it.

    If you *need* to use TimestampCombiner.EARLIEST, you can
    probably do that by tweaking the looping timer's stateful DoFn and
    fixing the timestamps there (using the timer output timestamp).


I had already tried that but the pipeline throws an error that the timestamp emitted cannot be earlier than the current element timestamp.

Thanks,
Raman

      Jan

    [1] https://issues.apache.org/jira/browse/BEAM-2262

    On 1/12/21 5:26 PM, Raman Gupta wrote:
    Your reply made me realize I removed the condition from my local
    copy of the looping timer that brings the timer forward if it
    encounters an earlier element later in the stream:

    if (currentTimerValue == null ||
        currentTimerValue > nextTimerTimeBasedOnCurrentElement.getMillis()) {

    Restoring that condition fixes that issue.
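Why restoring the guard helps can be shown with a small model of it. This is a hypothetical plain-Kotlin sketch, not the blog post's code: it assumes the next timer time is the element's timestamp plus the loop interval, and it represents the timer state as nullable epoch millis instead of Beam's `ValueState` and `Timer`. With the guard, the final timer is the same whatever order the elements arrive in; without it, whichever element arrives last decides.

```kotlin
import java.time.Duration
import java.time.Instant

// Hypothetical model of the looping timer's guard; timer state is nullable epoch millis.
fun nextTimerValue(currentTimerValue: Long?, elementTimestamp: Instant, interval: Duration): Long? {
    // Assumption: the next timer time is the element timestamp plus the loop interval.
    val nextTimerTimeBasedOnCurrentElement = elementTimestamp.plus(interval).toEpochMilli()
    return if (currentTimerValue == null || currentTimerValue > nextTimerTimeBasedOnCurrentElement) {
        nextTimerTimeBasedOnCurrentElement // set the timer, or bring it forward
    } else {
        currentTimerValue // an earlier timer is already set; keep it
    }
}

// Final timer after processing the elements in the given order, with the guard.
fun finalTimer(timestamps: List<Instant>, interval: Duration): Long? =
    timestamps.fold<Instant, Long?>(null) { timer, ts -> nextTimerValue(timer, ts, interval) }

// Same, but without the guard: every element resets the timer, so the last one wins.
fun finalTimerWithoutGuard(timestamps: List<Instant>, interval: Duration): Long? =
    timestamps.lastOrNull()?.plus(interval)?.toEpochMilli()

val early: Instant = Instant.parse("1970-01-01T07:30:00Z")
val later: Instant = Instant.parse("1970-01-01T07:33:00Z")
val fiveMinutes: Duration = Duration.ofMinutes(5)
```

For example, `finalTimer(listOf(early, later), fiveMinutes)` and `finalTimer(listOf(later, early), fiveMinutes)` both give the epoch millis of 07:35, whereas the unguarded version gives 07:38 or 07:35 depending on arrival order.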

    However, the reason I removed that condition in the first place
    was that it made a unit test non-deterministic: sometimes the
    timestamps of elements arriving at the looping timer didn't seem
    to match the timestamps implied by the EARLIEST timestamp
    combiner I had defined, causing the timer to fire an additional
    time.

    The pipeline:

    input
      // withAllowedTimestampSkew is deprecated, but as of now, there is no replacement
      // https://issues.apache.org/jira/browse/BEAM-644
      .apply("XTimestamps", WithTimestamps
        .of<X> { it.enteredAt.asJoda() }
        .withAllowedTimestampSkew(Duration.INFINITE.asJoda())
      )
      .apply("FixedTickWindows", Window.into<X>(FixedWindows.of(5.minutes.asJoda()))
        .triggering(
          AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
            .withLateFirings(AfterPane.elementCountAtLeast(1))
        )
        .withAllowedLateness(3.days.asJoda(), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
        .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_ALWAYS)
        .discardingFiredPanes()
        .withTimestampCombiner(TimestampCombiner.EARLIEST)
      )
      .apply("KeyByUser", WithKeys.of { it.userId })
      .apply("GroupByUser", GroupByKey.create())
      .apply("GlobalWindowsLoopingStatefulTimer", Window.into<KV<String, Iterable<X>>>(GlobalWindows())
        .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
        .discardingFiredPanes()
        .withTimestampCombiner(TimestampCombiner.EARLIEST)
      )
      .apply("LoopingStatefulTimer",
        ParDo.of(LoopingStatefulTimer(5.minutes, (options.timerTimeoutDays ?: 30).days)))

    The looping timer's process function receives an @Timestamp
    value of:

    1970-01-01T07:34:59.999Z

    but the earliest timestamp of the (single) element in the
    elements iterable is:

    1970-01-01T07:30:00.000Z

    Given the timestamp combiners on my windows, I would have expected
    the timestamp to be 07:30:00.000Z. Is there something wrong in my
    pipeline that is causing this non-deterministic behavior?

    Thanks,
    Raman

    On Tue, Jan 12, 2021 at 9:47 AM Jan Lukavský <je...@seznam.cz> wrote:

        Hi Raman,

        can you share the details of the pipeline? How exactly are
        you using the looping timer? A timer as described in the linked
        blog post should be deterministic even when the order of the
        input elements is undefined. Does your logic depend on element
        ordering?

          Jan

        On 1/12/21 3:18 PM, Raman Gupta wrote:
        > Hello, I am building and testing a pipeline with the direct runner.
        > The pipeline includes a looping timer -
        > https://beam.apache.org/blog/looping-timers/.
        >
        > For now, I am using JdbcIO to obtain my input data, though when put
        > into production the pipeline will use PubSubIO.
        >
        > I am finding that the looping timer begins producing outputs at a
        > random event time, which makes some sense given the randomization of
        > inputs in the direct runner. However, this makes the results of
        > executing my pipeline with the direct runner completely non-deterministic.
        >
        > So:
        >
        > 1) Is there a way to turn off this non-deterministic behavior, but
        > just for the GlobalWindow / LoopingTimer?
        >
        > 2) Perhaps alternatively, is there a way to "initialize" the looping
        > timer when the pipeline starts, rather than when it first sees an
        > element? Perhaps a side input?
        >
        > 3) Am I right in assuming that when I move this pipeline to pub/sub io
        > and operate it in streaming mode, this issue will go away?
        >
        > Thanks!
        > Raman
        >
