On Fri, Oct 6, 2023 at 3:20 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi,
>
> there is also one other thing to mention in relation to
> Reshuffle/RequiresStableInput and that is that our current implementation
> of RequiresStableInput can break without Reshuffle in some corner cases on
> most portable runners, at least with Java GreedyPipelineFuser, see [1]. The
> only way to work around this currently is inserting Reshuffle (or any other
> fusion-break transform) directly before the stable DoFn (Reshuffle is
> handy, because it does not change the data). I think we should either
> somehow fix the issue [1] or include fusion break as a mandatory
> requirement for the new Redistribute transform as well (at least with some
> variant) or possibly add a new "hint" for non-optional fusion breaking.
>

This is actually the bug we have wanted to fix for years - redistribute
has nothing to do with checkpointing or stable input and Reshuffle
incorrectly merges the two concepts.

I agree that we couldn't make any immediate change that will break a
runner. I believe runners that depend on Reshuffle to provide stable input
will also provide stable input after GroupByKey. Since the SDK expansion of
Reshuffle will still contain a GBK, those runners' functionality will be
unchanged.
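
For illustration only, here is a toy Python model of a Reshuffle-style
expansion that pairs each element with a random key, groups by key, and
then drops the keys. It is not Beam API: every function name and the
num_keys/seed parameters are made up for this sketch. The point is just
that a GBK stays in the expansion while the data itself is unchanged:

```python
import random
from collections import defaultdict


def assign_random_keys(elements, num_keys, rng):
    """Pair every element with a pseudo-random shard key (hypothetical helper)."""
    return [(rng.randrange(num_keys), element) for element in elements]


def group_by_key(keyed):
    """Toy stand-in for GroupByKey: collect the values seen for each key."""
    groups = defaultdict(list)
    for key, value in keyed:
        groups[key].append(value)
    return dict(groups)


def drop_keys(groups):
    """Flatten the grouped values back into a plain list of elements."""
    return [value for values in groups.values() for value in values]


def reshuffle_via_random_key(elements, num_keys=4, seed=None):
    """Random key -> group by key -> drop key; order may change, contents do not."""
    rng = random.Random(seed)
    keyed = assign_random_keys(elements, num_keys, rng)
    return drop_keys(group_by_key(keyed))
```

A runner that gives stable input after a GBK would, in this model, give it
after group_by_key regardless of what the surrounding transform is called.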

I don't yet have a firm opinion between these approaches:

1. Adjust the Java SDK implementation of Reshuffle (and maybe other SDKs if
needed), with some flag so that users can use the old wrong behavior for
update compatibility.
2. Add a Redistribute transform to the SDKs that has the right behavior and
leave Reshuffle as it is.
1+2. Add the Redistribute transform but also make Reshuffle call it, so
Reshuffle also gets the new behavior, with the same flag so that users can
use the old wrong behavior for update compatibility.

All of these will leave "Reshuffle for RequiresStableInput" alone for now.
The options that include (2) will move us a little closer to migrating to a
"better" future state.
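
As a rough sketch of option 1+2 (again a toy Python model, not Beam API):
a transform is represented only by the list of step names it expands to,
and the flag name use_legacy_expansion and all function and step names are
hypothetical.

```python
def expand_redistribute():
    """Hypothetical new transform: purely a hint to redistribute data."""
    return ["redistribute"]


def expand_legacy_reshuffle():
    """Placeholder for today's expansion, kept only for update compatibility."""
    return ["legacy_reshuffle_expansion"]


def expand_reshuffle(use_legacy_expansion=False):
    """Option 1+2: Reshuffle calls Redistribute unless the flag opts out."""
    if use_legacy_expansion:
        return expand_legacy_reshuffle()
    return expand_redistribute()
```

With the flag unset, Reshuffle and Redistribute expand identically; setting
it keeps an already-deployed pipeline on the old expansion.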

Any votes? Any other options?

Kenn

> Jan
>
> [1] https://github.com/apache/beam/issues/24655
> On 10/5/23 21:01, Robert Burke wrote:
>
> Reshuffle/redistribute being a transform has the benefit of allowing
> existing runners that aren't updated to be aware of the new urns to rely on
> an SDK side implementation, which may be more expensive than what the
> runner is able to do with that awareness.
>
> Aka: it gives purpose to the fallback implementations.
>
> On Thu, Oct 5, 2023, 9:03 AM Kenneth Knowles <k...@apache.org> wrote:
>
>> Another perspective, ignoring runners' custom implementations and non-Java
>> SDKs, could be that the semantics are perfectly well defined: it is a
>> composite and its semantics are defined by its implementation in terms of
>> primitives. It is just that this expansion is not what we want so we should
>> not use it (and also we shouldn't use "whatever the implementation does" as
>> a spec for anything we care about).
>>
>> On Thu, Oct 5, 2023 at 11:56 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> I totally agree. I am motivated right now by the fact that it is already
>>> used all over the place but with no consistent semantics. Maybe it is
>>> simpler to focus on just making the minimal change, which would basically
>>> be to update the expansion of the Reshuffle in the Java SDK.
>>>
>>> Kenn
>>>
>>> On Thu, Oct 5, 2023 at 11:39 AM John Casey <theotherj...@google.com>
>>> wrote:
>>>
>>>> Given that this is a hint, I'm not sure redistribute should be a
>>>> PTransform as opposed to some other way to hint to a runner.
>>>>
>>>> I'm not sure of what the syntax of that would be, but a semantic no-op
>>>> transform that the runner may or may not do anything with is odd.
>>>>
>>>> On Thu, Oct 5, 2023 at 11:30 AM Kenneth Knowles <k...@apache.org>
>>>> wrote:
>>>>
>>>>> So a high level suggestion from Robert that I want to highlight as a
>>>>> top-post:
>>>>>
>>>>> Instead of focusing on just fixing the SDKs' and runners' Reshuffle,
>>>>> this could be an opportunity to introduce Redistribute which was proposed
>>>>> in the long-ago thread. The semantics are identical but it is more clear
>>>>> that it *only* is a hint about redistributing data and there is no
>>>>> expectation of a checkpoint.
>>>>>
>>>>> This new name may also be an opportunity to maintain update
>>>>> compatibility (though this may actually be leaving unsafe code in users'
>>>>> hands) and/or separate @RequiresStableInput/checkpointing uses of
>>>>> Reshuffle from redistribution-only uses of Reshuffle.
>>>>>
>>>>> Any other thoughts on this one high level bit?
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Thu, Oct 5, 2023 at 11:15 AM Kenneth Knowles <k...@apache.org>
>>>>> wrote:
>>>>>
>>>>>>
>>>>>> On Wed, Oct 4, 2023 at 7:45 PM Robert Burke <lostl...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> LGTM.
>>>>>>>
>>>>>>> It looks like the Go SDK already adheres to these semantics as well for
>>>>>>> the reference impl (well, reshuffle/redistribute_randomly; _by_key isn't
>>>>>>> implemented in the Go SDK, and it only uses the existing unqualified
>>>>>>> reshuffle URN [0]).
>>>>>>>
>>>>>>> The original windowing strategy is kept, and then for every element, the
>>>>>>> original Window, TS, and Pane are all serialized, shuffled, and then
>>>>>>> deserialized downstream.
>>>>>>>
>>>>>>>
>>>>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go#L65
>>>>>>>
>>>>>>>
>>>>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go#L145
>>>>>>>
>>>>>>> Prism at the moment vacuously implements reshuffle by omitting the
>>>>>>> node, and rewriting the inputs and outputs [1], as it's a local runner
>>>>>>> with single transform per bundle execution, but I was intending to make
>>>>>>> it a fusion break regardless. Ultimately prism's "test" variant will
>>>>>>> default to executing the SDK-dictated reference implementation for the
>>>>>>> composite(s), and any "fast" or "prod" variant would simply do the
>>>>>>> current implementation.
>>>>>>>
>>>>>>
>>>>>> Very nice!
>>>>>>
>>>>>> And of course I should have linked out to the existing reshuffle URN
>>>>>> in the proto.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> Robert Burke
>>>>>>> Beam Go Busybody
>>>>>>>
>>>>>>> [0]:
>>>>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L46C3-L46C50
>>>>>>> [1]:
>>>>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go#L82
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 2023/09/26 15:43:53 Kenneth Knowles wrote:
>>>>>>> > Hi everyone,
>>>>>>> >
>>>>>>> > Recently there was a bug [1] caused by discrepancies between two of
>>>>>>> > Dataflow's reshuffle implementations. I think the reference implementation
>>>>>>> > in the Java SDK [2] also does not match. This all led to discussion on the
>>>>>>> > bug and the pull request [3] about what the actual semantics should be. I
>>>>>>> > got it wrong, maybe multiple times. So I wrote up a very short document to
>>>>>>> > finish the discussion:
>>>>>>> >
>>>>>>> >     https://s.apache.org/beam-reshuffle
>>>>>>> >
>>>>>>> > This is also probably among the simplest imaginable uses of
>>>>>>> > http://s.apache.org/ptransform-design-doc in case you want to see kind of
>>>>>>> > how I intended it to be used.
>>>>>>> >
>>>>>>> > Kenn
>>>>>>> >
>>>>>>> > [1] https://github.com/apache/beam/issues/28219
>>>>>>> > [2]
>>>>>>> > https://github.com/apache/beam/blob/d52b077ad505c8b50f10ec6a4eb83d385cdaf96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L84
>>>>>>> > [3] https://github.com/apache/beam/pull/28272
>>>>>>> >
>>>>>>>
>>>>>>
