On Fri, Oct 20, 2023 at 3:29 AM Jan Lukavský <je...@seznam.cz> wrote:
> Yes, I'm aware that Beam is not defined in terms of runner convenience, but in terms of data transform primitives. On the other hand - looking at it from a specific perspective - even though stateless shuffle does not change the data itself, it changes the distribution of data with relation to partitioning, which is a property of the data as well. Not only the data itself, but also any metadata about it might be viewed as something that characterizes a PCollection and as something that can be manipulated. Hence a transform can be given proper data-related semantics, even though it is a nop with regard to the actual _contents_ of a PCollection. Having said that, my point was that if Redistribute were a defined fundamental primitive, it would immediately follow that it should not be implemented as GBK-unGBK (at least not without extra care), because that leads to problems with the stateful GBK introducing unexpected side effects, which from my understanding was the initial problem that started this thread.
>

Agree with all this. It would be interesting, and I think I've seen a system that does it but I forget which one, to have a model whereby the metadata of collections is explicitly treated but also held independent of the contents of the collection. An interesting problem to allow authoring well-defined computations in such a model.

This really is a Friday discussion :-)

Kenn

> Best,
>
> Jan
>
> On 10/19/23 20:26, Kenneth Knowles wrote:
> > Well, I accidentally conflated "stateful" and "persisting", but anyhow, yeah, we aren't aiming to have one Beam primitive for each thing that is probably a runner primitive.
> >
> > On Thu, Oct 19, 2023 at 2:25 PM Kenneth Knowles <k...@apache.org> wrote:
> >> On Fri, Oct 13, 2023 at 12:51 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>> Hi,
> >>>
> >>> I think nearly everything has already been said in this thread, but ... it is time for Friday discussions. :)
> >>>
> >>> Today I recalled a discussion we had a long time ago, when we were designing Euphoria (btw, deprecating and removing it is still on my todo list, I should create a vote thread for that). We had 4 primitives:
> >>>
> >>> a) non-shuffle, stateless ~ stateless ParDo
> >>>
> >>> b) shuffle, stateful ~ stateful ParDo, with the ability (under the right circumstances, i.e. defined event-time trigger, defined state merge function, ...) to be performed in a "combinable way".
> >>>
> >>> c) shuffle, stateless ~ Reshuffle
> >>>
> >>> d) non-shuffle, stateful - nope, makes no sense :) - part of the "combinable stateful shuffle operation"
> >>>
> >>> e) union ~ Flatten
> >>>
> >>> It turns out you can build everything bottom up from these.
> >>>
> >>> Now, the not-so-well-defined semantics of Reshuffle (Redistribute) might arise from the fact that it is not a primitive. Stateless shuffling of data is definitely a primitive of all runners.
> >> Not Dataflow :-)
> >>
> >> But more importantly, Beam primitives are deliberately chosen to be fundamental data operations, not physical plan steps that a runner might use. In other words, Beam is decidedly _not_ a library for building composites that eventually are constructed from runner primitives. It is more like SQL in that it is a library for building composites that eventually are constructed from fundamental operations on data, which every engine (like every RDBMS) will be able to implement in its own way.
> >>
> >> Kenn
> >>
> >>> Therefore here goes the question - should Redistribute be a primitive and not be built up from other transforms?
> >>>
> >>> Best,
> >>>
> >>> Jan
> >>>
> >>> On 10/6/23 21:12, Kenneth Knowles wrote:
> >>>
> >>> On Fri, Oct 6, 2023 at 3:07 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>
> >>>> On 10/6/23 15:11, Kenneth Knowles wrote:
> >>>>
> >>>> On Fri, Oct 6, 2023 at 3:20 AM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>> Hi,
> >>>>>
> >>>>> There is also one other thing to mention in relation to Reshuffle/RequiresStableInput: our current implementation of RequiresStableInput can break without Reshuffle in some corner cases on most portable runners, at least with the Java GreedyPipelineFuser, see [1]. The only way to work around this currently is to insert Reshuffle (or any other fusion-break transform) directly before the stable DoFn (Reshuffle is handy, because it does not change the data). I think we should either somehow fix the issue [1], or include a fusion break as a mandatory requirement for the new Redistribute transform as well (at least in some variant), or possibly add a new "hint" for non-optional fusion breaking.
> >>>> This is actually the bug we have wanted to fix for years - redistribute has nothing to do with checkpointing or stable input, and Reshuffle incorrectly merges the two concepts.
> >>>>
> >>>> I agree that we couldn't make any immediate change that would break a runner. I believe runners that depend on Reshuffle to provide stable input will also provide stable input after GroupByKey. Since the SDK expansion of Reshuffle will still contain a GBK, those runners' functionality will be unchanged.
> >>>>
> >>>> I don't yet have a firm opinion between these approaches:
> >>>>
> >>>> 1. Adjust the Java SDK implementation of Reshuffle (and maybe other SDKs if needed), with some flag so that users can use the old wrong behavior for update compatibility.
> >>>> 2. Add a Redistribute transform to the SDKs that has the right behavior and leave Reshuffle as it is.
> >>>> 1+2. Add the Redistribute transform but also make Reshuffle call it, so Reshuffle also gets the new behavior, with the same flag so that users can use the old wrong behavior for update compatibility.
> >>>>
> >>>> All of these will leave "Reshuffle for RequiresStableInput" alone for now. The options that include (2) will move us a little closer to migrating to a "better" future state.
> >>>>
> >>>> I might not have expressed it the right way. I understand that Reshuffle having "stable input" functionality is a non-portable side effect. It would be nice to get rid of it, and my impression from this thread was that we would try to deprecate Reshuffle and introduce Redistribute, which will not have such semantics. All of this is fine; the problem is that we currently (in some corner cases) rely on Reshuffle *even though* the pipeline uses @RequiresStableInput. That is due to the fact that Reshuffle also ensures fusion breaking. Fusing a non-deterministic DoFn with a stable DoFn breaks the stable input property, because runners can ensure stability only at the input of an executable stage. Therefore we would need to either:
> >>>>
> >>>> a) define Redistribute as an unconditional fusion break boundary, or
> >>>>
> >>>> b) define some other transform or hint to be able to enforce fusion breaking.
> >>>>
> >>>> Otherwise I'd be in favor of 2 and deprecation of Reshuffle.
> >>>
> >>> Just to be very clear - my goal right now is just to give Reshuffle consistent semantics. Even for the old "stable input + redistribute" use of Reshuffle, the semantics are inconsistent/undefined and the Java SDK expansion is wrong. Changing things having to do with stable input is not part of what I am trying to change right now. But it is fine to do things that prepare for that.
> >>>
> >>> Kenn
> >>>
> >>>> Jan
> >>>>
> >>>> Any votes? Any other options?
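Jan's fusion argument can be modeled in a few lines of plain Java. This is a hypothetical sketch with no Beam dependency, and it does not model how GreedyPipelineFuser or any particular runner actually schedules retries. The assumption is that a runner retries a failed executable stage by re-running everything fused into it: a non-deterministic step fused ahead of a stable-input step then feeds it different data on the retry, whereas a fusion break that materializes the upstream output lets the retry replay identical input. The counter-based `nonDeterministic` step and all helper names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical model of fusion vs. a fusion break; not Beam code.
public class FusionBreakSketch {
    // Hypothetical non-deterministic upstream step: its output depends on how many
    // times it has run, standing in for random IDs, wall-clock reads, and the like.
    static Function<String, String> nonDeterministic(AtomicInteger calls) {
        return s -> s + "#" + calls.incrementAndGet();
    }

    // Runs the upstream step over the source and returns what the downstream
    // stable-input step would receive.
    static List<String> runUpstream(List<String> source, Function<String, String> step) {
        List<String> out = new ArrayList<>();
        for (String s : source) {
            out.add(step.apply(s));
        }
        return out;
    }

    // Fused: both steps live in one executable stage, so a retry of the stage
    // recomputes the upstream output before the stable step sees it again.
    static boolean fusedInputIsStable(List<String> source) {
        Function<String, String> step = nonDeterministic(new AtomicInteger());
        List<String> firstAttempt = runUpstream(source, step);
        List<String> retryAttempt = runUpstream(source, step);
        return firstAttempt.equals(retryAttempt);
    }

    // Fusion break: the upstream output is materialized once, and each attempt
    // of the downstream stage replays the same persisted data.
    static boolean brokenInputIsStable(List<String> source) {
        Function<String, String> step = nonDeterministic(new AtomicInteger());
        List<String> materialized = List.copyOf(runUpstream(source, step));
        List<String> firstAttempt = new ArrayList<>(materialized);
        List<String> retryAttempt = new ArrayList<>(materialized);
        return firstAttempt.equals(retryAttempt);
    }

    public static void main(String[] args) {
        List<String> source = List.of("a", "b");
        System.out.println("fused: " + fusedInputIsStable(source));
        System.out.println("fusion break: " + brokenInputIsStable(source));
    }
}
```

Here `main` prints `fused: false` and `fusion break: true`: in the fused case the retry sees `a#3, b#4` instead of `a#1, b#2`.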
> >>>>
> >>>> Kenn
> >>>>
> >>>>> Jan
> >>>>>
> >>>>> [1] https://github.com/apache/beam/issues/24655
> >>>>>
> >>>>> On 10/5/23 21:01, Robert Burke wrote:
> >>>>>
> >>>>> Reshuffle/redistribute being a transform has the benefit of allowing existing runners that aren't updated to be aware of the new URNs to rely on an SDK-side implementation, which may be more expensive than what the runner is able to do with that awareness.
> >>>>>
> >>>>> Aka: it gives purpose to the fallback implementations.
> >>>>>
> >>>>> On Thu, Oct 5, 2023, 9:03 AM Kenneth Knowles <k...@apache.org> wrote:
> >>>>>> Another perspective, ignoring runners' custom implementations and non-Java SDKs, could be that the semantics are perfectly well defined: it is a composite and its semantics are defined by its implementation in terms of primitives. It is just that this expansion is not what we want, so we should not use it (and also we shouldn't use "whatever the implementation does" as a spec for anything we care about).
> >>>>>>
> >>>>>> On Thu, Oct 5, 2023 at 11:56 AM Kenneth Knowles <k...@apache.org> wrote:
> >>>>>>> I totally agree. I am motivated right now by the fact that it is already used all over the place but with no consistent semantics. Maybe it is simpler to focus on just making the minimal change, which would basically be to update the expansion of Reshuffle in the Java SDK.
> >>>>>>>
> >>>>>>> Kenn
> >>>>>>>
> >>>>>>> On Thu, Oct 5, 2023 at 11:39 AM John Casey <theotherj...@google.com> wrote:
> >>>>>>>> Given that this is a hint, I'm not sure redistribute should be a PTransform as opposed to some other way to hint to a runner.
> >>>>>>>>
> >>>>>>>> I'm not sure what the syntax of that would be, but a semantic no-op transform that the runner may or may not do anything with is odd.
> >>>>>>>>
> >>>>>>>> On Thu, Oct 5, 2023 at 11:30 AM Kenneth Knowles <k...@apache.org> wrote:
> >>>>>>>>> So, a high-level suggestion from Robert that I want to highlight as a top-post:
> >>>>>>>>>
> >>>>>>>>> Instead of focusing on just fixing the SDKs' and runners' Reshuffle, this could be an opportunity to introduce Redistribute, which was proposed in the long-ago thread. The semantics are identical, but it is clearer that it is only a hint about redistributing data and there is no expectation of a checkpoint.
> >>>>>>>>>
> >>>>>>>>> This new name may also be an opportunity to maintain update compatibility (though this may actually be leaving unsafe code in users' hands) and/or to separate @RequiresStableInput/checkpointing uses of Reshuffle from redistribution-only uses of Reshuffle.
> >>>>>>>>>
> >>>>>>>>> Any other thoughts on this one high-level bit?
> >>>>>>>>>
> >>>>>>>>> Kenn
> >>>>>>>>>
> >>>>>>>>> On Thu, Oct 5, 2023 at 11:15 AM Kenneth Knowles <k...@apache.org> wrote:
> >>>>>>>>>>
> >>>>>>>>>> On Wed, Oct 4, 2023 at 7:45 PM Robert Burke <lostl...@apache.org> wrote:
> >>>>>>>>>>> LGTM.
> >>>>>>>>>>>
> >>>>>>>>>>> It looks like the Go SDK already adheres to these semantics as well for the reference impl (well, reshuffle/redistribute_randomly; _by_key isn't implemented in the Go SDK, which only uses the existing unqualified reshuffle URN [0]).
> >>>>>>>>>>>
> >>>>>>>>>>> The original strategy, and then for every element, the original Window, TS, and Pane are all serialized, shuffled, and then deserialized downstream.
> >>>>>>>>>>>
> >>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go#L65
> >>>>>>>>>>>
> >>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go#L145
> >>>>>>>>>>>
> >>>>>>>>>>> Prism at the moment vacuously implements reshuffle by omitting the node and rewriting the inputs and outputs [1], as it's a local runner with a single transform per bundle execution, but I was intending to make it a fusion break regardless. Ultimately, Prism's "test" variant will default to executing the SDK-dictated reference implementation for the composite(s), and any "fast" or "prod" variant would simply do the current implementation.
> >>>>>>>>>>
> >>>>>>>>>> Very nice!
> >>>>>>>>>>
> >>>>>>>>>> And of course I should have linked out to the existing reshuffle URN in the proto.
> >>>>>>>>>>
> >>>>>>>>>> Kenn
> >>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Robert Burke
> >>>>>>>>>>> Beam Go Busybody
> >>>>>>>>>>>
> >>>>>>>>>>> [0]: https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L46C3-L46C50
> >>>>>>>>>>> [1]: https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go#L82
> >>>>>>>>>>>
> >>>>>>>>>>> On 2023/09/26 15:43:53 Kenneth Knowles wrote:
> >>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Recently there was a bug [1] caused by discrepancies between two of Dataflow's reshuffle implementations. I think the reference implementation in the Java SDK [2] also does not match. This all led to discussion on the bug and the pull request [3] about what the actual semantics should be. I got it wrong, maybe multiple times. So I wrote up a very short document to finish the discussion:
> >>>>>>>>>>>>
> >>>>>>>>>>>> https://s.apache.org/beam-reshuffle
> >>>>>>>>>>>>
> >>>>>>>>>>>> This is also probably among the simplest imaginable uses of http://s.apache.org/ptransform-design-doc, in case you want to see roughly how I intended it to be used.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Kenn
> >>>>>>>>>>>>
> >>>>>>>>>>>> [1] https://github.com/apache/beam/issues/28219
> >>>>>>>>>>>> [2] https://github.com/apache/beam/blob/d52b077ad505c8b50f10ec6a4eb83d385cdaf96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L84
> >>>>>>>>>>>> [3] https://github.com/apache/beam/pull/28272
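The thread keeps returning to the random-key GBK-then-ungroup expansion of Reshuffle: Jan's "GBK-unGBK", the Java reference implementation in [2], and Robert's description of the Go SDK carrying each element's window, timestamp, and pane through the shuffle. A minimal plain-Java model of that idea, with no Beam dependency, may make Jan's point concrete: the contents of the collection come out unchanged and only the grouping of elements into partitions differs. The `Element` record, `NUM_PARTITIONS`, and the helper names are hypothetical simplifications, not Beam's API, and the sketch deliberately ignores the triggering and windowing behavior of a real GroupByKey, which this thread identifies as the source of the unwanted side effects.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical model of a redistribute as random-key GBK + ungroup; not Beam code.
public class RedistributeSketch {
    // Hypothetical stand-in for a windowed element; Beam's own types for
    // windows and panes are richer than these strings.
    record Element(String value, long timestampMillis, String window, String pane) {}

    // Hypothetical number of downstream partitions (think "workers").
    static final int NUM_PARTITIONS = 4;

    // "GBK" half: assign each element a random key and group by that key.
    // The whole element, metadata included, is carried along untouched.
    static Map<Integer, List<Element>> groupByRandomKey(List<Element> input, Random rng) {
        Map<Integer, List<Element>> grouped = new HashMap<>();
        for (Element e : input) {
            grouped.computeIfAbsent(rng.nextInt(NUM_PARTITIONS), k -> new ArrayList<>()).add(e);
        }
        return grouped;
    }

    // "unGBK" half: drop the keys and emit every element again.
    static List<Element> ungroup(Map<Integer, List<Element>> grouped) {
        List<Element> out = new ArrayList<>();
        for (List<Element> group : grouped.values()) {
            out.addAll(group);
        }
        return out;
    }

    // True if both lists hold the same elements with the same multiplicities,
    // ignoring order.
    static <T> boolean sameMultiset(List<T> a, List<T> b) {
        Map<T, Integer> counts = new HashMap<>();
        for (T x : a) counts.merge(x, 1, Integer::sum);
        for (T x : b) counts.merge(x, -1, Integer::sum);
        return counts.values().stream().allMatch(c -> c == 0);
    }

    public static void main(String[] args) {
        List<Element> input = List.of(
                new Element("a", 1_000L, "[0, 60000)", "ON_TIME"),
                new Element("b", 2_000L, "[0, 60000)", "ON_TIME"),
                new Element("c", 61_000L, "[60000, 120000)", "EARLY"));
        Map<Integer, List<Element>> partitions = groupByRandomKey(input, new Random(42));
        List<Element> output = ungroup(partitions);
        System.out.println("partitions: " + partitions);
        System.out.println("contents unchanged: " + sameMultiset(input, output));
    }
}
```

Running `main` reports `contents unchanged: true`; which partitions end up populated depends only on the seed, which is exactly the "nop on contents, change in distribution" property under discussion.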