On 10/6/23 15:11, Kenneth Knowles wrote:


On Fri, Oct 6, 2023 at 3:20 AM Jan Lukavský <je...@seznam.cz> wrote:

    Hi,

    there is also one other thing to mention in relation to
    Reshuffle/RequiresStableInput, and that is that our current
    implementation of RequiresStableInput can break without Reshuffle
    in some corner cases on most portable runners, at least with Java
    GreedyPipelineFuser, see [1]. The only way to work around this
    currently is inserting Reshuffle (or any other fusion-break
    transform) directly before the stable DoFn (Reshuffle is handy,
    because it does not change the data). I think we should either
    somehow fix the issue [1] or include fusion break as a mandatory
    requirement for the new Redistribute transform as well (at least
    with some variant) or possibly add a new "hint" for non-optional
    fusion breaking.

This is actually the bug we have wanted to fix for years - redistribute has nothing to do with checkpointing or stable input and Reshuffle incorrectly merges the two concepts.

I agree that we couldn't make any immediate change that will break a runner. I believe runners that depend on Reshuffle to provide stable input will also provide stable input after GroupByKey. Since the SDK expansion of Reshuffle will still contain a GBK, those runners' functionality will be unchanged.

I don't yet have a firm opinion between these approaches:

1. Adjust the Java SDK implementation of Reshuffle (and maybe other SDKs if needed). With some flag so that users can use the old wrong behavior for update compatibility.

2. Add a Redistribute transform to the SDKs that has the right behavior and leave Reshuffle as it is.

1+2. Add the Redistribute transform but also make Reshuffle call it, so Reshuffle also gets the new behavior, with the same flag so that users can use the old wrong behavior for update compatibility.

All of these will leave "Reshuffle for RequiresStableInput" alone for now. The options that include (2) will move us a little closer to migrating to a "better" future state.

I might not have expressed myself clearly. I understand that Reshuffle having "stable input" functionality is a non-portable side effect. It would be nice to get rid of it, and my impression from this thread was that we would try to deprecate Reshuffle and introduce Redistribute, which will not have such semantics. All of this is fine; the problem is that we currently (in some corner cases) rely on Reshuffle *even though* the Pipeline uses @RequiresStableInput. That is due to the fact that Reshuffle also ensures fusion breaking. Fusing a non-deterministic DoFn with a stable DoFn breaks the stable input property, because runners can ensure stability only at the input of an executable stage. Therefore we would either need to:

 a) define Redistribute as being an unconditional fusion break boundary, or

 b) define some other transform or hint to be able to enforce fusion breaking

Otherwise I'd be in favor of 2 and deprecation of Reshuffle.
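
To illustrate the fusion problem described above, here is a toy model in plain Python. It is not Beam code: the runner, the "tagger" and the "sink" are hypothetical stand-ins invented for this sketch, and a call counter plays the role of non-determinism (such as a random UUID). It only assumes what is stated above, namely that a runner replays the input of an executable stage unchanged on retry but re-executes everything fused inside the stage.

```python
import itertools


class RetryOnceSink:
    """Stand-in for a DoFn that needs stable input: records what it saw on
    each attempt and fails the first attempt to force a retry."""

    def __init__(self):
        self.seen = []

    def process(self, batch):
        self.seen.append(list(batch))
        if len(self.seen) == 1:
            raise RuntimeError("simulated failure after a side effect")


def make_tagger():
    """Stand-in for a non-deterministic DoFn: every call yields a fresh id,
    the way a random UUID would."""
    ids = itertools.count()
    return lambda element: (element, next(ids))


def run_stage(stage_input, fns, max_attempts=2):
    """Toy runner: the stage input is replayed unchanged on retry, but every
    function fused into the stage is executed again."""
    for _ in range(max_attempts):
        try:
            data = stage_input
            for fn in fns:
                data = fn(data)
            return data
        except RuntimeError:
            continue
    raise RuntimeError("stage failed after retries")


def fused(elements):
    # One executable stage containing both the tagger and the sink.
    tag, sink = make_tagger(), RetryOnceSink()
    run_stage(elements, [lambda xs: [tag(x) for x in xs], sink.process])
    return sink.seen


def with_fusion_break(elements):
    tag, sink = make_tagger(), RetryOnceSink()
    # Stage 1 ends at the fusion break; its output is materialized once.
    materialized = run_stage(elements, [lambda xs: [tag(x) for x in xs]])
    # Stage 2 retries replay the materialized output.
    run_stage(materialized, [sink.process])
    return sink.seen


fused_attempts = fused(["a", "b"])
broken_attempts = with_fusion_break(["a", "b"])
print(fused_attempts[0] == fused_attempts[1])    # False: the retry saw new ids
print(broken_attempts[0] == broken_attempts[1])  # True: the retry saw the same input
```

In the fused case the sink observes different ids on its retry, which is the broken stable-input property; with the stage boundary in between, both attempts observe the same materialized elements.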

 Jan


Any votes? Any other options?

Kenn

     Jan

    [1] https://github.com/apache/beam/issues/24655

    On 10/5/23 21:01, Robert Burke wrote:
    Reshuffle/redistribute being a transform has the benefit of
    allowing existing runners that aren't updated to be aware of the
    new urns to rely on an SDK side implementation, which may be more
    expensive than what the runner is able to do with that awareness.

    Aka: it gives purpose to the fallback implementations.

    On Thu, Oct 5, 2023, 9:03 AM Kenneth Knowles <k...@apache.org> wrote:

        Another perspective, ignoring runners' custom implementations
        and non-Java SDKs, could be that the semantics are perfectly
        well defined: it is a composite and its semantics are defined
        by its implementation in terms of primitives. It is just that
        this expansion is not what we want so we should not use it
        (and also we shouldn't use "whatever the implementation does"
        as a spec for anything we care about).

        On Thu, Oct 5, 2023 at 11:56 AM Kenneth Knowles
        <k...@apache.org> wrote:

            I totally agree. I am motivated right now by the fact
            that it is already used all over the place but with no
            consistent semantics. Maybe it is simpler to focus on
            just making the minimal change, which would basically be
            to update the expansion of the Reshuffle in the Java SDK.

            Kenn

            On Thu, Oct 5, 2023 at 11:39 AM John Casey
            <theotherj...@google.com> wrote:

                Given that this is a hint, I'm not sure redistribute
                should be a PTransform as opposed to some other way
                to hint to a runner.

                I'm not sure of what the syntax of that would be, but
                a semantic no-op transform that the runner may or may
                not do anything with is odd.

                On Thu, Oct 5, 2023 at 11:30 AM Kenneth Knowles
                <k...@apache.org> wrote:

                    So a high level suggestion from Robert that I
                    want to highlight as a top-post:

                    Instead of focusing on just fixing the SDKs' and
                    runners' Reshuffle, this could be an opportunity
                    to introduce Redistribute which was proposed in
                    the long-ago thread. The semantics are identical
                    but it is more clear that it /only/ is a hint
                    about redistributing data and there is no
                    expectation of a checkpoint.

                    This new name may also be an opportunity to
                    maintain update compatibility (though this may
                    actually be leaving unsafe code in users' hands)
                    and/or separate @RequiresStableInput/checkpointing
                    uses of Reshuffle from redistribution-only uses of
                    Reshuffle.

                    Any other thoughts on this one high level bit?

                    Kenn

                    On Thu, Oct 5, 2023 at 11:15 AM Kenneth Knowles
                    <k...@apache.org> wrote:


                        On Wed, Oct 4, 2023 at 7:45 PM Robert Burke
                        <lostl...@apache.org> wrote:

                            LGTM.

                            It looks like the Go SDK already adheres to
                            these semantics as well for the reference
                            impl (well, for
                            reshuffle/redistribute_randomly; _by_key
                            isn't implemented in the Go SDK, which only
                            uses the existing unqualified reshuffle
                            URN [0]).

                            The original strategy and then, for every
                            element, the original Window, timestamp,
                            and Pane are all serialized, shuffled, and
                            then deserialized downstream.

                            
                            https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go#L65

                            
                            https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go#L145

                            Prism at the moment vacuously implements
                            reshuffle by omitting the node, and
                            rewriting the inputs and outputs [1], as
                            it's a local runner with single transform
                            per bundle execution, but I was intending
                            to make it a fusion break regardless.
                            Ultimately prism's "test" variant will
                            default to executing the SDK-dictated
                            reference implementation for the
                            composite(s), and any "fast" or "prod"
                            variant would simply do the current
                            implementation.


                        Very nice!

                        And of course I should have linked out to the
                        existing reshuffle URN in the proto.

                        Kenn


                            Robert Burke
                            Beam Go Busybody

                            [0]: https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L46C3-L46C50
                            [1]: https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go#L82



                            On 2023/09/26 15:43:53 Kenneth Knowles wrote:
                            > Hi everyone,
                            >
                            > Recently there was a bug [1] caused by discrepancies between two of
                            > Dataflow's reshuffle implementations. I think the reference implementation
                            > in the Java SDK [2] also does not match. This all led to discussion on the
                            > bug and the pull request [3] about what the actual semantics should be. I
                            > got it wrong, maybe multiple times. So I wrote up a very short document to
                            > finish the discussion:
                            >
                            > https://s.apache.org/beam-reshuffle
                            >
                            > This is also probably among the simplest imaginable use of
                            > http://s.apache.org/ptransform-design-doc in case you want to see kind of
                            > how I intended it to be used.
                            >
                            > Kenn
                            >
                            > [1] https://github.com/apache/beam/issues/28219
                            > [2] https://github.com/apache/beam/blob/d52b077ad505c8b50f10ec6a4eb83d385cdaf96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L84
                            > [3] https://github.com/apache/beam/pull/28272
