Well, I accidentally conflated "stateful" and "persisting", but anyhow, yeah, we aren't aiming to have one Beam primitive for each thing that is probably a runner primitive.
On Thu, Oct 19, 2023 at 2:25 PM Kenneth Knowles <k...@apache.org> wrote:
>
> On Fri, Oct 13, 2023 at 12:51 PM Jan Lukavský <je...@seznam.cz> wrote:
> >
> > Hi,
> >
> > I think nearly everything has already been said in this thread, but... it is time for Friday discussions. :)
> >
> > Today I recalled a discussion we had a long time ago, when we were designing Euphoria (btw, deprecating and removing it is still on my todo list; I should create a vote thread for that). We had 4 primitives:
> >
> > a) non-shuffle, stateless ~ stateless ParDo
> >
> > b) shuffle, stateful ~ stateful ParDo, with the ability (under the right circumstances, i.e. defined event-time trigger, defined state merge function, ...) to be performed in a "combinable way".
> >
> > c) shuffle, stateless ~ Reshuffle
> >
> > d) non-shuffle, stateful - nope, makes no sense :) - part of the "combinable stateful shuffle operation"
> >
> > e) union ~ Flatten
> >
> > It turns out you can build everything bottom-up from these.
> >
> > Now, the not-so-well-defined semantics of Reshuffle (Redistribute) might arise from the fact that it is not a primitive. Stateless shuffling of data is definitely a primitive of all runners.
>
> Not Dataflow :-)
>
> But more importantly, Beam primitives are deliberately chosen to be fundamental data operations, not physical plan steps that a runner might use. In other words, Beam is decidedly _not_ a library for building composites that eventually are constructed from runner primitives. It is more like SQL in that it is a library for building composites that eventually are constructed from fundamental operations on data, which every engine (like every RDBMS) will be able to implement in its own way.
>
> Kenn
>
> >
> > So here is the question: should Redistribute be a primitive rather than be built up from other transforms?
> >
> > Best,
> >
> > Jan
> >
> > On 10/6/23 21:12, Kenneth Knowles wrote:
> >
> > On Fri, Oct 6, 2023 at 3:07 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>
> >> On 10/6/23 15:11, Kenneth Knowles wrote:
> >>
> >> On Fri, Oct 6, 2023 at 3:20 AM Jan Lukavský <je...@seznam.cz> wrote:
> >>>
> >>> Hi,
> >>>
> >>> there is also one other thing to mention in relation to Reshuffle/RequiresStableInput, and that is that our current implementation of RequiresStableInput can break without Reshuffle in some corner cases on most portable runners, at least with the Java GreedyPipelineFuser; see [1]. The only way to work around this currently is to insert Reshuffle (or any other fusion-break transform) directly before the stable DoFn (Reshuffle is handy, because it does not change the data). I think we should either somehow fix the issue [1], or include a fusion break as a mandatory requirement for the new Redistribute transform as well (at least with some variant), or possibly add a new "hint" for non-optional fusion breaking.
> >>
> >> This is actually the bug we have wanted to fix for years - redistribute has nothing to do with checkpointing or stable input, and Reshuffle incorrectly merges the two concepts.
> >>
> >> I agree that we couldn't make any immediate change that will break a runner. I believe runners that depend on Reshuffle to provide stable input will also provide stable input after GroupByKey. Since the SDK expansion of Reshuffle will still contain a GBK, those runners' functionality will be unchanged.
> >>
> >> I don't yet have a firm opinion between these approaches:
> >>
> >> 1. Adjust the Java SDK implementation of Reshuffle (and maybe other SDKs if needed), with some flag so that users can use the old, wrong behavior for update compatibility.
> >> 2. Add a Redistribute transform to the SDKs that has the right behavior and leave Reshuffle as it is.
> >> 1+2. Add the Redistribute transform but also make Reshuffle call it, so Reshuffle also gets the new behavior, with the same flag so that users can use the old, wrong behavior for update compatibility.
> >>
> >> All of these will leave "Reshuffle for RequiresStableInput" alone for now. The options that include (2) will move us a little closer to migrating to a "better" future state.
> >>
> >> I might not have expressed it the right way. I understand that Reshuffle having "stable input" functionality is a non-portable side effect. It would be nice to get rid of it, and my impression from this thread was that we would try to deprecate Reshuffle and introduce Redistribute, which will not have such semantics. All of this is fine; the problem is that we currently (in some corner cases) rely on Reshuffle *even though* the Pipeline uses @RequiresStableInput. That is because Reshuffle also ensures fusion breaking. Fusing a non-deterministic DoFn with a stable DoFn breaks the stable input property, because runners can ensure stability only at the input of an executable stage. Therefore we would need to either:
> >>
> >> a) define Redistribute as an unconditional fusion break boundary, or
> >>
> >> b) define some other transform or hint to be able to enforce fusion breaking.
> >>
> >> Otherwise I'd be in favor of 2 and deprecation of Reshuffle.
> >
> > Just to be very clear - my goal right now is just to give Reshuffle consistent semantics. Even for the old "stable input + redistribute" use of Reshuffle, the semantics are inconsistent/undefined and the Java SDK expansion is wrong. Changing things having to do with stable input is not part of what I am trying to change right now. But it is fine to do things that prepare for that.
> >
> > Kenn
> >
> >>
> >> Jan
> >>
> >> Any votes? Any other options?
> >>
> >> Kenn
> >>
> >>> Jan
> >>>
> >>> [1] https://github.com/apache/beam/issues/24655
> >>>
> >>> On 10/5/23 21:01, Robert Burke wrote:
> >>>
> >>> Reshuffle/redistribute being a transform has the benefit of allowing existing runners that aren't updated to be aware of the new URNs to rely on an SDK-side implementation, which may be more expensive than what the runner is able to do with that awareness.
> >>>
> >>> Aka: it gives purpose to the fallback implementations.
> >>>
> >>> On Thu, Oct 5, 2023, 9:03 AM Kenneth Knowles <k...@apache.org> wrote:
> >>>>
> >>>> Another perspective, ignoring runners' custom implementations and non-Java SDKs, could be that the semantics are perfectly well defined: it is a composite, and its semantics are defined by its implementation in terms of primitives. It is just that this expansion is not what we want, so we should not use it (and also we shouldn't use "whatever the implementation does" as a spec for anything we care about).
> >>>>
> >>>> On Thu, Oct 5, 2023 at 11:56 AM Kenneth Knowles <k...@apache.org> wrote:
> >>>>>
> >>>>> I totally agree. I am motivated right now by the fact that it is already used all over the place but with no consistent semantics. Maybe it is simpler to focus on just making the minimal change, which would basically be to update the expansion of the Reshuffle in the Java SDK.
> >>>>>
> >>>>> Kenn
> >>>>>
> >>>>> On Thu, Oct 5, 2023 at 11:39 AM John Casey <theotherj...@google.com> wrote:
> >>>>>>
> >>>>>> Given that this is a hint, I'm not sure redistribute should be a PTransform as opposed to some other way to hint to a runner.
> >>>>>>
> >>>>>> I'm not sure what the syntax of that would be, but a semantic no-op transform that the runner may or may not do anything with is odd.
> >>>>>>
> >>>>>> On Thu, Oct 5, 2023 at 11:30 AM Kenneth Knowles <k...@apache.org> wrote:
> >>>>>>>
> >>>>>>> So a high-level suggestion from Robert that I want to highlight as a top-post:
> >>>>>>>
> >>>>>>> Instead of focusing on just fixing the SDKs' and runners' Reshuffle, this could be an opportunity to introduce Redistribute, which was proposed in the long-ago thread. The semantics are identical, but it is clearer that it is only a hint about redistributing data and that there is no expectation of a checkpoint.
> >>>>>>>
> >>>>>>> This new name may also be an opportunity to maintain update compatibility (though this may actually be leaving unsafe code in users' hands) and/or to separate @RequiresStableInput/checkpointing uses of Reshuffle from redistribution-only uses of Reshuffle.
> >>>>>>>
> >>>>>>> Any other thoughts on this one high-level bit?
> >>>>>>>
> >>>>>>> Kenn
> >>>>>>>
> >>>>>>> On Thu, Oct 5, 2023 at 11:15 AM Kenneth Knowles <k...@apache.org> wrote:
> >>>>>>>>
> >>>>>>>> On Wed, Oct 4, 2023 at 7:45 PM Robert Burke <lostl...@apache.org> wrote:
> >>>>>>>>>
> >>>>>>>>> LGTM.
> >>>>>>>>>
> >>>>>>>>> It looks like the Go SDK already adheres to these semantics as well for the reference impl (well, reshuffle/redistribute_randomly; _by_key isn't implemented in the Go SDK, which only uses the existing unqualified reshuffle URN [0]).
> >>>>>>>>>
> >>>>>>>>> The original strategy, and then for every element, the original Window, TS, and Pane are all serialized, shuffled, and then deserialized downstream.
> >>>>>>>>>
> >>>>>>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go#L65
> >>>>>>>>>
> >>>>>>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go#L145
> >>>>>>>>>
> >>>>>>>>> Prism at the moment vacuously implements reshuffle by omitting the node and rewriting the inputs and outputs [1], as it's a local runner with single-transform-per-bundle execution, but I was intending to make it a fusion break regardless. Ultimately, prism's "test" variant will default to executing the SDK's dictated reference implementation for the composite(s), and any "fast" or "prod" variant would simply do the current implementation.
> >>>>>>>>
> >>>>>>>> Very nice!
> >>>>>>>>
> >>>>>>>> And of course I should have linked out to the existing reshuffle URN in the proto.
> >>>>>>>>
> >>>>>>>> Kenn
> >>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Robert Burke
> >>>>>>>>> Beam Go Busybody
> >>>>>>>>>
> >>>>>>>>> [0]: https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L46C3-L46C50
> >>>>>>>>> [1]: https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go#L82
> >>>>>>>>>
> >>>>>>>>> On 2023/09/26 15:43:53 Kenneth Knowles wrote:
> >>>>>>>>> > Hi everyone,
> >>>>>>>>> >
> >>>>>>>>> > Recently there was a bug [1] caused by discrepancies between two of Dataflow's reshuffle implementations. I think the reference implementation in the Java SDK [2] also does not match. This all led to discussion on the bug and the pull request [3] about what the actual semantics should be. I got it wrong, maybe multiple times. So I wrote up a very short document to finish the discussion:
> >>>>>>>>> >
> >>>>>>>>> > https://s.apache.org/beam-reshuffle
> >>>>>>>>> >
> >>>>>>>>> > This is also probably among the simplest imaginable uses of http://s.apache.org/ptransform-design-doc, in case you want to see roughly how I intended it to be used.
> >>>>>>>>> >
> >>>>>>>>> > Kenn
> >>>>>>>>> >
> >>>>>>>>> > [1] https://github.com/apache/beam/issues/28219
> >>>>>>>>> > [2] https://github.com/apache/beam/blob/d52b077ad505c8b50f10ec6a4eb83d385cdaf96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L84
> >>>>>>>>> > [3] https://github.com/apache/beam/pull/28272