On Fri, Oct 20, 2023 at 3:29 AM Jan Lukavský <je...@seznam.cz> wrote:

> Yes, I'm aware that Beam is not defined in terms of runner convenience,
> but in terms of data transform primitives. On the other hand - looking
> at that from a specific perspective - even though stateless shuffle does
> not change the data itself, it changes the distribution of data with
> relation to partitioning, which is a property of the data as well. Not
> only the data itself, but also any metadata about it might be viewed as
> something that characterizes a PCollection and as something that can be
> manipulated. Hence a transform can be given proper data-related
> semantics, even though it is a no-op with regards to actual _contents_ of
> a PCollection. Having said that, my point was that if Redistribute was a
> defined fundamental primitive, it would immediately follow that it
> should not be implemented as GBK-unGBK (at least not without extra care),
> because it leads to problems with the stateful GBK introducing
> unexpected side-effects, which from my understanding was the initial
> problem that started this thread.
>

Agree with all this. It would be interesting, and I think I've seen a
system that does it but I forget which one, to have a model whereby the
metadata of collections is explicitly treated but also held independent of
the contents of the collection. It is an interesting problem to allow authoring
well-defined computations in such a model. This really is a Friday
discussion :-)

Kenn
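
The point that a stateless shuffle leaves the contents alone but changes the partitioning can be sketched in a few lines of plain Python. This is only an illustration under stated assumptions, not Beam code: `redistribute` and `contents` are hypothetical names, and a "collection" is modeled as a list of partitions.

```python
import random
from collections import Counter


def redistribute(partitions, num_partitions, seed=0):
    """Hypothetical stateless shuffle: assign each element to a random partition."""
    rng = random.Random(seed)
    out = [[] for _ in range(num_partitions)]
    for part in partitions:
        for element in part:
            out[rng.randrange(num_partitions)].append(element)
    return out


def contents(partitions):
    """The multiset of elements, ignoring how they are laid out across partitions."""
    return Counter(element for part in partitions for element in part)


# All elements start on one partition (a skewed layout).
before = [[1, 2, 3, 4, 5, 6], [], []]
after = redistribute(before, num_partitions=3)

# The contents are unchanged; only the layout (the "metadata") may differ.
assert contents(before) == contents(after)
```

In this toy model the transform is the identity on `contents(...)` and acts only on the layout, which is the sense in which it could be given data-related semantics of its own.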


> Best,
>
>   Jan
>
> On 10/19/23 20:26, Kenneth Knowles wrote:
> > Well I accidentally conflated "stateful" and "persisting", but anyhow
> > yea we aren't targeting to have one Beam primitive for each thing that
> > is probably a runner primitive.
> >
> > On Thu, Oct 19, 2023 at 2:25 PM Kenneth Knowles <k...@apache.org> wrote:
> >> On Fri, Oct 13, 2023 at 12:51 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>> Hi,
> >>>
> >>> I think nearly everything has already been said in this thread,
> but ... it is time for Friday discussions. :)
> >>>
> >>> Today I recalled a discussion we had a long time ago, when we were
> designing Euphoria (btw, deprecating and removing it is still on my todo
> list, I should create a vote thread for that). We had 4 primitives:
> >>>
> >>>   a) non-shuffle, stateless ~ stateless ParDo
> >>>
> >>>   b) shuffle, stateful ~ stateful ParDo, with the ability (under the
> right circumstances,  i.e. defined event-time trigger, defined state merge
> function, ...) to be performed in a "combinable way".
> >>>
> >>>   c) shuffle, stateless ~ Reshuffle
> >>>
> >>>   d) non-shuffle, stateful - nope, makes no sense :) - part of the
> "combinable stateful shuffle operation"
> >>>
> >>>   e) union ~ Flatten
> >>>
> >>> Turns out you can build everything bottom up from these.
> >>>
> >>> Now, the not-so-well defined semantics of Reshuffle (Redistribute)
> might arise from the fact it is not a primitive. Stateless shuffling of
> data is definitely a primitive of all runners.
> >> Not Dataflow :-)
> >>
> >> But more importantly, Beam primitives are deliberately chosen to be
> >> fundamental data operations, not physical plan steps that a runner
> >> might use. In other words, Beam is decidedly _not_ a library for
> >> building composites that eventually are constructed from runner
> >> primitives. It is more like SQL in that it is a library for building
> >> composites that eventually are constructed from fundamental operations
> >> on data, that every engine (like every RDBMS) will be able to
> >> implement in its own way.
> >>
> >> Kenn
> >>
> >>> Therefore here goes the question - should Redistribute be a primitive
> and not be built up from other transforms?
> >>>
> >>> Best,
> >>>
> >>>   Jan
> >>>
> >>> On 10/6/23 21:12, Kenneth Knowles wrote:
> >>>
> >>>
> >>>
> >>> On Fri, Oct 6, 2023 at 3:07 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>
> >>>> On 10/6/23 15:11, Kenneth Knowles wrote:
> >>>>
> >>>>
> >>>>
> >>>> On Fri, Oct 6, 2023 at 3:20 AM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>> Hi,
> >>>>>
> >>>>> there is also one other thing to mention with relation to
> Reshuffle/RequiresStableInput and that is that our current implementation
> of RequiresStableInput can break without Reshuffle in some corner cases on
> most portable runners, at least with Java GreedyPipelineFuser, see [1]. The
> only way to work around this currently is inserting Reshuffle (or any other
> fusion-break transform) directly before the stable DoFn (Reshuffle is
> handy, because it does not change the data). I think we should either
> somehow fix the issue [1] or include fusion break as a mandatory
> requirement for the new Redistribute transform as well (at least with some
> variant) or possibly add a new "hint" for non-optional fusion breaking.
> >>>> This is actually the bug we have wanted to fix for years -
> redistribute has nothing to do with checkpointing or stable input and
> Reshuffle incorrectly merges the two concepts.
> >>>>
> >>>> I agree that we couldn't make any immediate change that will break a
> runner. I believe runners that depend on Reshuffle to provide stable input
> will also provide stable input after GroupByKey. Since the SDK expansion of
> Reshuffle will still contain a GBK, those runners' functionality will be
> unchanged.
> >>>>
> >>>> I don't yet have a firm opinion between these approaches:
> >>>>
> >>>> 1. Adjust the Java SDK implementation of Reshuffle (and maybe other
> SDKs if needed). With some flag so that users can use the old wrong
> behavior for update compatibility.
> >>>> 2. Add a Redistribute transform to the SDKs that has the right
> behavior and leave Reshuffle as it is.
> >>>> 1+2. Add the Redistribute transform but also make Reshuffle call it,
> so Reshuffle also gets the new behavior, with the same flag so that users
> can use the old wrong behavior for update compatibility.
> >>>>
> >>>> All of these will leave "Reshuffle for RequiresStableInput" alone for
> now. The options that include (2) will move us a little closer to migrating
> to a "better" future state.
> >>>>
> >>>> I might not have expressed myself the right way. I understand that Reshuffle
> having "stable input" functionality is a non-portable side-effect. It would
> be nice to get rid of it and my impression from this thread was that we
> would try to deprecate Reshuffle and introduce Redistribute which will not
> have such semantics. All of this is fine, the problem is that we currently (in
> some corner cases) rely on Reshuffle *even though* Pipeline uses
> @RequiresStableInput. That is due to the fact that Reshuffle also ensures
> fusion breaking.  Fusing non-deterministic DoFn with stable DoFn breaks the
> stable input property, because runners can ensure stability only at the
> input of an executable stage. Therefore we would either need to:
> >>>>
> >>>>   a) define Redistribute as being an unconditional fusion break
> boundary, or
> >>>>
> >>>>   b) define some other transform or hint to be able to enforce fusion
> breaking
> >>>>
> >>>> Otherwise I'd be in favor of 2 and deprecation of Reshuffle.
> >>>
> >>> Just to be very clear - my goal right now is to just give Reshuffle a
> consistent semantics. Even for the old "stable input + redistribute" use of
> Reshuffle, the semantics are inconsistent/undefined and the Java SDK
> expansion is wrong. Changing things having to do with stable input is not
> part of what I am trying to change right now. But it is fine to do things
> that prepare for that.
> >>>
> >>> Kenn
> >>>
> >>>>   Jan
> >>>>
> >>>>
> >>>> Any votes? Any other options?
> >>>>
> >>>> Kenn
> >>>>
> >>>>>   Jan
> >>>>>
> >>>>> [1] https://github.com/apache/beam/issues/24655
> >>>>>
> >>>>> On 10/5/23 21:01, Robert Burke wrote:
> >>>>>
> >>>>> Reshuffle/redistribute being a transform has the benefit of allowing
> existing runners that aren't updated to be aware of the new urns to rely on
> an SDK side implementation, which may be more expensive than what the
> runner is able to do with that awareness.
> >>>>>
> >>>>> Aka: it gives purpose to the fallback implementations.
> >>>>>
> >>>>> On Thu, Oct 5, 2023, 9:03 AM Kenneth Knowles <k...@apache.org>
> wrote:
> >>>>>> Another perspective, ignoring runners' custom implementations and
> non-Java SDKs, could be that the semantics are perfectly well defined: it is
> a composite and its semantics are defined by its implementation in terms of
> primitives. It is just that this expansion is not what we want so we should
> not use it (and also we shouldn't use "whatever the implementation does" as
> a spec for anything we care about).
> >>>>>>
> >>>>>> On Thu, Oct 5, 2023 at 11:56 AM Kenneth Knowles <k...@apache.org>
> wrote:
> >>>>>>> I totally agree. I am motivated right now by the fact that it is
> already used all over the place but with no consistent semantics. Maybe it
> is simpler to focus on just making the minimal change, which would
> basically be to update the expansion of the Reshuffle in the Java SDK.
> >>>>>>>
> >>>>>>> Kenn
> >>>>>>>
> >>>>>>> On Thu, Oct 5, 2023 at 11:39 AM John Casey <
> theotherj...@google.com> wrote:
> >>>>>>>> Given that this is a hint, I'm not sure redistribute should be a
> PTransform as opposed to some other way to hint to a runner.
> >>>>>>>>
> >>>>>>>> I'm not sure of what the syntax of that would be, but a semantic
> no-op transform that the runner may or may not do anything with is odd.
> >>>>>>>>
> >>>>>>>> On Thu, Oct 5, 2023 at 11:30 AM Kenneth Knowles <k...@apache.org>
> wrote:
> >>>>>>>>> So a high level suggestion from Robert that I want to highlight
> as a top-post:
> >>>>>>>>>
> >>>>>>>>> Instead of focusing on just fixing the SDKs' and runners'
> Reshuffle, this could be an opportunity to introduce Redistribute which was
> proposed in the long-ago thread. The semantics are identical but it is more
> clear that it only is a hint about redistributing data and there is no
> expectation of a checkpoint.
> >>>>>>>>>
> >>>>>>>>> This new name may also be an opportunity to maintain update
> compatibility (though this may actually be leaving unsafe code in users'
> hands) and/or separate @RequiresStableInput/checkpointing uses of Reshuffle
> from redistribution-only uses of Reshuffle.
> >>>>>>>>>
> >>>>>>>>> Any other thoughts on this one high level bit?
> >>>>>>>>>
> >>>>>>>>> Kenn
> >>>>>>>>>
> >>>>>>>>> On Thu, Oct 5, 2023 at 11:15 AM Kenneth Knowles <k...@apache.org>
> wrote:
> >>>>>>>>>>
> >>>>>>>>>> On Wed, Oct 4, 2023 at 7:45 PM Robert Burke <
> lostl...@apache.org> wrote:
> >>>>>>>>>>> LGTM.
> >>>>>>>>>>>
> >>>>>>>>>>> It looks like the Go SDK already adheres to these semantics as well
> for the reference impl (well, reshuffle/redistribute_randomly; _by_key isn't
> implemented in the Go SDK, and it only uses the existing unqualified reshuffle
> URN [0]).
> >>>>>>>>>>>
> >>>>>>>>>>> The original windowing strategy is kept, and then for every element, the
> original Window, TS, and Pane are all serialized, shuffled, and then
> deserialized downstream.
> >>>>>>>>>>>
> >>>>>>>>>>>
> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go#L65
> >>>>>>>>>>>
> >>>>>>>>>>>
> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go#L145
> >>>>>>>>>>>
> >>>>>>>>>>> Prism at the moment vacuously implements reshuffle by
> omitting the node, and rewriting the inputs and outputs [1], as it's a
> local runner with single transform per bundle execution, but I was
> intending to make it a fusion break regardless.  Ultimately prism's "test"
> variant will default to executing the SDK's dictated reference
> implementation for the composite(s), and any "fast" or "prod" variant would
> simply do the current implementation.
> >>>>>>>>>>
> >>>>>>>>>> Very nice!
> >>>>>>>>>>
> >>>>>>>>>> And of course I should have linked out to the existing
> reshuffle URN in the proto.
> >>>>>>>>>>
> >>>>>>>>>> Kenn
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Robert Burke
> >>>>>>>>>>> Beam Go Busybody
> >>>>>>>>>>>
> >>>>>>>>>>> [0]:
> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L46C3-L46C50
> >>>>>>>>>>> [1]:
> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go#L82
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On 2023/09/26 15:43:53 Kenneth Knowles wrote:
> >>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Recently there was a bug [1] caused by discrepancies between
> two of
> >>>>>>>>>>>> Dataflow's reshuffle implementations. I think the reference
> implementation
> >>>>>>>>>>>> in the Java SDK [2] also does not match. This all led to
> discussion on the
> >>>>>>>>>>>> bug and the pull request [3] about what the actual semantics
> should be. I
> >>>>>>>>>>>> got it wrong, maybe multiple times. So I wrote up a very
> short document to
> >>>>>>>>>>>> finish the discussion:
> >>>>>>>>>>>>
> >>>>>>>>>>>>      https://s.apache.org/beam-reshuffle
> >>>>>>>>>>>>
> >>>>>>>>>>>> This is also probably among the simplest imaginable use of
> >>>>>>>>>>>> http://s.apache.org/ptransform-design-doc in case you want
> to see kind of
> >>>>>>>>>>>> how I intended it to be used.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Kenn
> >>>>>>>>>>>>
> >>>>>>>>>>>> [1] https://github.com/apache/beam/issues/28219
> >>>>>>>>>>>> [2]
> >>>>>>>>>>>>
> https://github.com/apache/beam/blob/d52b077ad505c8b50f10ec6a4eb83d385cdaf96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L84
> >>>>>>>>>>>> [3] https://github.com/apache/beam/pull/28272
> >>>>>>>>>>>>
>
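
The "GBK-unGBK" expansion discussed in this thread, together with the description of the window, timestamp, and pane being carried through the shuffle, can be sketched in plain Python. This is a rough model under stated assumptions and not the Java or Go SDK implementation: `WindowedValue` and `reshuffle_via_gbk` are hypothetical names, the grouping is a simple in-memory dictionary, and no triggering or state is modeled.

```python
import random
from collections import defaultdict, namedtuple

# Hypothetical stand-in for an element plus its windowing metadata.
WindowedValue = namedtuple("WindowedValue", ["value", "timestamp", "window", "pane"])


def reshuffle_via_gbk(elements, num_keys=4, seed=0):
    """Model of a random-key reshuffle: key, group, then drop the key again."""
    rng = random.Random(seed)

    # 1. Pair every element with a random key. The metadata travels inside
    #    the payload, so the grouping step cannot alter it.
    keyed = [(rng.randrange(num_keys), wv) for wv in elements]

    # 2. Group by key (the step a runner would execute as a shuffle).
    groups = defaultdict(list)
    for key, wv in keyed:
        groups[key].append(wv)

    # 3. Ungroup: discard the key and emit each element with its original
    #    value, timestamp, window, and pane.
    return [wv for key in sorted(groups) for wv in groups[key]]


elements = [
    WindowedValue("a", 10, "w1", "on_time"),
    WindowedValue("b", 20, "w1", "on_time"),
    WindowedValue("c", 30, "w2", "late"),
]
shuffled = reshuffle_via_gbk(elements)

# Same elements with the same metadata; only the order or grouping may differ.
assert sorted(shuffled) == sorted(elements)
```

The sketch deliberately omits what a real GroupByKey adds (state, triggers, pane computation), which is where the unexpected side-effects described above would come from.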
