On 4/22/24 20:40, Kenneth Knowles wrote:

I'll go ahead and advertise https://s.apache.org/beam-sink-triggers again for this thread.
+1
There are a couple of difficult technical problems in there. One of 
them is backwards-propagating triggering to minimize extra latency. We 
can probably solve this as well as we solve forward-propagating 
without too much trouble. We also can/should leave it abstract so 
runners can implement either through back-propagating local triggering 
rules or using run-time communication to trigger upstream. These 
actually could interact well with stateful ParDo by sending a "trigger 
now please" message or some such.
Yes, this was what I was referring to as the more "functional" style for stateful ParDo. At minimum, it requires adding a new callback independent of @ProcessElement and @OnTimer - @OnTrigger?
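To make the "functional" style a bit more concrete, here is a minimal sketch in plain Java. Nothing in it is Beam API: `TriggeredFn`, `onTrigger`, `RunningSum` and the toy runner are hypothetical names. It only assumes that state updates and output are split into separate callbacks, so that the runner decides when output is produced.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model (not Beam API): state updates and output are separate
// callbacks, so the runner, not the user code, decides when output is produced.
interface TriggeredFn<InT, OutT> {
    void processElement(InT element); // updates state only, emits nothing
    OutT onTrigger();                 // called by the runner when it wants output
}

// Running sum: state is updated per element, output is produced only on trigger.
class RunningSum implements TriggeredFn<Integer, Integer> {
    private int sum = 0;

    @Override
    public void processElement(Integer element) {
        sum += element;
    }

    @Override
    public Integer onTrigger() {
        return sum;
    }
}

class OnTriggerSketch {
    // Toy "runner" that fires the trigger after every `every` elements.
    static <InT, OutT> List<OutT> run(TriggeredFn<InT, OutT> fn, List<InT> input, int every) {
        List<OutT> outputs = new ArrayList<>();
        int seen = 0;
        for (InT element : input) {
            fn.processElement(element);
            seen++;
            if (seen % every == 0) {
                outputs.add(fn.onTrigger());
            }
        }
        return outputs;
    }

    public static void main(String[] args) {
        // Fires after the 2nd and 4th element.
        System.out.println(run(new RunningSum(), List.of(1, 2, 3, 4), 2));
    }
}
```

Because the user code never calls an output method itself, a "trigger now please" request from downstream would only have to reach the runner.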
But we also probably need retractions that automatically flow through 
the pipeline and update aggregations. Why? Because currently triggers 
don't just control update frequency but actually create new elements 
each time, so they require user fix-up logic to do the right thing 
with the output. When we go to higher levels of abstraction we need 
this to "just work" without changing the pipeline. There have been two 
(nearly identical) prototypes of adding retractions to the 
DirectRunner as proof of concept. But there's also work in all the IOs 
since they are not retraction-aware. Also lots of work in many library 
transforms where a retraction should be computed by running the 
transform "like normal" but then negating the result, but that cannot 
be the default for ParDo because it is deliberately more flexible; we 
just have to annotate Map and the like.
+1. I think retractions could be implemented as a DSL on top of the current model. Retractions can be viewed as regular data elements with additional metadata (upsert, delete). For ParDo we could add something like @RetractElement (and define appropriate retraction functions for CombineFn and the like). We could introduce RetractedPCollection or similar for this purpose.
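A small sketch of the "regular elements with upsert/delete metadata" idea, in plain Java. `DeltaKind`, `Delta` and `RetractionSketch` are hypothetical names, not Beam API. It assumes an upstream aggregation first emitted 5 and later refined it to 8, and compares a retraction-aware downstream sum with one that treats every firing as a new element.

```java
import java.util.List;

// Hypothetical metadata carried by each element; names are illustrative only.
enum DeltaKind { UPSERT, DELETE }

final class Delta {
    final DeltaKind kind;
    final long value;

    Delta(DeltaKind kind, long value) {
        this.kind = kind;
        this.value = value;
    }
}

class RetractionSketch {
    // Retraction-aware sum: a DELETE is handled by computing the contribution
    // "like normal" and then negating it.
    static long sum(List<Delta> deltas) {
        long total = 0;
        for (Delta d : deltas) {
            long contribution = d.value;
            total += (d.kind == DeltaKind.DELETE) ? -contribution : contribution;
        }
        return total;
    }

    // Sum that ignores retractions, i.e. every trigger firing counts as a new element.
    static long naiveSum(List<Delta> deltas) {
        long total = 0;
        for (Delta d : deltas) {
            if (d.kind == DeltaKind.UPSERT) {
                total += d.value;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Upstream emitted 5, then refined it to 8: retract 5, upsert 8.
        List<Delta> deltas = List.of(
            new Delta(DeltaKind.UPSERT, 5),
            new Delta(DeltaKind.DELETE, 5),
            new Delta(DeltaKind.UPSERT, 8));
        System.out.println(sum(deltas));      // 8
        System.out.println(naiveSum(deltas)); // 13
    }
}
```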
Getting all this right is a lot of work but would result in a system 
that is simpler to use out-of-the-box and a more robust SQL 
implementation (because you can't use triggers with SQL unless you 
have retractions or some other "just works" mode of computation). It 
would essentially change Beam into a delta-processing engine, which it 
arguably should be, with whole append-only elements being the simplest 
degenerate case of a delta (which would be highly optimized in 
batch/archival processing).
+1
Kenn

On Tue, Apr 16, 2024 at 2:36 AM Reuven Lax via dev <dev@beam.apache.org> wrote:
    Yes, but that's inevitable as stateful ParDo in a sense lives
    outside of most of the window/trigger semantics. Basically a
    stateful ParDo is the user exercising low-level control over these
    semantics, and controlling output frequency themselves with
    timers. One could however still propagate the trigger upstream of
    the stateful ParDo, though I'm not sure if that's the best approach.

    On Mon, Apr 15, 2024 at 11:31 PM Jan Lukavský <je...@seznam.cz> wrote:

        On 4/11/24 18:20, Reuven Lax via dev wrote:
        I'm not sure it would require all that. A "basic"
        implementation could be done on top of our existing model.
        Essentially the user would specify triggers at the sink
        ParDos, then the runner would walk backwards up the graph,
        reverse-propagating these triggers (with some resolution
        rules aimed at keeping the minimum trigger latency). The
        runner could under the covers simply just apply the
        appropriate trigger into the Window, using the current
        mechanism. Of course building this all into the framework
        from scratch would be cleaner, but we could also build this
        on top of what we have.
        Any propagation from sink to source would be blocked by any
        stateful ParDo, because that does not adhere to the concept of
        trigger, no? Hence, we could get the required downstream
        'cadence' of outputs, but these would change only when the
        upstream ParDo emits any data. Yes, one can argue that
        stateful ParDo is supposed to emit data as fast as possible,
        then this seems to work.
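The reverse propagation and the stateful ParDo boundary discussed above can be sketched roughly as follows. This is a toy graph model in plain Java, not Beam API: `Node`, `SinkTriggerSketch` and the "keep the minimum latency" rule are assumptions made for illustration, and a stateful ParDo is modelled simply as a point where the request stops.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pipeline node; none of these names are Beam API.
class Node {
    final String name;
    final boolean stateful;   // stateful ParDo, modelled as blocking propagation
    final Long sinkLatencyMs; // non-null only for sinks that request a latency
    final List<Node> downstream = new ArrayList<>();

    Node(String name, boolean stateful, Long sinkLatencyMs) {
        this.name = name;
        this.stateful = stateful;
        this.sinkLatencyMs = sinkLatencyMs;
    }

    // Adds an edge from this node to `next`; returns this node for chaining.
    Node to(Node next) {
        downstream.add(next);
        return this;
    }
}

class SinkTriggerSketch {
    // Records, per node, the smallest latency requested by any sink that can
    // reach it without crossing a stateful ParDo. Returns null if there is none.
    static Long resolve(Node node, Map<String, Long> resolved) {
        Long best = node.sinkLatencyMs;
        for (Node next : node.downstream) {
            Long fromNext = resolve(next, resolved);
            if (next.stateful) {
                continue; // the request stops at the stateful ParDo
            }
            if (fromNext != null && (best == null || fromNext < best)) {
                best = fromNext;
            }
        }
        if (best != null) {
            resolved.put(node.name, best);
        }
        return best;
    }

    public static void main(String[] args) {
        Node source = new Node("source", false, null);
        Node gbk = new Node("gbk", false, null);
        source.to(gbk);
        gbk.to(new Node("sinkA", false, 1000L)).to(new Node("sinkB", false, 5000L));
        Map<String, Long> resolved = new HashMap<>();
        resolve(source, resolved);
        System.out.println(resolved);
    }
}
```

In this model the node in front of two sinks ends up with the smaller of the two requested latencies, while anything upstream of a stateful node receives no request from that branch.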
        On Thu, Apr 11, 2024 at 5:10 AM Jan Lukavský
        <je...@seznam.cz> wrote:

            I've probably heard about it, but I never read the
            proposal. Sounds great, but that would require changing
            our ParDos from the 'directive' style to something more
            functional, so that processing of elements, state updates
            and outputting results can be decoupled and managed by
            the runner independently. This goes exactly in the
            direction of unifying GBK and Combine with stateful
            ParDo. Sounds like something worth exploring for Beam 3. :)

            Anyway, thanks for this discussion, it helped me clarify
            some more white spots.

             Jan

            On 4/10/24 19:24, Reuven Lax via dev wrote:
            Are you familiar with the "sink triggers" proposal?

            Essentially while windowing is usually a property of the
            data, and therefore flows downwards through the graph,
            triggering is usually a property of output (i.e. sink)
            latency - how much are you willing to wait to see data,
            and what semantics do you want for this early data.
            Ideally triggers should be specified separately at the
            ParDo level (Beam has no real notion of Sinks as a
            special object, so to allow for output specification it
            has to be on the ParDo), and the triggers should
            propagate up the graph back to the source. This is in
            contrast to today where we attach triggering to the
            windowing information.

            This was a proposal some years back and there was some
            effort made to implement it, but the implementation
            never really got off the ground.

            On Wed, Apr 10, 2024 at 12:43 AM Jan Lukavský
            <je...@seznam.cz> wrote:

                On 4/9/24 18:33, Kenneth Knowles wrote:
                At a top level `setWindowingStrategyInternal`
                exists to set up the metadata without actually
                assigning windows. If we were more clever we might
                have found a way for it to not be public... it is
                something that can easily lead to an invalid pipeline.
                Yes, that was what hit me about one minute after I
                started this thread. :)
                I think "compatible windows" today in Beam doesn't
                have very good uses anyhow. I do see how when you
                are flattening PCollections you might also want to
                explicitly have a function that says "and here is
                how to reconcile their different metadata". But is
                it not reasonable to use Window.into(global
                window)? It doesn't seem like boilerplate to me
                actually, but something you really want to know is
                happening.
                :)

                Of course this was the way out, but I was somewhat
                intuitively seeking something that could do this
                autonomously.

                Generally speaking, we might have some room for
                improvement in the way we handle windows and
                triggers - windows relate only to GBK and stateful
                ParDo, triggers relate to GBK only. They have no
                semantics if downstream processing does not use any
                of these. There could be a pipeline preprocessing
                stage that would discard (replace with meaningful
                defaults) any of these metadata that is unused, but
                can cause Pipeline to fail at construction time. It
                is also (to me) somewhat questionable if triggers
                are really a property of a PCollection or a property
                of a specific transform (GBK - ehm, actually
                (stateless) 'key by' + 'reduce by key', but that is
                completely different story :)) because (non-default)
                triggers are likely not preserved across multiple
                transforms. Maybe the correct subject of this thread
                could be "are we sure our windowing and triggering
                semantics is 100% correct"? Probably the - wrong -
                expectations at the beginning of this thread were
                due to conflict in my mental model of how things
                'could' work as opposed to how they actually work. :)

                 Jan

                Kenn

                On Tue, Apr 9, 2024 at 9:19 AM Jan Lukavský
                <je...@seznam.cz> wrote:

                    On 4/6/24 21:23, Reuven Lax via dev wrote:
                    So the problem here is that windowFn is a
                    property of the PCollection, not the element,
                    and the result of Flatten is a single PCollection.
                    Yes. That is why
                    Flatten.pCollections() needs the same windowFn.
                    In various cases, there is a notion of
                    "compatible" windows. Basically given window
                    functions W1 and W2, provide a W3 that "works"
                    with both.
                    Exactly this would be a nice feature for
                    Flatten, something like 'windowFn resolve
                    strategy', so that if the user does not know the
                    windowFn of upstream PCollections this can be
                    somehow resolved at pipeline construction time.
                    Alternatively only as a small syntactic sugar,
                    something like:
                     
                        Flatten.pCollections().withWindowingStrategy(WindowResolution.into(oneInput.getWindowingStrategy()))

                    or anything similar. This can be done in user
                    code, so it is not something deeper, but might
                    help in some cases. It would be cool if we
                    could reuse concepts from other cases where
                    such mechanism is needed.

                    Note that Beam already has something similar
                    with side inputs, since the side input often
                    is in a different window than the main input.
                    However main input elements are supposed to
                    see side input elements in the same window
                    (and in fact main inputs are blocked until the
                    side-input window is ready), so we must do a
                    mapping. If for example (and very commonly!)
                    the side input is in the global window and the
                    main input is in a fixed window, by default we
                    will remap the global-window elements into the
                    main-input's fixed window.
                    This is a one-sided merge function, there is a
                    'main' and 'side' input, but the generic
                    symmetric merge might be possible as well. E.g.
                    if one PCollection of Flatten is in
                    GlobalWindow, I wonder if there are cases where
                    users would actually want to do anything else
                    than apply the same global windowing strategy
                    to all input PCollections.
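One possible shape of such a symmetric resolution rule, sketched in plain Java with windowFns represented by plain names. `FlattenResolutionSketch` and the rule itself are hypothetical and only illustrate the "any global input makes the result global" idea; this is not how Flatten behaves today.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class FlattenResolutionSketch {
    static final String GLOBAL = "GlobalWindows";

    // Hypothetical resolution rule: identical windowFns pass through, any
    // global input makes the result global, everything else is rejected.
    static String resolve(List<String> inputWindowFns) {
        Set<String> distinct = new LinkedHashSet<>(inputWindowFns);
        if (distinct.isEmpty()) {
            throw new IllegalArgumentException("No inputs to flatten");
        }
        if (distinct.size() == 1) {
            return distinct.iterator().next();
        }
        if (distinct.contains(GLOBAL)) {
            return GLOBAL;
        }
        throw new IllegalArgumentException("Incompatible windowFns: " + distinct);
    }

    public static void main(String[] args) {
        // One global input and one sliding-window input resolve to global.
        System.out.println(resolve(List.of(GLOBAL, "SlidingWindows(10m, 1m)")));
    }
}
```

The rule is evaluated on metadata only; in a real pipeline the non-global inputs would still have to be re-windowed for the result to be valid at runtime.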

                     Jan

                    In Side input we also allow the user to
                    control this mapping, so for example side
                    input elements could always map to the
                    previous fixed window (e.g. while processing
                    window 12-1, you want to see summary data of
                    all records in the previous window 11-12).
                    Users can do this by providing a
                    WindowMappingFunction to the View -
                    essentially a function from window to window.
                    Unfortunately this is hard to use (one must
                    create their own PCollectionView class) and
                    very poorly documented, so I doubt many users
                    know about this!
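As a rough illustration of "a function from window to window", the previous-window mapping from the example above could look like this in plain Java. `HourWindow` and `WindowMappingSketch` are toy names, not Beam classes, and windows are reduced to whole hours.

```java
import java.util.function.UnaryOperator;

// Toy fixed window [startHour, endHour); not a Beam class.
final class HourWindow {
    final int startHour;
    final int endHour;

    HourWindow(int startHour, int endHour) {
        this.startHour = startHour;
        this.endHour = endHour;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof HourWindow)) {
            return false;
        }
        HourWindow other = (HourWindow) o;
        return startHour == other.startHour && endHour == other.endHour;
    }

    @Override
    public int hashCode() {
        return 31 * startHour + endHour;
    }

    @Override
    public String toString() {
        return "[" + startHour + ", " + endHour + ")";
    }
}

class WindowMappingSketch {
    // Main-input window -> side-input window: the window of equal size right before it.
    static final UnaryOperator<HourWindow> PREVIOUS_WINDOW =
        w -> new HourWindow(w.startHour - (w.endHour - w.startHour), w.startHour);

    public static void main(String[] args) {
        // While processing window 12-13, look at side input from window 11-12.
        System.out.println(PREVIOUS_WINDOW.apply(new HourWindow(12, 13))); // prints [11, 12)
    }
}
```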

                    Reuven

                    On Sat, Apr 6, 2024 at 7:09 AM Jan Lukavský
                    <je...@seznam.cz> wrote:

                        Immediate self-correction, although
                        setting the strategy directly via
                        setWindowingStrategyInternal() *seemed* to
                        be working during Pipeline
                        construction time, during runtime it
                        obviously does not work, because
                        the PCollection was still windowed using
                        the old windowFn. Makes sense to
                        me, but there remains the other question
                        if we can make flattening
                        PCollections with incompatible windowFns
                        more user-friendly. The current
                        approach where we require the same
                        windowFn for all input PCollections
                        creates some unnecessary boilerplate code
                        needed on user side.
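The difference between changing only the declared strategy and actually re-windowing can be shown with a toy model in plain Java. `ToyPCollection`, `setMetadataOnly` and `windowInto` are hypothetical stand-ins, not the real PCollection API. The model assumes each element carries the window it was assigned when the collection was built.

```java
import java.util.ArrayList;
import java.util.List;

// Toy PCollection: declared windowFn metadata plus the window each element carries.
class ToyPCollection {
    String declaredWindowFn;
    final List<String> elementWindows = new ArrayList<>();

    ToyPCollection(String windowFn, int elementCount) {
        this.declaredWindowFn = windowFn;
        for (int i = 0; i < elementCount; i++) {
            elementWindows.add(windowFn);
        }
    }

    // Changes the declared strategy only; elements keep their old windows.
    void setMetadataOnly(String windowFn) {
        this.declaredWindowFn = windowFn;
    }

    // Models an explicit re-windowing step: every element is assigned anew.
    ToyPCollection windowInto(String windowFn) {
        return new ToyPCollection(windowFn, elementWindows.size());
    }

    // True if every element's window matches the declared windowFn.
    boolean consistent() {
        for (String w : elementWindows) {
            if (!w.equals(declaredWindowFn)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        ToyPCollection sliding = new ToyPCollection("SlidingWindows", 3);
        sliding.setMetadataOnly("GlobalWindows");
        System.out.println(sliding.consistent());                             // false
        System.out.println(sliding.windowInto("GlobalWindows").consistent()); // true
    }
}
```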

                          Jan

                        On 4/6/24 15:45, Jan Lukavský wrote:
                        > Hi,
                        >
                        > I came across a case where using
                        >
                        PCollection#setWindowingStrategyInternal
                        seems legit in user code.
                        > The case is roughly as follows:
                        >
                        >  a) compute some streaming statistics
                        >
                        >  b) apply the same transform (say
                        ComputeWindowedAggregation) with
                        > different parameters on these statistics
                        yielding two windowed
                        > PCollections - first is global with
                        early trigger, the other is
                        > sliding window, the specific parameters
                        of the windowFns are
                        > encapsulated in the
                        ComputeWindowedAggregation transform
                        >
                        >  c) apply the same transform on both of
                        the above PCollections,
                        > yielding two PCollections with the same
                        types, but different windowFns
                        >
                        >  d) flatten these PCollections into
                        single one (e.g. for downstream
                        > processing - joining - or flushing to sink)
                        >
                        > Now, the flatten will not work, because
                        these PCollections have
                        > different windowFns. It would be
                        possible to restore the windowing for
                        > either of them, but it requires
                        somewhat breaking the encapsulation of
                        > the transforms that produce the windowed
                        outputs. A more natural
                        > solution is to take the
                        WindowingStrategy from the global aggregation
                        > and set it via
                        setWindowingStrategyInternal() to the other
                        > PCollection. This works, but it uses API
                        that is marked as @Internal
                        > (and obviously, the name as well
                        suggests it is not intended for
                        > client-code usage).
                        >
                        > The question is, should we make a
                        legitimate version of this call? Or
                        > should we introduce a way for
                        Flatten.pCollections() to re-window the
                        > input PCollections appropriately? In the
                        case of conflicting
                        > WindowFns, where one of them is
                        GlobalWindowing strategy, it seems to
                        > me that the user's intention is quite
                        well-defined (this might extend
                        > to some 'flatten windowFn resolution
                        strategy', maybe).
                        >
                        > WDYT?
                        >
                        >  Jan
                        >
