This applies to sources and cannot be used inside the Pipeline. Essentially, what would be needed is support for a back edge from a PTransform to its source(s), notifying the source PTransform(s) to terminate. This is also essential for any possible support for iterations (which would probably be implemented as a "decorator" around a PTransform; "limit" is essentially just the special case of a single iteration). I don't want to discuss the details here, but if we want to add support for decorators, we should keep in mind that something is missing in the model for full support.

We can open a different discussion thread if there is any interest.

Best,

 Jan

On 9/18/23 17:19, Alexey Romanenko wrote:
In the past there was BoundedReadFromUnboundedSource, which, if I'm not mistaken, is still used in KafkaIO to limit a read by number of records or by time. At the same time, we had a discussion that it should no longer be used and should be considered an obsolete transform.

On 18 Sep 2023, at 09:28, Jan Lukavský <je...@seznam.cz> wrote:

Do we have a defined way for a PTransform to create a bounded PCollection from an unbounded one (a typical example would be LIMIT acting on unbounded input)? AFAIK, we can use SDF to manipulate the watermark, but that requires terminating the Pipeline even though there are still running upstream transforms (e.g. sources). I'm not sure we have a sound definition of when a runner should terminate a Pipeline, so I guess this is runner dependent, right? If I'm not wrong, Flink, for example, does not terminate a Pipeline while at least one operator is still running, so this might require signalling the sources from the sink (thus introducing some form of cycle).

 Jan

On 9/15/23 18:55, Robert Bradshaw via user wrote:
On Fri, Sep 15, 2023 at 9:46 AM Reuven Lax via user <user@beam.apache.org> wrote:

    Creating composite DoFns is tricky today due to how they are
    implemented (via annotated methods).


Note that this depends on the language. This should be really easy to do from Python.

    However providing such a method to compose DoFns would be very
    useful IMO.


+1

    On Fri, Sep 15, 2023 at 9:33 AM Joey Tran
    <joey.t...@schrodinger.com> wrote:

        Yeah, for (1) the concern would be adding a shuffle/fusion
        break, and (2) sounds like the likely solution. I was just
        hoping there'd be one that could wrap at the PTransform
        level, but I realize now that, as you mentioned, the
        PTransform abstraction is too general for something like that.

        (2) will likely be what we do, though now I'm wondering if
        it might be possible to create a ParDo wrapper that takes
        a ParDo, extracts its DoFn, wraps it, and returns a new ParDo.
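        A minimal, framework-free sketch of the "wrap an existing DoFn"
        idea, assuming the wrapped object exposes a generator-style
        process(element). LimitedDoFn and Explode are hypothetical names,
        not Beam classes; a real version would subclass apache_beam.DoFn
        and would also have to forward setup/teardown and any extra
        process() arguments (side inputs, window/timestamp parameters),
        which this sketch ignores.

```python
import itertools


class LimitedDoFn:
    """Hypothetical wrapper around a DoFn-like object we do not control.

    It forwards bundle lifecycle calls and caps how many outputs the inner
    process() may emit for each input element.
    """

    def __init__(self, inner, limit):
        self.inner = inner
        self.limit = limit

    def start_bundle(self):
        # Forward lifecycle hooks only if the wrapped object defines them.
        if hasattr(self.inner, "start_bundle"):
            self.inner.start_bundle()

    def process(self, element):
        results = self.inner.process(element)
        if results is None:  # inner process() returned no iterable
            return
        # islice stops pulling after `limit` items, so a generator-based
        # process() never computes the remaining outputs.
        yield from itertools.islice(results, self.limit)

    def finish_bundle(self):
        if hasattr(self.inner, "finish_bundle"):
            self.inner.finish_bundle()


class Explode:
    """Stand-in for a third-party DoFn: emits `element` outputs per input."""

    def process(self, element):
        for i in range(element):
            yield (element, i)


fn = LimitedDoFn(Explode(), limit=3)
fn.start_bundle()
out = [o for e in [2, 1_000_000] for o in fn.process(e)]
fn.finish_bundle()
print(len(out))  # 5: both outputs for 2, then only 3 of the million
```

        The per-input cap relies on process() being lazy; if the inner
        DoFn builds and returns a full list, the wrapper still truncates
        the output but cannot avoid the cost of generating it.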

        On Fri, Sep 15, 2023, 11:53 AM Robert Bradshaw via user
        <user@beam.apache.org> wrote:

            +1 to looking at composite transforms. You could even
            have a composite transform that takes another transform
            as one of its construction arguments and whose expand
            method does pre- and post-processing to the
            inputs/outputs before/after applying the transform in
            question. (You could even implement this as a Python
            decorator if you really wanted, either decorating the
            expand method itself or the full class...)

            One of the difficulties is that for a general transform
            there isn't necessarily a 1:N relationship between
            inputs and outputs as there is for a DoFn (especially if
            there is any aggregation involved). There are, however,
            two partial solutions that might help.

            (1) You can do a CombineGlobally with a CombineFn (like
            Sample) that returns at most N elements. You could do
            this with a CombinePerKey if you can come up with a
            reasonable key (e.g. the id of your input elements) that
            the limit should be applied to. Note that this may
            cause a lot of data to be shuffled (though due to
            combiner lifting, no more than N per bundle).

            (2) You could have a DoFn that limits to N per bundle by
            initializing a counter in its start_bundle and passing
            elements through until the counter reaches a threshold.
            (Again, one could do this per id if one is available.)
            It wouldn't stop production of the elements, but if
            things get fused it would still likely be fairly cheap.

            Both of these could be prepended to the problematic
            consuming PTransform as well.
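            A plain-Python sketch of both options, assuming DoFn- and
            CombineFn-style method names (start_bundle/process and
            create_accumulator/add_input/merge_accumulators/extract_output)
            but without importing Beam; LimitPerBundle, AtMostN, and the
            toy run_bundles runner are hypothetical. In a pipeline the
            first would subclass apache_beam.DoFn and the second
            apache_beam.CombineFn.

```python
from typing import List


class LimitPerBundle:
    """Option (2): a DoFn-style limiter. The counter is reset in
    start_bundle(), so the cap applies per bundle, not globally."""

    def __init__(self, limit: int):
        self.limit = limit
        self.count = 0

    def start_bundle(self):
        self.count = 0

    def process(self, element):
        if self.count < self.limit:
            self.count += 1
            yield element


def run_bundles(fn, bundles):
    """Toy runner: calls start_bundle() once per bundle, then process()."""
    out = []
    for bundle in bundles:
        fn.start_bundle()
        for element in bundle:
            out.extend(fn.process(element))
    return out


class AtMostN:
    """Option (1): CombineFn-style methods that keep at most n elements
    (an arbitrary n, not a random sample like Beam's Sample)."""

    def __init__(self, n: int):
        self.n = n

    def create_accumulator(self) -> List:
        return []

    def add_input(self, acc: List, element) -> List:
        if len(acc) < self.n:
            acc.append(element)
        return acc

    def merge_accumulators(self, accs) -> List:
        merged: List = []
        for acc in accs:
            # Take only as many as are still needed to reach n.
            merged.extend(acc[: self.n - len(merged)])
        return merged

    def extract_output(self, acc: List) -> List:
        return acc


limited = run_bundles(LimitPerBundle(2), [[1, 2, 3], [4, 5, 6, 7]])
print(limited)  # [1, 2, 4, 5]

# Combiner lifting: each bundle builds a partial accumulator of <= n items,
# and only those partials would need to be shuffled before merging.
combine = AtMostN(3)
partials = []
for bundle in [[1, 2, 3, 4], [5, 6], [7]]:
    acc = combine.create_accumulator()
    for element in bundle:
        acc = combine.add_input(acc, element)
    partials.append(acc)
result = combine.extract_output(combine.merge_accumulators(partials))
print(result)  # [1, 2, 3]
```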

            - Robert



            On Fri, Sep 15, 2023 at 8:13 AM Joey Tran
            <joey.t...@schrodinger.com> wrote:

                I'm aware of composite transforms and of the
                distributed nature of PTransforms. I'm not
                suggesting limiting the entire set, and my example
                was more illustrative than the actual use case.

                My actual use case is basically: I have multiple
                PTransforms, and let's say most of them average ~100
                generated outputs for a single input. Most of these
                PTransforms will occasionally run into an input,
                though, that might produce maybe 1M outputs. This can
                cause issues if, for example, they are followed by
                transforms that require a lot of compute per input.

                The simplest way to deal with this is to modify the
                `DoFn`s in our PTransforms and add a limiter to the
                logic (e.g. `if num_outputs_generated >=
                OUTPUTS_PER_INPUT_LIMIT: return`). We could
                duplicate this logic across our transforms, but it'd
                be much cleaner if we could lift this limiting
                logic out of the application logic and have some
                generic wrapper that extends our transforms.
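                One way to lift the per-input limit out of the
                application logic, sketched under the assumption that
                process() is a generator: a hypothetical decorator,
                limit_outputs, applied to each DoFn's process() method.
                GenerateVariants is a stand-in class. We have not checked
                how this interacts with Beam's inspection of process()
                signatures (e.g. DoFn parameter defaults), so treat it as
                an idea to test, not a drop-in.

```python
import functools
import itertools


def limit_outputs(limit):
    """Hypothetical decorator: caps how many outputs a generator-style
    process() method yields per input element."""
    def decorate(process):
        @functools.wraps(process)
        def wrapper(self, element, *args, **kwargs):
            results = process(self, element, *args, **kwargs)
            if results is None:
                return
            # Lazily stop after `limit` outputs; later ones are never built.
            yield from itertools.islice(results, limit)
        return wrapper
    return decorate


class GenerateVariants:  # would subclass apache_beam.DoFn in a pipeline
    @limit_outputs(1000)
    def process(self, element):
        for i in range(element):
            yield f"{element}-{i}"


fn = GenerateVariants()
counts = [sum(1 for _ in fn.process(n)) for n in (100, 1_000_000)]
print(counts)  # [100, 1000]
```

                The OUTPUTS_PER_INPUT_LIMIT check then lives in one
                place instead of being duplicated in every DoFn.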

                Thanks for the discussion!

                On Fri, Sep 15, 2023 at 10:29 AM Alexey Romanenko
                <aromanenko....@gmail.com> wrote:

                    I don’t think it’s possible to extend a transform in
                    the way you are asking (like Java classes
                    “extend” one another). However, you can create your
                    own composite PTransform that incorporates one
                    or several others inside its “expand()” method.
                    In fact, most of the native Beam PTransforms
                    are composite transforms. Please take a look at the
                    docs and examples [1].

                    Regarding your example, please be aware that
                    all PTransforms are supposed to be executed in a
                    distributed environment and the order of records
                    is not guaranteed. So, limiting the whole output
                    to a fixed number of records can be challenging:
                    you’d need to make sure it is processed on only
                    one worker, which means you’d need to shuffle all
                    your records by the same key and probably sort
                    them in the way you need.

                    Did you consider using
                    “org.apache.beam.sdk.transforms.Top” for that? [2]
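                    The reason a Top-style transform avoids a full sort
                    can be sketched in plain Python, assuming
                    CombineFn-style method names: each partial keeps only
                    its n largest elements in a bounded min-heap, so
                    merging touches at most n elements per partial. TopN
                    is a hypothetical name, not the linked Java class.

```python
import heapq
from typing import List


class TopN:
    """CombineFn-style top-n: each accumulator is a min-heap of size <= n."""

    def __init__(self, n: int):
        self.n = n

    def create_accumulator(self) -> List[int]:
        return []

    def add_input(self, heap: List[int], element: int) -> List[int]:
        if len(heap) < self.n:
            heapq.heappush(heap, element)
        elif element > heap[0]:
            # Evict the current smallest to keep only the n largest.
            heapq.heapreplace(heap, element)
        return heap

    def merge_accumulators(self, heaps) -> List[int]:
        merged: List[int] = []
        for heap in heaps:
            for element in heap:
                self.add_input(merged, element)
        return merged

    def extract_output(self, heap: List[int]) -> List[int]:
        return sorted(heap, reverse=True)


top = TopN(3)
partials = []
for bundle in [[5, 1, 9, 2], [7, 3], [8]]:
    acc = top.create_accumulator()
    for element in bundle:
        top.add_input(acc, element)
    partials.append(acc)
largest = top.extract_output(top.merge_accumulators(partials))
print(largest)  # [9, 8, 7]
```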

                    If it doesn’t work for you, could you provide
                    more details of your use case? Then we can probably
                    propose more suitable solutions.

                    [1] https://beam.apache.org/documentation/programming-guide/#composite-transforms
                    [2] https://beam.apache.org/releases/javadoc/2.50.0/org/apache/beam/sdk/transforms/Top.html

                    —
                    Alexey

                    On 15 Sep 2023, at 14:22, Joey Tran
                    <joey.t...@schrodinger.com> wrote:

                    Is there a way to extend already defined
                    PTransforms? My question is probably better
                    illustrated with an example. Let's say I have a
                    PTransform that generates a highly variable
                    number of outputs. I'd like to "wrap" that
                    PTransform such that if it ever creates more
                    than, say, 1,000 outputs, I just take the
                    first 1,000 outputs without generating the rest.

                    It'd be trivial if I had access to the DoFn,
                    but what if the PTransform in question doesn't
                    expose the `DoFn`?
