Yes, the open question is whether this is feasible. It definitely makes sense to enforce such behavior, but I'm not sure I see all the corner cases. This is completely orthogonal to any runner-specific bundle settings (e.g. bundle size).

On 3/28/22 16:07, Evan Galpin wrote:
@Jan I'm +1 on the idea. Just to confirm: this would not negate the ability to buffer or otherwise make use of a setting like FlinkPipelineOptions#setMaxBundleSize[1]; the change would simply mean refraining from outputting a watermark change until @FinishBundle is called (across all runners)? Assuming this garners the required support, I'd be interested in collaborating/contributing if development could be parallelized across runners.

[1] https://github.com/apache/beam/blob/14862ccbdf2879574b6ce49149bdd7c9bf197322/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java#L207-L211

- Evan

On Mon, Mar 28, 2022 at 4:07 AM Jan Lukavský <je...@seznam.cz> wrote:

    Hi Robert,

    I had the same impression that holding the watermark between bundles
    is actually not part of the computational model. Now the question
    is: should it be?

    As you said, buffering and emitting in-memory buffered data means
    possibly outputting data that arrived as ON_TIME but is output as
    LATE (or droppable, which is even worse). My understanding is that
    this is why DoFn has the (deprecated) getAllowedTimestampSkew()
    method, but that only bypasses the check and does not solve the
    issue (which is presumably why it is deprecated). I strongly believe
    that outputting elements that switch from ON_TIME to LATE is a
    correctness bug, because it has the potential to violate causality
    (which is strongly counter-intuitive in our universe :)). For some
    pipelines it can definitely cause incorrect outputs.
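A toy model may make the switch from ON_TIME to LATE concrete. The sketch below is plain Java, not the Beam API; the class and method names are hypothetical, and the lateness rule is a simplification of Beam's windowing semantics, assuming fixed windows: an element counts as late once the watermark has passed its window's max timestamp, and as droppable once the watermark has also passed max timestamp plus allowed lateness. The same element is classified against two watermarks: the one in effect when a buffering DoFn received it, and a later one reached while it sat in the buffer.

```java
// Toy model, not the Beam API: all names are hypothetical and the rule is simplified.
final class LatenessModel {
  enum Timing { ON_TIME, LATE, DROPPABLE }

  /** Last timestamp of the fixed window [k*size, (k+1)*size) that contains ts. */
  static long windowMaxTimestamp(long ts, long windowSize) {
    long start = Math.floorDiv(ts, windowSize) * windowSize;
    return start + windowSize - 1;
  }

  /** Late once the watermark passes the window end; droppable once it also passes end + allowed lateness. */
  static Timing classify(long ts, long watermark, long windowSize, long allowedLateness) {
    long maxTs = windowMaxTimestamp(ts, windowSize);
    if (watermark <= maxTs) {
      return Timing.ON_TIME;
    }
    if (watermark <= maxTs + allowedLateness) {
      return Timing.LATE;
    }
    return Timing.DROPPABLE;
  }

  public static void main(String[] args) {
    long windowSize = 10;
    long allowedLateness = 5;
    long ts = 7; // window [0, 10), max timestamp 9
    // Watermark when the buffering DoFn received the element.
    System.out.println(classify(ts, 8, windowSize, allowedLateness));  // ON_TIME
    // Watermarks the output may see if it advanced while the element was buffered.
    System.out.println(classify(ts, 12, windowSize, allowedLateness)); // LATE
    System.out.println(classify(ts, 20, windowSize, allowedLateness)); // DROPPABLE
  }
}
```

Nothing about the element changed between the calls; only the watermark it is compared against did, which is exactly the effect of emitting buffered data after the watermark has moved on.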

    If we could ensure that the output watermark is updated only between
    the @FinishBundle and @StartBundle calls, this problem would go away.
    I looked into the FlinkRunner code, and it seems to me that we could
    quite easily ensure this by not emitting the watermark while a bundle
    is open and emitting it once the bundle finishes. I didn't dig very
    deep, so I don't know whether there are any caveats. The open
    question is whether we could make the same guarantees for other
    runners as well, and whether we could sensibly write a
    @ValidatesRunner test.
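A minimal sketch of the proposed behavior, assuming a single-input operator that receives elements and watermarks interleaved: a watermark that arrives while a bundle is open is held and only forwarded after finishBundle. This is plain Java with hypothetical names (BundleAwareOperator, maxBundleSize), not the actual DoFnOperator code; the size-based cut-off only illustrates that a bundle-size limit and the watermark hold can coexist.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not FlinkRunner code: watermarks are never forwarded while a bundle is open.
final class BundleAwareOperator {
  private final int maxBundleSize;                           // assumed >= 1
  private final List<String> downstream = new ArrayList<>(); // log of everything emitted, in order
  private boolean bundleOpen = false;
  private int elementsInBundle = 0;
  private long pendingWatermark = Long.MIN_VALUE;            // newest watermark received so far
  private long emittedWatermark = Long.MIN_VALUE;            // newest watermark forwarded downstream

  BundleAwareOperator(int maxBundleSize) {
    this.maxBundleSize = maxBundleSize;
  }

  void processElement(String value) {
    if (!bundleOpen) {
      bundleOpen = true;
      downstream.add("startBundle");
    }
    downstream.add("element:" + value);
    if (++elementsInBundle >= maxBundleSize) {
      finishBundle(); // size limit reached: close the bundle, which also releases the held watermark
    }
  }

  void processWatermark(long watermark) {
    pendingWatermark = Math.max(pendingWatermark, watermark);
    if (!bundleOpen) {
      flushWatermark(); // no open bundle can still emit older buffered data
    }
  }

  void finishBundle() {
    if (!bundleOpen) {
      return;
    }
    downstream.add("finishBundle");
    bundleOpen = false;
    elementsInBundle = 0;
    flushWatermark(); // forward the watermark held back during the bundle
  }

  private void flushWatermark() {
    if (pendingWatermark > emittedWatermark) {
      emittedWatermark = pendingWatermark;
      downstream.add("watermark:" + emittedWatermark);
    }
  }

  List<String> emitted() {
    return downstream;
  }
}
```

With maxBundleSize = 2, feeding a, watermark 10, b, watermark 20, c and then calling finishBundle() yields [startBundle, element:a, element:b, finishBundle, watermark:10, watermark:20, startBundle, element:c, finishBundle]: watermark 10 is held until the first bundle closes. One corner case this toy ignores is that a bundle which stays open for a long time holds the watermark for just as long, so a real implementation would presumably also need a time-based bundle limit.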

    WDYT?

      Jan

    On 3/25/22 23:06, Robert Bradshaw wrote:
    > I do not think there is a hard and fast rule about updating
    > watermarks only at bundle boundaries. This seems perfectly legal for
    > a pure 1:1 mapping DoFn. The issue is that DoFns are allowed to
    > buffer data and emit it in a later process call (or in finishBundle).
    > If the watermark has moved on in the meantime, that may result in
    > late data. We don't really have a way for a DoFn to declare *its*
    > output watermark (i.e. "I promise not to emit any data before this
    > timestamp.")
    >
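The buffering problem can be reproduced with a toy simulation in plain Java (not the Beam API; all names are hypothetical). A DoFn-like object buffers elements and emits them only in finishBundle, while the runner forwards an input watermark mid-bundle. With holdWatermark = false this mimics interleaved forwarding; with holdWatermark = true it mimics the missing "I promise not to emit any data before this timestamp" declaration by capping the forwarded watermark at the earliest buffered timestamp. "Late" is simplified here to "timestamp already behind the downstream watermark when emitted".

```java
import java.util.ArrayList;
import java.util.List;

// Toy simulation, not the Beam API: every name here is hypothetical.
final class BufferingDoFnSimulation {
  private final List<Long> buffer = new ArrayList<>(); // timestamps buffered by the "DoFn"
  // Hypothetical declaration: never forward a watermark past the earliest buffered timestamp.
  private final boolean holdWatermark;
  private long latestInputWatermark = Long.MIN_VALUE;
  private long downstreamWatermark = Long.MIN_VALUE;
  private int lateOutputs = 0;

  BufferingDoFnSimulation(boolean holdWatermark) {
    this.holdWatermark = holdWatermark;
  }

  // processElement: buffer the element and emit nothing yet.
  void processElement(long timestamp) {
    buffer.add(timestamp);
  }

  // The runner forwards a watermark that arrives while the bundle is still open.
  void onInputWatermark(long watermark) {
    latestInputWatermark = Math.max(latestInputWatermark, watermark);
    long forwarded = latestInputWatermark;
    if (holdWatermark) {
      for (long ts : buffer) {
        forwarded = Math.min(forwarded, ts);
      }
    }
    downstreamWatermark = Math.max(downstreamWatermark, forwarded);
  }

  // finishBundle: flush the buffer, counting outputs already behind the downstream watermark.
  void finishBundle() {
    for (long ts : buffer) {
      if (ts < downstreamWatermark) {
        lateOutputs++;
      }
    }
    buffer.clear();
    // Nothing is buffered any more, so any held-back watermark can be released.
    downstreamWatermark = Math.max(downstreamWatermark, latestInputWatermark);
  }

  int lateOutputs() {
    return lateOutputs;
  }

  long downstreamWatermark() {
    return downstreamWatermark;
  }
}
```

Buffering timestamps 5 and 8, forwarding watermark 10 mid-bundle and then calling finishBundle() gives 2 late outputs without the hold and 0 with it; in the held case the downstream watermark only reaches 10 once the buffer has been flushed. A 1:1 mapping DoFn never buffers, so for it both settings behave the same.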
    > On Thu, Mar 24, 2022 at 8:10 AM Evan Galpin <egal...@apache.org> wrote:
    >> Thanks for starting this thread, Jan; I'm keen to hear thoughts and
    >> outcomes! I thought I would mention that answers to the questions
    >> posed here will help to unblock a 2.38.0 release blocker[1].
    >>
    >> [1] https://issues.apache.org/jira/browse/BEAM-14064
    >>
    >> On Thu, Mar 24, 2022 at 5:28 AM Jan Lukavský <je...@seznam.cz> wrote:
    >>> Hi,
    >>>
    >>> this is a follow-up thread to [1]. That thread mentions multiple
    >>> times that (in a stateless ParDo) the output watermark is allowed
    >>> to advance only on bundle boundaries [2]. Essentially, that would
    >>> mean that everything between the calls to @StartBundle and
    >>> @FinishBundle is processed at a single instant in (output) event
    >>> time. This makes perfect sense.
    >>>
    >>> The issue is that not all runners seem to actually implement this
    >>> behavior. FlinkRunner, for instance, does not have a "natural"
    >>> concept of bundles; they are created in a more ad-hoc way to adhere
    >>> to the DoFn life-cycle (see [3]). Watermark updates and elements are
    >>> completely interleaved, without any synchronization with bundle
    >>> "open" or "close". If watermark updates are allowed to happen only
    >>> on bundle boundaries, then this seems to break that contract.
    >>>
    >>> The question therefore is: should we consider FlinkRunner
    >>> non-compliant with this aspect of the Apache Beam model, or is this
    >>> an "optional" part that runners are free to implement at will? In
    >>> the former case, are we missing @ValidatesRunner tests for this?
    >>>
    >>>   Jan
    >>>
    >>> [1] https://lists.apache.org/thread/mtwtno2o88lx3zl12jlz7o5w1lcgm2db
    >>>
    >>> [2] https://lists.apache.org/thread/7foy455spg43xo77zhrs62gc1m383t50
    >>>
    >>> [3] https://github.com/apache/beam/blob/14862ccbdf2879574b6ce49149bdd7c9bf197322/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L786
    >>>
    >>>
