On Fri, Oct 6, 2023 at 3:07 PM Jan Lukavský <je...@seznam.cz> wrote:
>
> On 10/6/23 15:11, Kenneth Knowles wrote:
>
> On Fri, Oct 6, 2023 at 3:20 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi,
>>
>> there is also one other thing to mention with relation to
>> Reshuffle/RequiresStableInput and that is that our current implementation
>> of RequiresStableInput can break without Reshuffle in some corner cases on
>> most portable runners, at least with Java GreedyPipelineFuser, see [1]. The
>> only way to work around this currently is inserting Reshuffle (or any other
>> fusion-break transform) directly before the stable DoFn (Reshuffle is
>> handy, because it does not change the data). I think we should either
>> somehow fix the issue [1] or include fusion break as a mandatory
>> requirement for the new Redistribute transform as well (at least with some
>> variant) or possibly add a new "hint" for non-optional fusion breaking.
>>
> This is actually the bug we have wanted to fix for years - redistribute
> has nothing to do with checkpointing or stable input and Reshuffle
> incorrectly merges the two concepts.
>
> I agree that we couldn't make any immediate change that will break a
> runner. I believe runners that depend on Reshuffle to provide stable input
> will also provide stable input after GroupByKey. Since the SDK expansion of
> Reshuffle will still contain a GBK, those runners' functionality will be
> unchanged.
>
> I don't yet have a firm opinion between these approaches:
>
> 1. Adjust the Java SDK implementation of Reshuffle (and maybe other SDKs
> if needed). With some flag so that users can use the old wrong behavior for
> update compatibility.
> 2. Add a Redistribute transform to the SDKs that has the right behavior
> and leave Reshuffle as it is.
> 1+2. Add the Redistribute transform but also make Reshuffle call it, so
> Reshuffle also gets the new behavior, with the same flag so that users can
> use the old wrong behavior for update compatibility.
>
> All of these will leave "Reshuffle for RequiresStableInput" alone for now.
> The options that include (2) will move us a little closer to migrating to a
> "better" future state.
>
> I might not have expressed myself the right way. I understand that Reshuffle
> having "stable input" functionality is a non-portable side effect. It would
> be nice to get rid of it and my impression from this thread was that we
> would try to deprecate Reshuffle and introduce Redistribute which will not
> have such semantics. All of this is fine, the problem is that we currently
> (in some corner cases) rely on Reshuffle *even though* the Pipeline uses
> @RequiresStableInput. That is due to the fact that Reshuffle also ensures
> fusion breaking. Fusing a non-deterministic DoFn with a stable DoFn breaks
> the stable input property, because runners can ensure stability only at the
> input of an executable stage. Therefore we would either need to:
>
> a) define Redistribute as being an unconditional fusion break boundary, or
>
> b) define some other transform or hint to be able to enforce fusion
> breaking
>
> Otherwise I'd be in favor of 2 and deprecation of Reshuffle.
>

Just to be very clear - my goal right now is to just give Reshuffle consistent semantics. Even for the old "stable input + redistribute" use of Reshuffle, the semantics are inconsistent/undefined and the Java SDK expansion is wrong. Changing things having to do with stable input is not part of what I am trying to change right now. But it is fine to do things that prepare for that.

Kenn

> Jan
>
> Any votes? Any other options?
>
> Kenn
>
> Jan
>>
>> [1] https://github.com/apache/beam/issues/24655
>> On 10/5/23 21:01, Robert Burke wrote:
>>
>> Reshuffle/redistribute being a transform has the benefit of allowing
>> existing runners that aren't updated to be aware of the new urns to rely on
>> an SDK side implementation, which may be more expensive than what the
>> runner is able to do with that awareness.
>>
>> Aka: it gives purpose to the fallback implementations.
>>
>> On Thu, Oct 5, 2023, 9:03 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> Another perspective, ignoring runners' custom implementations and
>>> non-Java SDKs, could be that the semantics are perfectly well defined: it is
>>> a composite and its semantics are defined by its implementation in terms of
>>> primitives. It is just that this expansion is not what we want so we should
>>> not use it (and also we shouldn't use "whatever the implementation does" as
>>> a spec for anything we care about).
>>>
>>> On Thu, Oct 5, 2023 at 11:56 AM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>> I totally agree. I am motivated right now by the fact that it is
>>>> already used all over the place but with no consistent semantics. Maybe it
>>>> is simpler to focus on just making the minimal change, which would
>>>> basically be to update the expansion of the Reshuffle in the Java SDK.
>>>>
>>>> Kenn
>>>>
>>>> On Thu, Oct 5, 2023 at 11:39 AM John Casey <theotherj...@google.com>
>>>> wrote:
>>>>
>>>>> Given that this is a hint, I'm not sure redistribute should be a
>>>>> PTransform as opposed to some other way to hint to a runner.
>>>>>
>>>>> I'm not sure of what the syntax of that would be, but a semantic no-op
>>>>> transform that the runner may or may not do anything with is odd.
>>>>>
>>>>> On Thu, Oct 5, 2023 at 11:30 AM Kenneth Knowles <k...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> So a high level suggestion from Robert that I want to highlight as a
>>>>>> top-post:
>>>>>>
>>>>>> Instead of focusing on just fixing the SDKs' and runners' Reshuffle,
>>>>>> this could be an opportunity to introduce Redistribute which was proposed
>>>>>> in the long-ago thread. The semantics are identical but it is more clear
>>>>>> that it *only* is a hint about redistributing data and there is no
>>>>>> expectation of a checkpoint.
>>>>>>
>>>>>> This new name may also be an opportunity to maintain update
>>>>>> compatibility (though this may actually be leaving unsafe code in users'
>>>>>> hands) and/or separate @RequiresStableInput/checkpointing uses of
>>>>>> Reshuffle from redistribution-only uses of Reshuffle.
>>>>>>
>>>>>> Any other thoughts on this one high level bit?
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Thu, Oct 5, 2023 at 11:15 AM Kenneth Knowles <k...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>>
>>>>>>> On Wed, Oct 4, 2023 at 7:45 PM Robert Burke <lostl...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> LGTM.
>>>>>>>>
>>>>>>>> It looks like the Go SDK already adheres to these semantics as well for
>>>>>>>> the reference impl (well, reshuffle/redistribute_randomly, _by_key isn't
>>>>>>>> implemented in the Go SDK, and only uses the existing unqualified
>>>>>>>> reshuffle URN [0]).
>>>>>>>>
>>>>>>>> The original strategy, and then for every element, the original
>>>>>>>> Window, TS, and Pane are all serialized, shuffled, and then
>>>>>>>> deserialized downstream.
>>>>>>>>
>>>>>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go#L65
>>>>>>>>
>>>>>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go#L145
>>>>>>>>
>>>>>>>> Prism at the moment vacuously implements reshuffle by omitting the
>>>>>>>> node, and rewriting the inputs and outputs [1], as it's a local runner
>>>>>>>> with single transform per bundle execution, but I was intending to make
>>>>>>>> it a fusion break regardless. Ultimately prism's "test" variant will
>>>>>>>> default to executing the SDK-dictated reference implementation for the
>>>>>>>> composite(s), and any "fast" or "prod" variant would simply do the
>>>>>>>> current implementation.
>>>>>>>>
>>>>>>>
>>>>>>> Very nice!
>>>>>>>
>>>>>>> And of course I should have linked out to the existing reshuffle URN
>>>>>>> in the proto.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>>>
>>>>>>>> Robert Burke
>>>>>>>> Beam Go Busybody
>>>>>>>>
>>>>>>>> [0]:
>>>>>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L46C3-L46C50
>>>>>>>> [1]:
>>>>>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go#L82
>>>>>>>>
>>>>>>>> On 2023/09/26 15:43:53 Kenneth Knowles wrote:
>>>>>>>> > Hi everyone,
>>>>>>>> >
>>>>>>>> > Recently there was a bug [1] caused by discrepancies between two of
>>>>>>>> > Dataflow's reshuffle implementations. I think the reference
>>>>>>>> > implementation in the Java SDK [2] also does not match. This all led
>>>>>>>> > to discussion on the bug and the pull request [3] about what the
>>>>>>>> > actual semantics should be. I got it wrong, maybe multiple times. So
>>>>>>>> > I wrote up a very short document to finish the discussion:
>>>>>>>> >
>>>>>>>> > https://s.apache.org/beam-reshuffle
>>>>>>>> >
>>>>>>>> > This is also probably among the simplest imaginable use of
>>>>>>>> > http://s.apache.org/ptransform-design-doc in case you want to see
>>>>>>>> > kind of how I intended it to be used.
>>>>>>>> >
>>>>>>>> > Kenn
>>>>>>>> >
>>>>>>>> > [1] https://github.com/apache/beam/issues/28219
>>>>>>>> > [2]
>>>>>>>> > https://github.com/apache/beam/blob/d52b077ad505c8b50f10ec6a4eb83d385cdaf96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L84
>>>>>>>> > [3] https://github.com/apache/beam/pull/28272
>>>>>>>> >
>>>>>>>>
>>>>>>>
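
Jan's point in this thread, that fusing a non-deterministic DoFn with a stable DoFn breaks the stable input property because a runner can only guarantee stability at the input of an executable stage, can be illustrated with a toy simulation. This is a rough sketch in plain Python with no Beam dependency. The names nondeterministic_dofn, run_fused and run_with_fusion_break, and the "each retry re-executes the whole stage" model, are assumptions made up for illustration; they are not Beam APIs and do not describe any particular runner.

```python
import random


def nondeterministic_dofn(element, rng):
    # Hypothetical upstream DoFn: attaches a random tag, so re-running it
    # on the same element can produce a different output.
    return (element, rng.randrange(1_000_000))


def run_fused(elements, rng, attempts=2):
    """Upstream and "stable" DoFn fused into one stage (toy model).

    Each attempt (for example a retry after a failure) re-executes the
    upstream DoFn, so the downstream DoFn may see different input each time.
    Returns the input observed by the downstream DoFn on every attempt.
    """
    observed = []
    for _ in range(attempts):
        observed.append([nondeterministic_dofn(e, rng) for e in elements])
    return observed


def run_with_fusion_break(elements, rng, attempts=2):
    """Same pipeline with a fusion break between the two DoFns (toy model).

    The upstream output is materialized once at the stage boundary; every
    attempt of the downstream stage replays that same materialized data.
    """
    materialized = [nondeterministic_dofn(e, rng) for e in elements]
    return [list(materialized) for _ in range(attempts)]


if __name__ == "__main__":
    data = ["a", "b", "c"]
    fused = run_fused(data, random.Random(0))
    broken = run_with_fusion_break(data, random.Random(0))
    print("fused attempts identical:       ", fused[0] == fused[1])
    print("fusion-break attempts identical:", broken[0] == broken[1])
```

In this model the only thing that makes the downstream input repeatable is the materialization at the stage boundary, which is why options (a) and (b) above both amount to guaranteeing such a boundary directly before the stable DoFn.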