Do we have a defined way for a PTransform to create a bounded PCollection
from an unbounded one (a typical example would be LIMIT acting on
unbounded input)? AFAIK, we can use an SDF to manipulate the watermark, but
that requires terminating the Pipeline even though there are still
upstream transforms (e.g. sources) running. I'm not sure we have a
sound definition of when a runner should terminate a Pipeline, so I
guess this is runner-dependent, right? If I'm not wrong, Flink, for
example, does not terminate a Pipeline while there is at least one running
operator, so this might require signalling sources from the sink (thus
introducing some form of cycle).
Jan
On 9/15/23 18:55, Robert Bradshaw via user wrote:
On Fri, Sep 15, 2023 at 9:46 AM Reuven Lax via user
<user@beam.apache.org> wrote:
Creating composite DoFns is tricky today due to how they are
implemented (via annotated methods).
Note that this depends on the language. This should be really easy to
do from Python.
However providing such a method to compose DoFns would be very
useful IMO.
+1
On Fri, Sep 15, 2023 at 9:33 AM Joey Tran
<joey.t...@schrodinger.com> wrote:
Yeah, for (1) the concern would be adding a shuffle/fusion
break, and (2) sounds like the likely solution; I was just hoping
there'd be one that could wrap at the PTransform level, but I
realize now the PTransform abstraction is too general, as you
mentioned, to do something like that.
(2) will likely be what we do, though now I'm wondering if it
might be possible to create a ParDo wrapper that can take a
ParDo, extract its DoFn, wrap it, and return a new ParDo
On Fri, Sep 15, 2023, 11:53 AM Robert Bradshaw via user
<user@beam.apache.org> wrote:
+1 to looking at composite transforms. You could even have
a composite transform that takes another transform as one
of its construction arguments and whose expand method does
pre- and post-processing to the inputs/outputs
before/after applying the transform in question. (You
could even implement this as a Python decorator if you
really wanted, either decorating the expand method itself
or the full class...)
One of the difficulties is that for a general transform
there isn't necessarily a 1:N relationship between outputs
and inputs as one has for a DoFn (especially if there is
any aggregation involved). There are, however, two partial
solutions that might help.
(1) You can do a CombineGlobally with a CombineFn (like
Sample) that returns at most N elements. You could do this
with a CombinePerKey if you can come up with a reasonable
key (e.g. the id of your input elements) that the limit
should be applied to. Note that this may cause a lot of
data to be shuffled (though due to combiner lifting, no
more than N per bundle).
(2) You could have a DoFn that limits to N per bundle by
initializing a counter in its start_bundle and passing
elements through until the counter reaches a threshold.
(Again, one could do this per id if one is available.) It
wouldn't stop production of the elements, but if things
get fused it would still likely be fairly cheap.
Both of these could be prepended to the problematic
consuming PTransform as well.
- Robert
On Fri, Sep 15, 2023 at 8:13 AM Joey Tran
<joey.t...@schrodinger.com> wrote:
I'm aware of composite transforms and of the
distributed nature of PTransforms. I'm not suggesting
limiting the entire set and my example was more
illustrative than the actual use case.
My actual use case is basically: I have multiple
PTransforms, and let's say most of them average ~100
generated outputs for a single input. Most of these
PTransforms will occasionally run into an input, though,
that might produce maybe 1M outputs. This can cause
issues if, for example, transforms that follow it
require a lot of compute per input.
The simplest way to deal with this is to modify the
`DoFn`s in our PTransforms and add a limiter in the
logic (e.g. `if num_outputs_generated >=
OUTPUTS_PER_INPUT_LIMIT: return`). We could duplicate
this logic across our transforms, but it'd be much
cleaner if we could lift this limiting logic out of
the application logic and have some generic wrapper
that extends our transforms.
Thanks for the discussion!
On Fri, Sep 15, 2023 at 10:29 AM Alexey Romanenko
<aromanenko....@gmail.com> wrote:
I don’t think it’s possible to extend in the way
that you are asking (like Java classes
“extend”). Though, you can create your own
composite PTransform that will incorporate one or
several others inside its “expand()” method.
Actually, most of the Beam native PTransforms are
composite transforms. Please take a look at the doc
and examples [1]
Regarding your example, please be aware that all
PTransforms are supposed to be executed in a
distributed environment and the order of records
is not guaranteed. So, limiting the whole output
to a fixed number of records can be challenging -
you’d need to make sure that it is processed
on only one worker, which means that you’d need to
shuffle all your records by the same key and
probably sort the records in the way that you need.
Did you consider using
“org.apache.beam.sdk.transforms.Top” for that? [2]
If it doesn’t work for you, could you provide more
details of your use case? Then we can probably
propose a more suitable solution.
[1]
https://beam.apache.org/documentation/programming-guide/#composite-transforms
[2]
https://beam.apache.org/releases/javadoc/2.50.0/org/apache/beam/sdk/transforms/Top.html
—
Alexey
On 15 Sep 2023, at 14:22, Joey Tran
<joey.t...@schrodinger.com> wrote:
Is there a way to extend already defined
PTransforms? My question is probably better
illustrated with an example. Let's say I have a
PTransform that generates a very variable number
of outputs. I'd like to "wrap" that PTransform
such that if it ever creates more than say 1,000
outputs, then I just take the first 1,000 outputs
without generating the rest of the outputs.
It'd be trivial if I had access to the DoFn, but
what if the PTransform in question doesn't expose
the `DoFn`?