Do we have a defined way for a PTransform to create a bounded PCollection from an unbounded one (a typical example would be LIMIT acting on unbounded input)? AFAIK, we can use SDF to manipulate the watermark, but that requires terminating the Pipeline even though there are still running upstream transforms (e.g. sources). I'm not sure we have a sound definition of when a runner should terminate a Pipeline, so I guess this is runner-dependent, right? If I'm not wrong, Flink, for example, does not terminate a Pipeline as long as there is at least one running operator, so this might require signalling the sources from the sink (thus introducing some form of cycle).

 Jan

On 9/15/23 18:55, Robert Bradshaw via user wrote:
On Fri, Sep 15, 2023 at 9:46 AM Reuven Lax via user <user@beam.apache.org> wrote:

    Creating composite DoFns is tricky today due to how they are
    implemented (via annotated methods).


Note that this depends on the language. This should be really easy to do from Python.

    However providing such a method to compose DoFns would be very
    useful IMO.


+1

    On Fri, Sep 15, 2023 at 9:33 AM Joey Tran
    <joey.t...@schrodinger.com> wrote:

        Yeah, for (1) the concern would be adding a shuffle/fusion
        break, and (2) sounds like the likely solution. I was just
        hoping there'd be one that could wrap at the PTransform
        level, but I realize now that, as you mentioned, the
        PTransform abstraction is too general to do something like
        that.

        (2) will likely be what we do, though now I'm wondering if
        it might be possible to create a ParDo wrapper that can take
        a ParDo, extract its DoFn, wrap it, and return a new ParDo.

        On Fri, Sep 15, 2023, 11:53 AM Robert Bradshaw via user
        <user@beam.apache.org> wrote:

            +1 to looking at composite transforms. You could even have
            a composite transform that takes another transform as one
            of its construction arguments and whose expand method does
            pre- and post-processing to the inputs/outputs
            before/after applying the transform in question. (You
            could even implement this as a Python decorator if you
            really wanted, either decorating the expand method itself
            or the full class...)

            One of the difficulties is that for a general transform
            there isn't necessarily a 1:N relationship between inputs
            and outputs as one has for a DoFn (especially if there is
            any aggregation involved). There are, however, two partial
            solutions that might help.

            (1) You can do a CombineGlobally with a CombineFn (like
            Sample) that returns at most N elements. You could do this
            with a CombinePerKey if you can come up with a reasonable
            key (e.g. the id of your input elements) that the limit
            should be applied to. Note that this may cause a lot of
            data to be shuffled (though due to combiner lifting, no
            more than N per bundle).

            (2) You could have a DoFn that limits to N per bundle by
            initializing a counter in its start_bundle and passing
            elements through until the counter reaches a threshold.
            (Again, one could do this per id if one is available.) It
            wouldn't stop production of the elements, but if things
            get fused it would still likely be fairly cheap.

            Both of these could be prepended to the problematic
            consuming PTransform as well.

            - Robert



            On Fri, Sep 15, 2023 at 8:13 AM Joey Tran
            <joey.t...@schrodinger.com> wrote:

                I'm aware of composite transforms and of the
                distributed nature of PTransforms. I'm not suggesting
                limiting the entire set and my example was more
                illustrative than the actual use case.

                My actual use case is basically this: I have multiple
                PTransforms, and let's say most of them average ~100
                generated outputs for a single input. Occasionally,
                though, most of these PTransforms will run into an
                input that produces maybe 1M outputs. This can cause
                issues if, for example, the transforms that follow
                require a lot of compute per input.

                The simplest way to deal with this is to modify the
                `DoFn`s in our PTransforms and add a limiter to the
                logic (e.g. `if num_outputs_generated >=
                OUTPUTS_PER_INPUT_LIMIT: return`). We could duplicate
                this logic across our transforms, but it'd be much
                cleaner if we could lift this limiting logic out of
                the application logic into some generic wrapper that
                extends our transforms.

                Thanks for the discussion!

                On Fri, Sep 15, 2023 at 10:29 AM Alexey Romanenko
                <aromanenko....@gmail.com> wrote:

                    I don’t think it’s possible to extend them in the
                    way you are asking (like Java classes /extend/).
                    However, you can create your own composite
                    PTransform that incorporates one or several others
                    inside its /expand()/ method. In fact, most of
                    Beam’s native PTransforms are composite
                    transforms. Please take a look at the docs and
                    examples [1].

                    Regarding your example, please be aware that all
                    PTransforms are meant to be executed in a
                    distributed environment, and the order of records
                    is not guaranteed. So limiting the whole output to
                    a fixed number of records can be challenging: you’d
                    need to make sure that it is processed on only one
                    worker, which means that you’d need to shuffle all
                    your records to the same key and probably sort
                    them in the order you need.

                    Did you consider using
                    “org.apache.beam.sdk.transforms.Top” for that? [2]

                    If it doesn’t work for you, could you provide more
                    details about your use case? Then we can probably
                    propose a more suitable solution.

                    [1] https://beam.apache.org/documentation/programming-guide/#composite-transforms
                    [2] https://beam.apache.org/releases/javadoc/2.50.0/org/apache/beam/sdk/transforms/Top.html

                    —
                    Alexey

                    On 15 Sep 2023, at 14:22, Joey Tran
                    <joey.t...@schrodinger.com> wrote:

                    Is there a way to extend already defined
                    PTransforms? My question is probably better
                    illustrated with an example. Let's say I have a
                    PTransform that generates a highly variable number
                    of outputs. I'd like to "wrap" that PTransform
                    such that if it ever creates more than, say, 1,000
                    outputs, I just take the first 1,000 outputs
                    without generating the rest.

                    It'd be trivial if I have access to the DoFn, but
                    what if the PTransform in question doesn't expose
                    the `DoFn`?
