On 3/28/22 20:17, Reuven Lax wrote:


On Mon, Mar 28, 2022 at 11:08 AM Robert Bradshaw <rober...@google.com> wrote:

    On Mon, Mar 28, 2022 at 11:04 AM Reuven Lax <re...@google.com> wrote:
    >
    > On Mon, Mar 28, 2022 at 10:59 AM Evan Galpin
    <evan.gal...@gmail.com> wrote:
    >>
    >> I don't believe that the issue is Flink specific but rather
    that Flink is one example of many potential examples.  Enforcing
    that watermark updates can only happen at bundle boundaries would
    ensure that any data buffered while processing a single bundle in
    a DoFn could be output ON_TIME, especially without any need for a
    TimerSpec to explicitly hold the watermark for that purpose.  This
    is in reference to data buffered within a single bundle, and not
    cross-bundle buffering such as in the case of GroupIntoBatches.
    >
    >
    > Any in-flight data (i.e. data being processed that is not yet
    committed back to the runner) must hold up the output watermark.
    Since in the Beam model all records in a bundle are somewhat
    atomic (e.g. if the bundle succeeds, none of them should be
    replayed in a proper exactly-once runner), I think this implicitly
    means that any elements in an in-flight bundle must hold up the
    watermark. This doesn't mean that the watermark can't advance
    while the bundle is in flight - just that it can't advance past any
    of the timestamps outstanding in the bundle.

    Yes. The difficulty is that we don't have much visibility into
    "timestamps outstanding in the bundle" so we have to take
    min(timestamps of input elements in the bundle) which is not that
    different from only having watermark updates at bundle boundaries.


Exactly.

Agreed, the two approaches are equivalent. The requirement is not that the watermark must never be updated, but that it must not advance past any on-time element in the bundle. Not updating the watermark at all is one solution; computing min(timestamps in bundle) is another. Unfortunately, Flink does not construct bundles in advance; a bundle there is more of an ad-hoc concept. The only way to hold the watermark is therefore not to update it while a bundle is open, because the timestamps of the elements that will become part of the bundle are not known up front.
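A minimal sketch of the two strategies, as a plain Python model rather than Beam or Flink code. Timestamps are integers, and the names held_output_watermark and DeferredWatermark are hypothetical, introduced only for illustration. The first function assumes the timestamps in the bundle are known; the class models a runner that forms bundles ad hoc and therefore simply defers watermark emission while a bundle is open.

```python
from typing import Iterable, List, Optional


def held_output_watermark(input_watermark: int, in_flight: Iterable[int]) -> int:
    """Output watermark when the timestamps of the in-flight bundle are known.

    The watermark may advance, but never past the earliest in-flight element.
    """
    return min([input_watermark, *in_flight])


class DeferredWatermark:
    """Hypothetical gate for a runner that does not know the bundle in advance.

    While a bundle is open, incoming watermarks are remembered but not
    emitted; the latest one is released when the bundle finishes.
    """

    def __init__(self) -> None:
        self.bundle_open = False
        self.pending: Optional[int] = None
        self.emitted: List[int] = []

    def start_bundle(self) -> None:
        self.bundle_open = True

    def on_watermark(self, watermark: int) -> None:
        if self.bundle_open:
            # Keep only the most advanced watermark seen during the bundle.
            if self.pending is None or watermark > self.pending:
                self.pending = watermark
        else:
            self.emitted.append(watermark)

    def finish_bundle(self) -> None:
        self.bundle_open = False
        if self.pending is not None:
            self.emitted.append(self.pending)
            self.pending = None
```

With an input watermark of 5 and in-flight timestamps 1 and 2, held_output_watermark returns 1. The deferred variant emits nothing between start_bundle() and finish_bundle(), which is the coarser but always-available option.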

Two more questions:

 a) it seems that we are missing some @ValidatesRunner tests for this, right?

 b) should we relax the restriction that outputWithTimestamp() cannot output an element with a timestamp before that of the current element? I think the bound should be "before the lowest element in the current bundle" or "before the output watermark, if not already late, or not droppable if late" (uh, this gets a little complicated :)). Not allowing an output timestamp lower than that of the current element seems to be just a "safety-first" solution to the problem discussed here, and it is too restrictive. It can be worked around using getAllowedTimestampSkew(), but that can cause errors.
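To make question b) concrete, here is a rough Python model of the two bounds. It is not the SDK implementation: both function names are hypothetical, timestamps are integers, and the first function only mirrors the rule as described in this thread (output no earlier than the current element, minus the allowed skew).

```python
from typing import Iterable


def allowed_by_current_check(output_ts: int, element_ts: int, allowed_skew: int = 0) -> bool:
    """Existing rule as described here: bound is the current element minus the skew."""
    return output_ts >= element_ts - allowed_skew


def allowed_by_bundle_check(output_ts: int, bundle_timestamps: Iterable[int]) -> bool:
    """Proposed relaxation: bound is the earliest element of the current bundle."""
    return output_ts >= min(bundle_timestamps)


# A bundle holds elements at timestamps 1 and 2. While processing the element
# at 2, the DoFn wants to emit a buffered result stamped 1.
print(allowed_by_current_check(1, 2))                    # False
print(allowed_by_bundle_check(1, [1, 2]))                # True
# A large skew also lets the output through, but it equally admits -5,
# which may already be behind the watermark.
print(allowed_by_current_check(-5, 2, allowed_skew=10))  # True
```

The last line is the reason the skew workaround is risky: it disables the check without holding the watermark, so nothing prevents the output from being late or droppable.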




    >> Take for example a PCollection with 1 second Fixed windowing. 
    The PCollection holds payload bodies for an external API to which
    requests will be made.  A hypothetical runner creates a bundle
    with Element A and Element B where Element A belongs to the window
    [0:00:01, 0:00:02) and Element B belongs to the window [0:00:02,
    0:00:03).  Assume that the DoFn is going to buffer all elements in
    a bundle so as to generate fewer round-trip requests to the
    external API, and then output the corresponding responses. The
    following is a high-level order of events that could result in
    data being labelled as LATE:
    >>
    >> 1. Watermark is 0:00:00
    >> 2. DoFn receives the bundle containing both Element A and Element B
    >> 3. Element A is processed by the DoFn, buffering in-memory and
    returning/completing
    >> 4. Watermark is (maybe) updated after having processed the
    element; let's assume in this example it is in fact updated to 0:00:02
    >> 5. Element B (from the same bundle) is processed by the DoFn
    >> 6. It's the end of the bundle, so now the in-memory buffered
    entities are used to make a request to external API
    >> 7. The API responses are gathered and intended to be output to
    the same window from which the corresponding element with request
    data originated (Element A and Element B carried this data)
    >> 8. The response data associated with the request payload found
    in Element A is output with the timestamp of Element A i.e.
    something in the range of Element A's window [0:00:01, 0:00:02)
    >> 9. The data in the prior step is considered LATE, strictly as a
    result of updating the watermark to 0:00:02 in Step 4 above
    >>
    >> If Step 4 was moved to be the last step in the process (i.e. at
    the bundle boundary) this issue would be avoided.  I would also
    argue that updating the watermark only after receiving a response
    for an input Element is a more accurate depiction of having
    completed processing for the element.  All that said, I could buy
    the argument that the above description might represent an
    anti-pattern of sorts where response data should actually be
    output with a timestamp corresponding to its receipt rather than
    the timestamp of its corresponding input element carrying the
    request body.
    >>
    >> - Evan
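The nine steps above can be replayed as a small Python model. This is only a sketch under simplifying assumptions: timestamps are whole seconds, Element A is stamped 1 and Element B is stamped 2, an element counts as LATE when its timestamp is behind the watermark, and the function name replay is hypothetical.

```python
from typing import List, Tuple


def replay(update_mid_bundle: bool) -> Tuple[List[Tuple[str, str]], int]:
    """Replay the example; returns ((element, label) pairs, final watermark)."""
    watermark = 0                          # step 1
    bundle = [("A", 1), ("B", 2)]          # step 2
    buffered = []
    for name, ts in bundle:                # steps 3 and 5: buffer, emit nothing
        buffered.append((name, ts))
        if update_mid_bundle and name == "A":
            watermark = 2                  # step 4: update between elements
    results = []
    for name, ts in buffered:              # steps 6-8: emit with input timestamps
        label = "LATE" if ts < watermark else "ON_TIME"
        results.append((name, label))      # step 9 when label is LATE
    if not update_mid_bundle:
        watermark = 2                      # step 4 moved to the bundle boundary
    return results, watermark


print(replay(update_mid_bundle=True))
print(replay(update_mid_bundle=False))
```

With the mid-bundle update, A is labelled LATE and B ON_TIME; with the update deferred to the bundle boundary, both are ON_TIME and the watermark still ends at 2.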
    >>
    >> On Mon, Mar 28, 2022 at 11:07 AM Reuven Lax <re...@google.com>
    wrote:
    >>>
    >>> I agree with you that changing on-time elements to late
    elements is incorrect, however I don't quite understand why doing
    things on bundle boundaries helps. Is this specific to Flink?
    >>>
    >>>
    >>>
    >>> On Mon, Mar 28, 2022 at 1:07 AM Jan Lukavský <je...@seznam.cz>
    wrote:
    >>>>
    >>>> Hi Robert,
    >>>>
    >>>> I had the same impression that holding the watermark between
    bundles is
    >>>> actually not part of the computational model. Now the
    question is -
    >>>> should it be?
    >>>>
    >>>> As you said, buffering and emitting in-memory buffered data means
    >>>> possibly outputting data that arrived as ON_TIME, but is
    outputted as
    >>>> LATE (or droppable, which is even worse). My understanding is
    that this
    >>>> is why there is the (deprecated) getAllowedTimestampSkew()
    method of
    >>>> DoFn, but that only bypasses the check, does not solve the
    issue (which
    >>>> is why it is deprecated, I suppose). I strongly believe that
    outputting
    >>>> elements that switch from ON_TIME to LATE is a correctness
    bug, because
    >>>> it has the potential to violate causality (which is strongly
    >>>> counter-intuitive in our universe :)). For some pipelines it can
    >>>> definitely cause incorrect outputs.
    >>>>
    >>>> If we could ensure the output watermark gets updated only between
    >>>> @FinishBundle and @StartBundle calls, then this problem would
    go away. I
    >>>> looked into the code of FlinkRunner and it seems to me that
    we could
    >>>> quite easily ensure this by not outputting the watermark while a
    bundle is
    >>>> open and outputting it once the bundle finishes. I didn't dig into that
    too deeply, so
    >>>> I don't know if there would be any caveats. The question is
    whether
    >>>> we could make these guarantees for other runners as well
    and whether we
    >>>> could sensibly create a @ValidatesRunner test.
    >>>>
    >>>> WDYT?
    >>>>
    >>>>   Jan
    >>>>
    >>>> On 3/25/22 23:06, Robert Bradshaw wrote:
    >>>> > I do not think there is a hard and fast rule about updating
    watermarks
    >>>> > only at bundle boundaries. This seems perfectly legal for a
    pure 1:1
    >>>> > mapping DoFn. The issue is that DoFns are allowed to buffer
    data and
    >>>> > emit them in a later process (or finishBundle). If the
    watermark has
    >>>> > moved on, that may result in late data. We don't really
    have a way for
    >>>> > a DoFn to declare *its* output watermark (i.e. "I promise
    not to emit
    >>>> > any data before this timestamp.")
    >>>> >
    >>>> > On Thu, Mar 24, 2022 at 8:10 AM Evan Galpin
    <egal...@apache.org> wrote:
    >>>> >> Thanks for starting this thread Jan, I'm keen to hear
    thoughts and outcomes!  I thought I would mention that answers to
    the questions posed here will help to unblock a 2.38.0 release
    blocker[1].
    >>>> >>
    >>>> >> [1] https://issues.apache.org/jira/browse/BEAM-14064
    >>>> >>
    >>>> >> On Thu, Mar 24, 2022 at 5:28 AM Jan Lukavský
    <je...@seznam.cz> wrote:
    >>>> >>> Hi,
    >>>> >>>
    >>>> >>> this is a follow-up to the thread started in [1]. That thread
    mentions multiple times that (in a stateless ParDo) the
    output watermark is allowed to advance only on bundle boundaries
    [2]. Essentially that would mean that anything in between calls to
    @StartBundle and @FinishBundle would be processed in a single
    instant in (output) event-time. This makes perfect sense.
    >>>> >>>
    >>>> >>> The issue is that it seems that not all runners actually
    implement this behavior. FlinkRunner for instance does not have a
    "natural" concept of bundles and those are created in a more
    ad-hoc way to adhere to the DoFn life-cycle (see [3]). Watermark
    updates and elements are completely interleaved without any
    synchronization with bundle "open" or "close". If watermark
    updates are allowed to happen only on boundaries of bundles, then
    this seems to break this contract.
    >>>> >>>
    >>>> >>> The question therefore is - should we consider
    FlinkRunner as non-compliant with this aspect of the Apache Beam
    model or is this an "optional" part that runners are free to
    implement at will? In the case of the former, are we missing some
    @ValidatesRunner tests for this?
    >>>> >>>
    >>>> >>>   Jan
    >>>> >>>
    >>>> >>> [1]
    https://lists.apache.org/thread/mtwtno2o88lx3zl12jlz7o5w1lcgm2db
    >>>> >>>
    >>>> >>> [2]
    https://lists.apache.org/thread/7foy455spg43xo77zhrs62gc1m383t50
    >>>> >>>
    >>>> >>> [3]
    
https://github.com/apache/beam/blob/14862ccbdf2879574b6ce49149bdd7c9bf197322/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L786
    >>>> >>>
    >>>> >>>
