Well, I accidentally conflated "stateful" and "persisting", but anyhow,
yes: we aren't aiming to have one Beam primitive for each thing that
is probably a runner primitive.

On Thu, Oct 19, 2023 at 2:25 PM Kenneth Knowles <k...@apache.org> wrote:
>
> On Fri, Oct 13, 2023 at 12:51 PM Jan Lukavský <je...@seznam.cz> wrote:
> >
> > Hi,
> >
> > I think nearly everything has already been said in this thread, but ... 
> > it is time for Friday discussions. :)
> >
> > Today I recalled a discussion we had a long time ago, when we were 
> > designing Euphoria (btw, deprecating and removing it is still on my todo 
> > list, I should create a vote thread for that). We had 4 primitives:
> >
> >  a) non-shuffle, stateless ~ stateless ParDo
> >
> >  b) shuffle, stateful ~ stateful ParDo, with the ability (under the right 
> > circumstances,  i.e. defined event-time trigger, defined state merge 
> > function, ...) to be performed in a "combinable way".
> >
> >  c) shuffle, stateless ~ Reshuffle
> >
> >  d) non-shuffle, stateful - nope, makes no sense :) - part of the 
> > "combinable stateful shuffle operation"
> >
> >  e) union ~ Flatten
> >
> > Turns out you can build everything bottom up from these.
> >
> > Now, the not-so-well-defined semantics of Reshuffle (Redistribute) might 
> > arise from the fact that it is not a primitive. Stateless shuffling of data 
> > is definitely a primitive of all runners.
>
> Not Dataflow :-)
>
> But more importantly, Beam primitives are deliberately chosen to be
> fundamental data operations, not physical plan steps that a runner
> might use. In other words, Beam is decidedly _not_ a library for
> building composites that eventually are constructed from runner
> primitives. It is more like SQL in that it is a library for building
> composites that eventually are constructed from fundamental operations
> on data, that every engine (like every RDBMS) will be able to
> implement in its own way.
>
> Kenn
>
> >
> > Therefore here goes the question - should Redistribute be a primitive and 
> > not be built up from other transforms?
> >
> > Best,
> >
> >  Jan
> >
> > On 10/6/23 21:12, Kenneth Knowles wrote:
> >
> >
> >
> > On Fri, Oct 6, 2023 at 3:07 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>
> >>
> >> On 10/6/23 15:11, Kenneth Knowles wrote:
> >>
> >>
> >>
> >> On Fri, Oct 6, 2023 at 3:20 AM Jan Lukavský <je...@seznam.cz> wrote:
> >>>
> >>> Hi,
> >>>
> >>> there is also one other thing to mention with relation to 
> >>> Reshuffle/RequiresStableInput and that is that our current implementation 
> >>> of RequiresStableInput can break without Reshuffle in some corner cases 
> >>> on most portable runners, at least with Java GreedyPipelineFuser, see 
> >>> [1]. The only way to work around this currently is inserting Reshuffle (or 
> >>> any other fusion-break transform) directly before the stable DoFn 
> >>> (Reshuffle is handy, because it does not change the data). I think we 
> >>> should either somehow fix the issue [1] or include fusion break as a 
> >>> mandatory requirement for the new Redistribute transform as well (at 
> >>> least with some variant) or possibly add a new "hint" for non-optional 
> >>> fusion breaking.
> >>
> >> This is actually the bug we have wanted to fix for years - redistribute 
> >> has nothing to do with checkpointing or stable input and Reshuffle 
> >> incorrectly merges the two concepts.
> >>
> >> I agree that we couldn't make any immediate change that will break a 
> >> runner. I believe runners that depend on Reshuffle to provide stable input 
> >> will also provide stable input after GroupByKey. Since the SDK expansion 
> >> of Reshuffle will still contain a GBK, those runners' functionality will 
> >> be unchanged.
> >>
> >> I don't yet have a firm opinion between these approaches:
> >>
> >> 1. Adjust the Java SDK implementation of Reshuffle (and maybe other SDKs 
> >> if needed). With some flag so that users can use the old wrong behavior 
> >> for update compatibility.
> >> 2. Add a Redistribute transform to the SDKs that has the right behavior 
> >> and leave Reshuffle as it is.
> >> 1+2. Add the Redistribute transform but also make Reshuffle call it, so 
> >> Reshuffle also gets the new behavior, with the same flag so that users can 
> >> use the old wrong behavior for update compatibility.
> >>
> >> All of these will leave "Reshuffle for RequiresStableInput" alone for now. 
> >> The options that include (2) will move us a little closer to migrating to 
> >> a "better" future state.
> >>
> >> I might not have expressed myself the right way. I understand that Reshuffle 
> >> having "stable input" functionality is a non-portable side effect. It would 
> >> be nice to get rid of it and my impression from this thread was that we 
> >> would try to deprecate Reshuffle and introduce Redistribute which will not 
> >> have such semantics. All of this is fine, the problem is that we currently (in 
> >> some corner cases) rely on Reshuffle *even though* the Pipeline uses 
> >> @RequiresStableInput. That is due to the fact that Reshuffle also ensures 
> >> fusion breaking.  Fusing a non-deterministic DoFn with a stable DoFn breaks 
> >> the stable input property, because runners can ensure stability only at 
> >> the input of an executable stage. Therefore we would either need to:
> >>
> >>  a) define Redistribute as being an unconditional fusion break boundary, or
> >>
> >>  b) define some other transform or hint to be able to enforce fusion 
> >> breaking
> >>
> >> Otherwise I'd be in favor of 2 and deprecation of Reshuffle.
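
A toy model of the corner case described above, in plain Python rather than
Beam. It assumes a runner that can only make data stable at the input of an
executable stage and that retries a whole stage on failure. The names
run_fused, run_with_fusion_break, and nondeterministic are hypothetical and
exist only for this sketch.

```python
import random

def nondeterministic(x, rng):
    # Stands in for a DoFn whose output differs between attempts.
    return (x, rng.random())

def run_fused(inputs, attempts, rng):
    """Both steps share one stage, so each retry re-runs nondeterministic()."""
    seen = []
    for _ in range(attempts):
        # The stage input is stable, but the input of the "stable" step
        # is recomputed on every attempt.
        seen.append([nondeterministic(x, rng) for x in inputs])
    return seen

def run_with_fusion_break(inputs, attempts, rng):
    """A materialization point separates the steps; only stage 2 is retried."""
    checkpoint = [nondeterministic(x, rng) for x in inputs]  # stage 1, run once
    seen = []
    for _ in range(attempts):
        # Stage 2 always reads the same materialized data.
        seen.append(list(checkpoint))
    return seen

rng = random.Random(0)
fused = run_fused([1, 2, 3], attempts=2, rng=rng)
broken = run_with_fusion_break([1, 2, 3], attempts=2, rng=rng)
print("fused attempts identical:", fused[0] == fused[1])
print("fusion-break attempts identical:", broken[0] == broken[1])
```

In this model the stable step sees the same input on every attempt only when
something forces a stage boundary in front of it, which is the role Reshuffle
plays today.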
> >
> >
> > Just to be very clear - my goal right now is to just give Reshuffle 
> > consistent semantics. Even for the old "stable input + redistribute" use of 
> > Reshuffle, the semantics are inconsistent/undefined and the Java SDK 
> > expansion is wrong. Changing things having to do with stable input is not 
> > part of what I am trying to change right now. But it is fine to do things 
> > that prepare for that.
> >
> > Kenn
> >
> >>
> >>  Jan
> >>
> >>
> >> Any votes? Any other options?
> >>
> >> Kenn
> >>
> >>>  Jan
> >>>
> >>> [1] https://github.com/apache/beam/issues/24655
> >>>
> >>> On 10/5/23 21:01, Robert Burke wrote:
> >>>
> >>> Reshuffle/redistribute being a transform has the benefit of allowing 
> >>> existing runners that aren't updated to be aware of the new urns to rely 
> >>> on an SDK side implementation, which may be more expensive than what the 
> >>> runner is able to do with that awareness.
> >>>
> >>> Aka: it gives purpose to the fallback implementations.
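
A minimal sketch of that fallback idea, in plain Python rather than the real
portability protos. The URN string, the dict layout, and the plan() helper are
all hypothetical; the point is only that a runner which does not recognize a
composite's URN can still execute the SDK-provided expansion.

```python
# Hypothetical URN, for illustration only.
REDISTRIBUTE_URN = "example:transform:redistribute:v1"

composite = {
    "urn": REDISTRIBUTE_URN,
    # SDK-provided fallback expansion, expressed in steps every runner knows.
    "subtransforms": ["pair_with_random_key", "group_by_key", "drop_keys"],
}

def plan(transform, known_urns):
    """Return the steps a runner would execute for this transform."""
    if transform["urn"] in known_urns:
        # An updated runner substitutes its own, possibly cheaper, implementation.
        return ["native:" + transform["urn"]]
    # An older runner ignores the unknown URN and runs the SDK expansion.
    return list(transform["subtransforms"])

old_runner_plan = plan(composite, known_urns=set())
new_runner_plan = plan(composite, known_urns={REDISTRIBUTE_URN})
print(old_runner_plan)
print(new_runner_plan)
```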
> >>>
> >>> On Thu, Oct 5, 2023, 9:03 AM Kenneth Knowles <k...@apache.org> wrote:
> >>>>
> >>>> Another perspective, ignoring runners' custom implementations and 
> >>>> non-Java SDKs, could be that the semantics are perfectly well defined: it 
> >>>> is a composite and its semantics are defined by its implementation in 
> >>>> terms of primitives. It is just that this expansion is not what we want 
> >>>> so we should not use it (and also we shouldn't use "whatever the 
> >>>> implementation does" as a spec for anything we care about).
> >>>>
> >>>> On Thu, Oct 5, 2023 at 11:56 AM Kenneth Knowles <k...@apache.org> wrote:
> >>>>>
> >>>>> I totally agree. I am motivated right now by the fact that it is 
> >>>>> already used all over the place but with no consistent semantics. Maybe 
> >>>>> it is simpler to focus on just making the minimal change, which would 
> >>>>> basically be to update the expansion of the Reshuffle in the Java SDK.
> >>>>>
> >>>>> Kenn
> >>>>>
> >>>>> On Thu, Oct 5, 2023 at 11:39 AM John Casey <theotherj...@google.com> 
> >>>>> wrote:
> >>>>>>
> >>>>>> Given that this is a hint, I'm not sure redistribute should be a 
> >>>>>> PTransform as opposed to some other way to hint to a runner.
> >>>>>>
> >>>>>> I'm not sure of what the syntax of that would be, but a semantic no-op 
> >>>>>> transform that the runner may or may not do anything with is odd.
> >>>>>>
> >>>>>> On Thu, Oct 5, 2023 at 11:30 AM Kenneth Knowles <k...@apache.org> 
> >>>>>> wrote:
> >>>>>>>
> >>>>>>> So a high level suggestion from Robert that I want to highlight as a 
> >>>>>>> top-post:
> >>>>>>>
> >>>>>>> Instead of focusing on just fixing the SDKs' and runners' Reshuffle, 
> >>>>>>> this could be an opportunity to introduce Redistribute which was 
> >>>>>>> proposed in the long-ago thread. The semantics are identical but it 
> >>>>>>> is clearer that it is only a hint about redistributing data and 
> >>>>>>> there is no expectation of a checkpoint.
> >>>>>>>
> >>>>>>> This new name may also be an opportunity to maintain update 
> >>>>>>> compatibility (though this may actually be leaving unsafe code in 
> >>>>>>> users' hands) and/or separate @RequiresStableInput/checkpointing uses 
> >>>>>>> of Reshuffle from redistribution-only uses of Reshuffle.
> >>>>>>>
> >>>>>>> Any other thoughts on this one high level bit?
> >>>>>>>
> >>>>>>> Kenn
> >>>>>>>
> >>>>>>> On Thu, Oct 5, 2023 at 11:15 AM Kenneth Knowles <k...@apache.org> 
> >>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Wed, Oct 4, 2023 at 7:45 PM Robert Burke <lostl...@apache.org> 
> >>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>> LGTM.
> >>>>>>>>>
> >>>>>>>>> It looks like the Go SDK already adheres to these semantics as well 
> >>>>>>>>> for the reference impl (well, for reshuffle/redistribute_randomly; 
> >>>>>>>>> _by_key isn't implemented in the Go SDK, and it only uses the 
> >>>>>>>>> existing unqualified reshuffle URN [0]).
> >>>>>>>>>
> >>>>>>>>> The original windowing strategy is retained, and for every element 
> >>>>>>>>> the original Window, TS, and Pane are all serialized, shuffled, and 
> >>>>>>>>> then deserialized downstream.
> >>>>>>>>>
> >>>>>>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go#L65
> >>>>>>>>>
> >>>>>>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go#L145
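
The per-element handling described above can be sketched roughly as follows.
This is plain Python, not the Go SDK code behind the links; the Windowed tuple,
the reshuffle() function, and its num_keys and seed parameters are assumptions
made for the sketch. It pairs each element's metadata with the value under a
random key, groups by that key, and restores the metadata on the way out.

```python
import random
from collections import defaultdict, namedtuple

# Hypothetical stand-in for an element with its windowing metadata.
Windowed = namedtuple("Windowed", ["value", "window", "timestamp", "pane"])

def reshuffle(elements, num_keys=4, seed=None):
    rng = random.Random(seed)
    # 1. Reify: keep the metadata alongside the value, under a random key.
    keyed = [(rng.randrange(num_keys), tuple(e)) for e in elements]
    # 2. Shuffle: group by the random key (stands in for the GBK).
    groups = defaultdict(list)
    for key, payload in keyed:
        groups[key].append(payload)
    # 3. Restore: drop the key and rebuild each element with its metadata.
    return [Windowed(*payload) for key in sorted(groups) for payload in groups[key]]

inputs = [
    Windowed("a", "w1", 10, "pane0"),
    Windowed("b", "w1", 11, "pane0"),
    Windowed("c", "w2", 25, "pane1"),
]
outputs = reshuffle(inputs, seed=42)
print(sorted(outputs) == sorted(inputs))
```

The output order may differ from the input order, but every element comes back
with the window, timestamp, and pane it went in with.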
> >>>>>>>>>
> >>>>>>>>> Prism at the moment vacuously implements reshuffle by omitting the 
> >>>>>>>>> node, and rewriting the inputs and outputs [1], as it's a local 
> >>>>>>>>> runner with single transform per bundle execution, but I was 
> >>>>>>>>> intending to make it a fusion break regardless.  Ultimately prism's 
> >>>>>>>>> "test" variant will default to executing the SDK's dictated 
> >>>>>>>>> reference implementation for the composite(s), and any "fast" or 
> >>>>>>>>> "prod" variant would simply do the current implementation.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Very nice!
> >>>>>>>>
> >>>>>>>> And of course I should have linked out to the existing reshuffle URN 
> >>>>>>>> in the proto.
> >>>>>>>>
> >>>>>>>> Kenn
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Robert Burke
> >>>>>>>>> Beam Go Busybody
> >>>>>>>>>
> >>>>>>>>> [0]: 
> >>>>>>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L46C3-L46C50
> >>>>>>>>> [1]: 
> >>>>>>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go#L82
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 2023/09/26 15:43:53 Kenneth Knowles wrote:
> >>>>>>>>> > Hi everyone,
> >>>>>>>>> >
> >>>>>>>>> > Recently there was a bug [1] caused by discrepancies between two of
> >>>>>>>>> > Dataflow's reshuffle implementations. I think the reference implementation
> >>>>>>>>> > in the Java SDK [2] also does not match. This all led to discussion on the
> >>>>>>>>> > bug and the pull request [3] about what the actual semantics should be. I
> >>>>>>>>> > got it wrong, maybe multiple times. So I wrote up a very short document to
> >>>>>>>>> > finish the discussion:
> >>>>>>>>> >
> >>>>>>>>> >     https://s.apache.org/beam-reshuffle
> >>>>>>>>> >
> >>>>>>>>> > This is also probably among the simplest imaginable use of
> >>>>>>>>> > http://s.apache.org/ptransform-design-doc in case you want to see kind of
> >>>>>>>>> > how I intended it to be used.
> >>>>>>>>> >
> >>>>>>>>> > Kenn
> >>>>>>>>> >
> >>>>>>>>> > [1] https://github.com/apache/beam/issues/28219
> >>>>>>>>> > [2]
> >>>>>>>>> > https://github.com/apache/beam/blob/d52b077ad505c8b50f10ec6a4eb83d385cdaf96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L84
> >>>>>>>>> > [3] https://github.com/apache/beam/pull/28272
> >>>>>>>>> >
