Hi,

I think nearly everything has already been said in this thread, but ... it is time for Friday discussions. :)

Today I recalled a discussion we had a long time ago, when we were designing Euphoria (btw, deprecating and removing it is still on my todo list; I should create a vote thread for that). We had 4 primitives:

 a) non-shuffle, stateless ~ stateless ParDo

 b) shuffle, stateful ~ stateful ParDo, with the ability (under the right circumstances, i.e. defined event-time trigger, defined state merge function, ...) to be performed in a "combinable way".

 c) shuffle, stateless ~ Reshuffle

 d) non-shuffle, stateful - nope, makes no sense :) - part of the "combinable stateful shuffle operation"

 e) union ~ Flatten

Turns out you can build everything bottom up from these.

Now, the not-so-well-defined semantics of Reshuffle (Redistribute) might arise from the fact that it is not a primitive. Stateless shuffling of data is definitely a primitive of all runners.
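
As a toy illustration of primitive (c), here is a minimal sketch in plain Python (not the Beam API; `WindowedValue` and `redistribute` are hypothetical names made up for this example). It assumes a stateless shuffle may place each element on an arbitrary worker, but must leave the multiset of elements and their metadata untouched:

```python
import random
from collections import Counter
from typing import NamedTuple


class WindowedValue(NamedTuple):
    # Hypothetical stand-in for an element plus its metadata.
    value: str
    timestamp: int
    window: str
    pane: int


def redistribute(elements, num_workers, seed=None):
    """Toy stateless shuffle: send each element to an arbitrary worker."""
    rng = random.Random(seed)
    workers = [[] for _ in range(num_workers)]
    for element in elements:
        # No per-key state is consulted; placement is arbitrary.
        workers[rng.randrange(num_workers)].append(element)
    return workers


elements = [WindowedValue(f"v{i}", i, "w0", 0) for i in range(10)]
shards = redistribute(elements, num_workers=3, seed=42)
flattened = [e for shard in shards for e in shard]

# Only the placement changed; values, timestamps, windows and panes did not.
assert Counter(flattened) == Counter(elements)
```

Nothing here says anything about checkpointing or stable input, which is the point of treating the shuffle on its own.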

Therefore here goes the question - should Redistribute be a primitive and not be built up from other transforms?

Best,

 Jan

On 10/6/23 21:12, Kenneth Knowles wrote:


On Fri, Oct 6, 2023 at 3:07 PM Jan Lukavský <je...@seznam.cz> wrote:


    On 10/6/23 15:11, Kenneth Knowles wrote:


    On Fri, Oct 6, 2023 at 3:20 AM Jan Lukavský <je...@seznam.cz> wrote:

        Hi,

        there is also one other thing to mention in relation to
        Reshuffle/RequiresStableInput, and that is that our current
        implementation of RequiresStableInput can break without
        Reshuffle in some corner cases on most portable runners, at
        least with the Java GreedyPipelineFuser, see [1]. The only way
        to work around this currently is inserting Reshuffle (or any
        other fusion-break transform) directly before the stable DoFn
        (Reshuffle is handy, because it does not change the data). I
        think we should either somehow fix the issue [1] or include
        fusion break as a mandatory requirement for the new
        Redistribute transform as well (at least with some variant)
        or possibly add a new "hint" for non-optional fusion breaking.

    This is actually the bug we have wanted to fix for years -
    redistribute has nothing to do with checkpointing or stable input
    and Reshuffle incorrectly merges the two concepts.

    I agree that we couldn't make any immediate change that will
    break a runner. I believe runners that depend on Reshuffle to
    provide stable input will also provide stable input after
    GroupByKey. Since the SDK expansion of Reshuffle will still
    contain a GBK, those runners' functionality will be unchanged.

    I don't yet have a firm opinion between these approaches:

    1. Adjust the Java SDK implementation of Reshuffle (and maybe
    other SDKs if needed). With some flag so that users can use the
    old wrong behavior for update compatibility.
    2. Add a Redistribute transform to the SDKs that has the right
    behavior and leave Reshuffle as it is.
    1+2. Add the Redistribute transform but also make Reshuffle call
    it, so Reshuffle also gets the new behavior, with the same flag
    so that users can use the old wrong behavior for update
    compatibility.

    All of these will leave "Reshuffle for RequiresStableInput" alone
    for now. The options that include (2) will move us a little
    closer to migrating to a "better" future state.

    I might not have expressed myself the right way. I understand
    that Reshuffle having "stable input" functionality is a
    non-portable side effect. It would be nice to get rid of it, and
    my impression from this thread was that we would try to deprecate
    Reshuffle and introduce Redistribute, which will not have such
    semantics. All of this is fine; the problem is that we currently
    (in some corner cases) rely on Reshuffle *even though* the
    Pipeline uses @RequiresStableInput. That is due to the fact that
    Reshuffle also ensures fusion breaking. Fusing a non-deterministic
    DoFn with a stable DoFn breaks the stable input property, because
    runners can ensure stability only at the input of an executable
    stage. Therefore we would either need to:

     a) define Redistribute as being an unconditional fusion break
    boundary, or

     b) define some other transform or hint to be able to enforce
    fusion breaking

    Otherwise I'd be in favor of 2 and deprecation of Reshuffle.
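
The fusion argument above can be sketched with a toy model in plain Python (not the Beam API; `run_fused` and `run_with_fusion_break` are hypothetical names, and modelling a retry as a re-execution of the whole stage is an assumption of this sketch). When the non-deterministic step shares a stage with the stable step, every retry recomputes it, so the stable step can see different input on each attempt. When the upstream output is materialized at a stage boundary, retries replay the same data:

```python
import random


def non_deterministic(x, rng):
    # Stands in for a DoFn whose output can differ between attempts.
    return (x, rng.random())


def run_fused(inputs, attempts):
    """One stage: each retry re-runs the non-deterministic step."""
    seen_by_stable_step = []
    for attempt in range(attempts):
        rng = random.Random(attempt)  # a new attempt gives a new outcome
        seen_by_stable_step.append([non_deterministic(x, rng) for x in inputs])
    return seen_by_stable_step


def run_with_fusion_break(inputs, attempts):
    """Two stages: upstream output is materialized once, retries replay it."""
    rng = random.Random(0)
    materialized = [non_deterministic(x, rng) for x in inputs]
    return [list(materialized) for _ in range(attempts)]


fused = run_fused([1, 2, 3], attempts=2)
broken = run_with_fusion_break([1, 2, 3], attempts=2)

assert fused[0] != fused[1]    # input to the stable step changed on retry
assert broken[0] == broken[1]  # input to the stable step stayed the same
```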


Just to be very clear - my goal right now is just to give Reshuffle consistent semantics. Even for the old "stable input + redistribute" use of Reshuffle, the semantics are inconsistent/undefined and the Java SDK expansion is wrong. Changing things having to do with stable input is not part of what I am trying to change right now. But it is fine to do things that prepare for that.

Kenn

     Jan


    Any votes? Any other options?

    Kenn

         Jan

        [1] https://github.com/apache/beam/issues/24655

        On 10/5/23 21:01, Robert Burke wrote:
        Reshuffle/redistribute being a transform has the benefit of
        allowing existing runners that aren't updated to be aware of
        the new urns to rely on an SDK side implementation, which
        may be more expensive than what the runner is able to do
        with that awareness.

        Aka: it gives purpose to the fallback implementations.

        On Thu, Oct 5, 2023, 9:03 AM Kenneth Knowles
        <k...@apache.org> wrote:

            Another perspective, ignoring runners' custom
            implementations and non-Java SDKs, could be that the
            semantics are perfectly well defined: it is a composite
            and its semantics are defined by its implementation in
            terms of primitives. It is just that this expansion is
            not what we want so we should not use it (and also we
            shouldn't use "whatever the implementation does" as a
            spec for anything we care about).

            On Thu, Oct 5, 2023 at 11:56 AM Kenneth Knowles
            <k...@apache.org> wrote:

                I totally agree. I am motivated right now by the
                fact that it is already used all over the place but
                with no consistent semantics. Maybe it is simpler to
                focus on just making the minimal change, which would
                basically be to update the expansion of the
                Reshuffle in the Java SDK.

                Kenn

                On Thu, Oct 5, 2023 at 11:39 AM John Casey
                <theotherj...@google.com> wrote:

                    Given that this is a hint, I'm not sure
                    redistribute should be a PTransform as opposed
                    to some other way to hint to a runner.

                    I'm not sure of what the syntax of that would
                    be, but a semantic no-op transform that the
                    runner may or may not do anything with is odd.

                    On Thu, Oct 5, 2023 at 11:30 AM Kenneth Knowles
                    <k...@apache.org> wrote:

                        So a high level suggestion from Robert that
                        I want to highlight as a top-post:

                        Instead of focusing on just fixing the SDKs'
                        and runners' Reshuffle, this could be an
                        opportunity to introduce Redistribute which
                        was proposed in the long-ago thread. The
                        semantics are identical but it is more clear
                        that it /only/ is a hint about
                        redistributing data and there is no
                        expectation of a checkpoint.

                        This new name may also be an opportunity to
                        maintain update compatibility (though this
                        may actually be leaving unsafe code in
                        users' hands) and/or
                        separate @RequiresStableInput/checkpointing
                        uses of Reshuffle from redistribution-only
                        uses of Reshuffle.

                        Any other thoughts on this one high level bit?

                        Kenn

                        On Thu, Oct 5, 2023 at 11:15 AM Kenneth
                        Knowles <k...@apache.org> wrote:


                            On Wed, Oct 4, 2023 at 7:45 PM Robert
                            Burke <lostl...@apache.org> wrote:

                                LGTM.

                                It looks like the Go SDK already
                                adheres to these semantics as well
                                for the reference impl (well,
                                reshuffle/redistribute_randomly;
                                _by_key isn't implemented in the Go
                                SDK, and it only uses the existing
                                unqualified reshuffle URN [0]).

                                The original strategy, and then for
                                every element, the original Window,
                                TS, and Pane are all serialized,
                                shuffled, and then deserialized
                                downstream.

                                
                                https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go#L65

                                
                                https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go#L145

                                Prism at the moment vacuously
                                implements reshuffle by omitting the
                                node and rewriting the inputs and
                                outputs [1], as it's a local runner
                                with single-transform-per-bundle
                                execution, but I was intending to
                                make it a fusion break regardless.
                                Ultimately prism's "test" variant
                                will default to executing the SDK's
                                dictated reference implementation
                                for the composite(s), and any "fast"
                                or "prod" variant would simply do
                                the current implementation.


                            Very nice!

                            And of course I should have linked out
                            to the existing reshuffle URN in the proto.

                            Kenn


                                Robert Burke
                                Beam Go Busybody

                                [0]: https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L46C3-L46C50
                                [1]: https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go#L82



                                On 2023/09/26 15:43:53 Kenneth
                                Knowles wrote:
                                > Hi everyone,
                                >
                                > Recently there was a bug [1] caused by discrepancies between two of
                                > Dataflow's reshuffle implementations. I think the reference implementation
                                > in the Java SDK [2] also does not match. This all led to discussion on the
                                > bug and the pull request [3] about what the actual semantics should be. I
                                > got it wrong, maybe multiple times. So I wrote up a very short document to
                                > finish the discussion:
                                >
                                > https://s.apache.org/beam-reshuffle
                                >
                                > This is also probably among the simplest imaginable use of
                                > http://s.apache.org/ptransform-design-doc in case you want to see kind of
                                > how I intended it to be used.
                                >
                                > Kenn
                                >
                                > [1] https://github.com/apache/beam/issues/28219
                                > [2] https://github.com/apache/beam/blob/d52b077ad505c8b50f10ec6a4eb83d385cdaf96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L84
                                > [3] https://github.com/apache/beam/pull/28272
