On Fri, Sep 15, 2023 at 9:46 AM Reuven Lax via user
<user@beam.apache.org> wrote:
Creating composite DoFns is tricky today due to how they are
implemented (via annotated methods).
Note that this depends on the language. This should be really easy
to do from Python.
However providing such a method to compose DoFns would be very
useful IMO.
+1
On Fri, Sep 15, 2023 at 9:33 AM Joey Tran
<joey.t...@schrodinger.com> wrote:
Yeah, for (1) the concern would be adding a shuffle/fusion
break, and (2) sounds like the likely solution. I was just
hoping there'd be a way to wrap at the PTransform level,
but I realize now that, as you mentioned, the PTransform
abstraction is too general for that.
(2) is likely what we'll do, though now I'm wondering if
it might be possible to create a ParDo wrapper that takes
a ParDo, extracts its DoFn, wraps it, and returns a new ParDo.
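Something like this sketch (Python SDK; as far as I know the
`fn` attribute of a ParDo is an internal detail rather than a
public API, and this ignores side inputs and multi-output tags):

```python
import apache_beam as beam

def rewrap_pardo(pardo, wrap_dofn):
    """Hypothetical helper: given an existing ParDo and a function that
    wraps one DoFn in another, build a new ParDo around the wrapped DoFn."""
    # `pardo.fn` is an internal attribute of the Python ParDo, not a public API.
    return beam.ParDo(wrap_dofn(pardo.fn))
```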
On Fri, Sep 15, 2023, 11:53 AM Robert Bradshaw via user
<user@beam.apache.org> wrote:
+1 to looking at composite transforms. You could even
have a composite transform that takes another transform
as one of its construction arguments and whose expand
method does pre- and post-processing on the
inputs/outputs before/after applying the transform in
question. (You could even implement this as a Python
decorator if you really wanted, decorating either the
expand method itself or the full class...)
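For example, a minimal Python sketch of that pattern (the
wrapper name and the pre/post callables are just illustrative):

```python
import apache_beam as beam

class WrapWithPrePost(beam.PTransform):
    """Illustrative composite: pre/post-processing around another transform."""

    def __init__(self, inner_transform, pre_fn, post_fn):
        self._inner = inner_transform
        self._pre_fn = pre_fn
        self._post_fn = post_fn

    def expand(self, pcoll):
        return (
            pcoll
            | 'Pre' >> beam.Map(self._pre_fn)
            | 'Inner' >> self._inner
            | 'Post' >> beam.Map(self._post_fn))

# Usage sketch: pcoll | WrapWithPrePost(SomeTransform(), pre_fn=..., post_fn=...)
```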
One of the difficulties is that for a general transform
there isn't necessarily a 1:N relationship between
outputs and inputs as one has for a DoFn (especially if
there is any aggregation involved). There are, however,
two partial solutions that might help.
(1) You can do a CombineGlobally with a CombineFn (like
Sample) that returns at most N elements. You could do
this with a CombinePerKey if you can come up with a
reasonable key (e.g. the id of your input elements) that
the limit should be applied to. Note that this may
cause a lot of data to be shuffled (though, due to
combiner lifting, no more than N per bundle).
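In the Python SDK, the built-in Sample combiners already express
this idea; a sketch (the per-element `id` used as a key below is
hypothetical):

```python
import apache_beam as beam
from apache_beam.transforms import combiners

N = 1000

# At most N elements across the whole PCollection. The result is a single
# list element; re-flatten it if you need individual elements downstream.
at_most_n = (
    pcoll
    | combiners.Sample.FixedSizeGlobally(N)
    | beam.FlatMap(lambda sampled: sampled))

# Or at most N per key, if a reasonable key exists (element.id is hypothetical).
at_most_n_per_id = (
    pcoll
    | beam.Map(lambda element: (element.id, element))
    | combiners.Sample.FixedSizePerKey(N))
```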
(2) You could have a DoFn that limits to N per bundle by
initializing a counter in its start_bundle and passing
elements through until the counter reaches a threshold.
(Again, one could do this per id if one is available.)
It wouldn't stop production of the elements, but if
things get fused it would still likely be fairly cheap.
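A minimal sketch of (2) in Python (names are illustrative):

```python
import apache_beam as beam

class LimitPerBundle(beam.DoFn):
    """Illustrative DoFn: passes through at most `limit` elements per bundle."""

    def __init__(self, limit):
        self._limit = limit

    def start_bundle(self):
        self._count = 0

    def process(self, element):
        if self._count < self._limit:
            self._count += 1
            yield element

# Usage sketch: pcoll | beam.ParDo(LimitPerBundle(1000))
```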
Both of these could be prepended to the problematic
consuming PTransform as well.
- Robert
On Fri, Sep 15, 2023 at 8:13 AM Joey Tran
<joey.t...@schrodinger.com> wrote:
I'm aware of composite transforms and of the
distributed nature of PTransforms. I'm not
suggesting limiting the entire set, and my example
was more illustrative than representative of the
actual use case.
My actual use case is basically this: I have multiple
PTransforms, and most of them generate on average ~100
outputs per input. Occasionally, though, one of these
PTransforms will hit an input that produces maybe 1M
outputs. This can cause issues if, for example,
downstream transforms require a lot of compute per input.
The simplest way to deal with this is to modify the
`DoFn`s in our PTransforms and add a limiter to the
logic (e.g. `if num_outputs_generated >=
OUTPUTS_PER_INPUT_LIMIT: return`). We could
duplicate this logic across our transforms, but it'd
be much cleaner if we could lift this limiting
logic out of the application logic and have some
generic wrapper that extends our transforms.
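Roughly, I'm imagining something like this hypothetical wrapper:
it delegates to the wrapped DoFn's process generator and stops
pulling from it at the limit (side inputs, bundle lifecycle
methods, and multi-output tags are ignored here for brevity):

```python
import itertools
import apache_beam as beam

class LimitOutputsPerInput(beam.DoFn):
    """Hypothetical wrapper: caps how many outputs the wrapped DoFn yields per input."""

    def __init__(self, inner_dofn, limit):
        self._inner = inner_dofn
        self._limit = limit

    def setup(self):
        self._inner.setup()

    def process(self, element):
        # islice stops pulling from the inner generator once `limit` outputs
        # have been produced for this input element.
        return itertools.islice(self._inner.process(element) or (), self._limit)

    def teardown(self):
        self._inner.teardown()

# Usage sketch: pcoll | beam.ParDo(LimitOutputsPerInput(MyExistingDoFn(), 1000))
```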
Thanks for the discussion!
On Fri, Sep 15, 2023 at 10:29 AM Alexey Romanenko
<aromanenko....@gmail.com> wrote:
I don’t think it’s possible to extend a PTransform
in the way you are asking (like Java classes
“extend” one another). However, you can create your
own composite PTransform that incorporates one
or several other transforms inside its “expand()” method.
Actually, most of the Beam native PTransforms
are composite transforms. Please take a look at the
docs and examples [1].
Regarding your example, please be aware that
all PTransforms are meant to be executed in a
distributed environment, where the order of records
is not guaranteed. So limiting the whole output
to a fixed number of records can be challenging:
you’d need to make sure it is processed on only one
worker, which means you’d need to shuffle all your
records by the same key and probably sort them in
the order you need.
Have you considered using
“org.apache.beam.sdk.transforms.Top” for that? [2]
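For reference, a sketch of the analogous combiner in the Python
SDK (the key function and the element's `score` attribute are
hypothetical):

```python
import apache_beam as beam
from apache_beam.transforms import combiners

# Keep the 1,000 "largest" elements by some score (element.score is hypothetical).
top_1000 = (
    pcoll
    | combiners.Top.Of(1000, key=lambda element: element.score)
    | beam.FlatMap(lambda top_list: top_list))  # unnest the single-list result
```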
If it doesn’t work for you, could you provide
more details about your use case? Then we can
probably propose a more suitable solution.
[1]
https://beam.apache.org/documentation/programming-guide/#composite-transforms
[2]
https://beam.apache.org/releases/javadoc/2.50.0/org/apache/beam/sdk/transforms/Top.html
—
Alexey
On 15 Sep 2023, at 14:22, Joey Tran
<joey.t...@schrodinger.com> wrote:
Is there a way to extend already-defined
PTransforms? My question is probably better
illustrated with an example. Let's say I have a
PTransform that generates a highly variable
number of outputs. I'd like to "wrap" that
PTransform such that if it ever creates more
than, say, 1,000 outputs, I just take the
first 1,000 outputs without generating the rest.
It'd be trivial if I had access to the DoFn,
but what if the PTransform in question doesn't
expose the `DoFn`?