OK, of course, once I started hacking this up I realized there's already a
combinatorial 2x2 that perhaps people were alluding to but I missed.

RedistributeByKey (user's choice)
RedistributeArbitrarily (runner's choice! default may be random keys but
that is not required)

RedistributeArbitrarilyAllowingDuplicates (this is the use case I am trying
to get at with the design & impl - basically runner's choice and also no
need to dedup or persist)
RedistributeByKeyAllowingDuplicates (is this an important use case? I don't
know - if so, then it points to some future where you tag any transform
with this)

So now I kind of want to have two URNs (one per input/output type) and a
config that allows duplicates.
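To make that concrete, here is a rough Python sketch of the "two URNs + duplicates config" shape. The URN strings and the payload field name are invented for illustration only, not proposed spellings:

```python
# Hypothetical sketch of "two URNs (one per input/output type) plus a
# config that allows duplicates". Nothing here is a final spelling.

REDISTRIBUTE_BY_KEY_URN = "beam:transform:redistribute_by_key:v1"
REDISTRIBUTE_ARBITRARILY_URN = "beam:transform:redistribute_arbitrarily:v1"

def redistribute_spec(by_key: bool, allow_duplicates: bool) -> dict:
    """Build the (hypothetical) portable spec for one cell of the 2x2."""
    urn = REDISTRIBUTE_BY_KEY_URN if by_key else REDISTRIBUTE_ARBITRARILY_URN
    return {"urn": urn, "payload": {"allow_duplicates": allow_duplicates}}

# The four combinations from the 2x2 above:
specs = [redistribute_spec(k, d) for k in (True, False) for d in (True, False)]
```

The point of the shape is that the 2x2 collapses to two URNs times one boolean, rather than four URNs.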

WDYT? Do the people who liked having separate URNs want to have 4 URNs? We
can still have whatever end-user SDK interface we need to have regardless.
I think in Java we want it to look like this:

Redistribute.arbitrarily()
Redistribute.byKey()
Redistribute.arbitrarily().allowingDuplicates()
Redistribute.byKey().allowingDuplicates()

And in Python:

beam.Redistribute()
beam.RedistributeByKey()
beam.Redistribute(allow_duplicates=True)
beam.RedistributeByKey(allow_duplicates=True)
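As a sketch of how those Python spellings might hang together (plain classes standing in for PTransforms here; the names and defaults are assumptions pending the design doc):

```python
# Hypothetical sketch of the Python end-user surface. In real Beam these
# would be PTransform subclasses applied in a pipeline; plain classes are
# used here just to illustrate the parameter shape.

class Redistribute:
    """Runner's-choice redistribution (cf. RedistributeArbitrarily)."""
    def __init__(self, allow_duplicates: bool = False):
        self.allow_duplicates = allow_duplicates

class RedistributeByKey:
    """User's-choice redistribution by the element's key."""
    def __init__(self, allow_duplicates: bool = False):
        self.allow_duplicates = allow_duplicates

# The four spellings from the email:
t1 = Redistribute()
t2 = RedistributeByKey()
t3 = Redistribute(allow_duplicates=True)
t4 = RedistributeByKey(allow_duplicates=True)
```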

I'll add end-user APIs to the design doc (and ask for help on Python and Go
idioms) but they are pretty short and sweet.

Kenn

On Thu, Feb 8, 2024 at 1:45 PM Robert Burke <rob...@frantil.com> wrote:

> Was that only October? Wow.
>
> Option 2 SGTM, with the adjustment of making the core of the URN
> "redistribute_allowing_duplicates" instead of building from the unspecified
> Reshuffle semantics.
>
> Transforms getting updated to use the new transform can have the
> @RequiresStableInput annotation added accordingly if they need that
> property, per previous discussions.
>
>
>
> On Thu, Feb 8, 2024, 10:31 AM Kenneth Knowles <k...@apache.org> wrote:
>
>>
>>
>> On Wed, Feb 7, 2024 at 5:15 PM Robert Burke <lostl...@apache.org> wrote:
>>
>>> OK, so my stance is a configurable Reshuffle might be interesting, so my
>>> vote is +1, along the following lines.
>>>
>>> 1. Use a new URN (beam:transform:reshuffle:v2) and attach a new
>>> ReshufflePayload to it.
>>>
>>
>> Ah, I see there's more than one variation of the "new URN" approach.
>> Namely, you have a new version of an existing URN prefix, while I had in
>> mind that it was a totally new base URN. In other words the open question I
>> meant to pose is between these options:
>>
>> 1. beam:transform:reshuffle:v2 + { allowing_duplicates: true }
>> 2. beam:transform:reshuffle_allowing_duplicates:v1 {}
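For comparison, the two options could be sketched as data (both payload shapes are hypothetical, and neither is a final spelling):

```python
# Hypothetical sketches of the two candidate encodings under discussion.

# Option 1: a versioned reshuffle URN carrying a payload flag.
option_1 = {
    "urn": "beam:transform:reshuffle:v2",
    "payload": {"allowing_duplicates": True},
}

# Option 2: a distinct base URN with an (initially empty) payload of its
# own type, which could later grow option-2-specific fields, e.g.
# parameters around how much duplication to permit.
option_2 = {
    "urn": "beam:transform:reshuffle_allowing_duplicates:v1",
    "payload": {},
}
```

The distinct-payload-type argument for option 2 corresponds to `option_2["payload"]` being free to evolve independently of the base reshuffle payload.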
>>
>> The most compelling argument in favor of option 2 is that it could have a
>> distinct payload type associated with the different URN (maybe parameters
>> around tweaking how much duplication? I don't know... I actually expect
>> neither payload to evolve much if at all).
>>
>> There were also two comments in favor of option 2 on the design doc.
>>
>>>   -> Unknown "urns for composite transforms" already default to the
>>> subtransform graph implementation for most (all?) runners.
>>>   -> Having a payload to toggle this behavior then can have whatever
>>> desired behavior we like. It also allows for additional configurations
>>> to be added later on. This is preferable to a plethora of one-off URNs
>>> IMHO. We can have SDKs gate configuration combinations as needed if
>>> additional ones appear.
>>>
>>> 2. It's very cheap to add but also ignore, as the default is "Do what
>>> we're already doing without change", and not all SDKs need to add it right
>>> away. It's more important that the portable way is defined at least, so
>>> it's easy for other SDKs to add and handle it.
>>>
>>> I would prefer we have a clear starting point on what Reshuffle does
>>> though. I remain a fan of "The Reshuffle (v2) Transform is a user
>>> designated hint to a runner for a change in parallelism. By default, it
>>> produces an output PCollection that has the same elements as the input
>>> PCollection".
>>>
>>
>> +1 this is a better phrasing of the spec I propose in
>> https://s.apache.org/beam-redistribute but let's not get into it here if
>> we can, and just evaluate the delta from that design to
>> https://s.apache.org/beam-reshuffle-allowing-duplicates
>>
>> Kenn
>>
>>
>>> It remains an open question about what that means for
>>> checkpointing/durability behavior, but that's largely been runner dependent
>>> anyway. I admit the above definition is biased by the uses of Reshuffle I'm
>>> aware of, which largely are to incur a fusion break in the execution graph.
>>>
>>> Robert Burke
>>> Beam Go Busybody
>>>
>>> On 2024/01/31 16:01:33 Kenneth Knowles wrote:
>>> > On Wed, Jan 31, 2024 at 4:21 AM Jan Lukavský <je...@seznam.cz> wrote:
>>> >
>>> > > Hi,
>>> > >
>>> > > if I understand this proposal correctly, the motivation is actually
>>> > > reducing latency by bypassing bundle atomic guarantees; bundles after
>>> > > "at least once" Reshuffle would be reconstructed independently of the
>>> > > pre-shuffle bundling. Provided this is correct, it seems that the
>>> > > behavior is slightly more general than for the case of Reshuffle. We
>>> > > already have some transforms that manipulate a specific property of a
>>> > > PCollection - whether it may or may not contain duplicates. That is
>>> > > manipulated in two ways - explicitly removing duplicates based on IDs
>>> > > on sources that generate duplicates, and using @RequiresStableInput,
>>> > > mostly in sinks. These techniques modify an inherent property of a
>>> > > PCollection, that is, whether it contains possible duplicates
>>> > > originating from the same input element.
>>> > >
>>> > > There are two types of duplicates - duplicate elements in _different
>>> > > bundles_ (typically from at-least-once sources) and duplicates
>>> > > arising due to bundle reprocessing (affecting only transforms with
>>> > > side effects, which is what we solve with @RequiresStableInput). The
>>> > > point I'm trying to get to - should we add these properties to
>>> > > PCollections (contains cross-bundle duplicates vs. does not) and
>>> > > PTransforms ("outputs deduplicated elements" and "requires stable
>>> > > input")? That would allow us to analyze the pipeline DAG and provide
>>> > > an appropriate implementation for Reshuffle automatically, so that a
>>> > > new URN or flag would not be needed. Moreover, this might be useful
>>> > > for a broader range of optimizations.
>>> > >
>>> > > WDYT?
>>> > >
>>> > These are interesting ideas that could be useful. I think they
>>> > achieve a different goal than mine in this case. I actually want to
>>> > explicitly allow Reshuffle.allowingDuplicates() to skip expensive
>>> > parts of its implementation that are used to prevent duplicates.
>>> >
>>> > The property that would make it possible to automate this in the case
>>> > of combiners, or at least validate that the pipeline still gives 100%
>>> > accurate answers, would be something like
>>> > @InsensitiveToDuplicateElements, which is longer and less esoteric
>>> > than @Idempotent. For situations where there is a source or sink that
>>> > only has at-least-once guarantees, then yea, maybe the property "has
>>> > duplicates" will let you know that you may as well use the
>>> > duplicating reshuffle without any loss. But still, you may not want
>>> > to introduce *more* duplicates.
>>> >
>>> > I would say my proposal is a step in this direction that would gain
>>> > some experience and tools that we might later use in a more automated
>>> > way.
>>> >
>>> > Kenn
>>> >
>>> > >  Jan
>>> > > On 1/30/24 23:22, Robert Burke wrote:
>>> > >
>>> > > Is the benefit of this proposal just the bounded deviation from the
>>> > > existing reshuffle?
>>> > >
>>> > > Reshuffle is already rather dictated by arbitrary runner choice,
>>> > > from simply ignoring the node, to forcing a materialization break,
>>> > > to a full shuffle implementation which has additional side effects.
>>> > >
>>> > > But model-wise I don't believe it guarantees specific checkpointing
>>> > > or re-execution behavior as currently specified. The proto only says
>>> > > it represents the operation (without specifying the behavior, which
>>> > > is a big problem).
>>> > >
>>> > > I guess my concern here is that it implies/codifies that the existing
>>> > > reshuffle has more behavior than it promises outside of the Java SDK.
>>> > >
>>> > > "Allowing duplicates" WRT reshuffle is tricky. It feels like it
>>> > > mostly allows an implementation where the inputs into the reshuffle
>>> > > might be re-executed, for example. But that's always under the
>>> > > runner's discretion, and ultimately it could also prevent even
>>> > > getting the intended benefit of a reshuffle (notionally, just a
>>> > > fusion break).
>>> > >
>>> > > Is there even a valid way to implement the notion of a reshuffle that
>>> > > leads to duplicates outside of a retry/resilience case?
>>> > >
>>> > > -------
>>> > >
>>> > > To be clear, I'm not against the proposal. I'm against it being
>>> > > built on a non-existent foundation. If the behavior isn't already
>>> > > defined, it's impossible to specify a real deviation from it.
>>> > >
>>> > > I'm all for more specific behaviors if it means we actually clarify
>>> > > what the original version is in the protos, since it's news to me
>>> > > (just now, because I looked) that the Java reshuffle promises
>>> > > GBK-like side effects. But that's a long-deprecated transform
>>> > > without a satisfying replacement for its usage, so it may be moot.
>>> > >
>>> > > Robert Burke
>>> > >
>>> > >
>>> > >
>>> > > On Tue, Jan 30, 2024, 1:34 PM Kenneth Knowles <k...@apache.org>
>>> wrote:
>>> > >
>>> > >> Hi all,
>>> > >>
>>> > >> Just when you thought I had squeezed all the possible interest out
>>> > >> of this most boring-seeming of transforms :-)
>>> > >>
>>> > >> I wrote up a very quick proposal as a doc [1]. It is short enough
>>> > >> that I will also put the main idea and main question in this email
>>> > >> so you can quickly read. Best to put comments in the doc.
>>> > >>
>>> > >> Main idea: add a variation of Reshuffle that allows duplicates, aka
>>> > >> "at least once", so that users and runners can benefit from
>>> > >> efficiency if it is possible.
>>> > >>
>>> > >> Main question: is it best as a parameter to existing reshuffle
>>> > >> transforms or as new URN(s)? I have proposed it as a parameter but
>>> > >> I think either one could work.
>>> > >>
>>> > >> I would love feedback on the main idea, main question, or anywhere
>>> > >> on the doc.
>>> > >>
>>> > >> Thanks!
>>> > >>
>>> > >> Kenn
>>> > >>
>>> > >> [1] https://s.apache.org/beam-reshuffle-allowing-duplicates
>>> > >>
>>> > >
>>> >
>>>
>>
