On Fri, Oct 6, 2023 at 3:07 PM Jan Lukavský <je...@seznam.cz> wrote:

>
> On 10/6/23 15:11, Kenneth Knowles wrote:
>
>
>
> On Fri, Oct 6, 2023 at 3:20 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi,
>>
>> there is also one other thing to mention in relation to
>> Reshuffle/RequiresStableInput: our current implementation of
>> RequiresStableInput can break without Reshuffle in some corner cases on
>> most portable runners, at least with the Java GreedyPipelineFuser, see [1].
>> Currently, the only way to work around this is to insert Reshuffle (or any
>> other fusion-break transform) directly before the stable DoFn (Reshuffle is
>> handy because it does not change the data). I think we should either
>> somehow fix the issue [1], include a fusion break as a mandatory
>> requirement for the new Redistribute transform as well (at least in some
>> variant), or possibly add a new "hint" for non-optional fusion breaking.
>>
> This is actually the bug we have wanted to fix for years - redistribute
> has nothing to do with checkpointing or stable input and Reshuffle
> incorrectly merges the two concepts.
>
> I agree that we can't make any immediate change that would break a
> runner. I believe runners that depend on Reshuffle to provide stable input
> will also provide stable input after GroupByKey. Since the SDK expansion of
> Reshuffle will still contain a GBK, those runners' functionality will be
> unchanged.
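Kenn's point that the SDK expansion of Reshuffle still contains a GroupByKey can be illustrated with a toy model. The following is a minimal plain-Python sketch, not Beam code: `reshuffle_via_random_key` and its `num_keys` and `seed` parameters are hypothetical names, and the real Java expansion has additional steps (for example, preserving timestamps and windows). It assigns an arbitrary key to each element, groups by key, and then drops the keys, so the output contains exactly the input elements, possibly in a different order.

```python
import random
from collections import defaultdict


def reshuffle_via_random_key(elements, num_keys=4, seed=None):
    """Toy model of a Reshuffle expansion: key, GroupByKey, drop keys.

    Hypothetical helper for illustration only; it is not the Beam API.
    """
    rng = random.Random(seed)
    # Step 1: pair every element with an arbitrary (random) key.
    keyed = [(rng.randrange(num_keys), element) for element in elements]
    # Step 2: "GroupByKey". In a real runner this is the shuffle, and it is
    # also the point where some runners happen to persist the data.
    groups = defaultdict(list)
    for key, element in keyed:
        groups[key].append(element)
    # Step 3: drop the keys and emit the grouped values again.
    return [element for values in groups.values() for element in values]


out = reshuffle_via_random_key(list(range(10)), seed=42)
print(sorted(out) == list(range(10)))  # True: same elements, order may differ
```

Since the output is a permutation of the input, the transform is a semantic no-op on the data itself; only placement and ordering can change.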
>
> I don't yet have a firm opinion between these approaches:
>
> 1. Adjust the Java SDK implementation of Reshuffle (and maybe other SDKs
> if needed), with some flag so that users can use the old wrong behavior for
> update compatibility.
> 2. Add a Redistribute transform to the SDKs that has the right behavior
> and leave Reshuffle as it is.
> 1+2. Add the Redistribute transform but also make Reshuffle call it, so
> Reshuffle also gets the new behavior, with the same flag so that users can
> use the old wrong behavior for update compatibility.
>
> All of these will leave "Reshuffle for RequiresStableInput" alone for now.
> The options that include (2) will move us a little closer to migrating to a
> "better" future state.
>
> I might not have expressed it the right way. I understand that Reshuffle
> having "stable input" functionality is a non-portable side effect. It would
> be nice to get rid of it, and my impression from this thread was that we
> would try to deprecate Reshuffle and introduce Redistribute, which will not
> have such semantics. All of this is fine; the problem is that we currently
> (in some corner cases) rely on Reshuffle *even though* the Pipeline uses
> @RequiresStableInput. That is because Reshuffle also ensures fusion
> breaking. Fusing a non-deterministic DoFn with a stable DoFn breaks the
> stable input property, because runners can ensure stability only at the
> input of an executable stage. Therefore we would need to either:
>
>  a) define Redistribute as being an unconditional fusion break boundary, or
>
>  b) define some other transform or hint to be able to enforce fusion
> breaking
>
> Otherwise I'd be in favor of 2 and deprecation of Reshuffle.
>
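Jan's argument that a runner can guarantee stable input only at the input of an executable stage can be made concrete with a toy fuser. This is a hypothetical plain-Python model, not the algorithm used by the Java GreedyPipelineFuser: `Step`, `fuse`, and `stable_input_guaranteed` are illustrative names, and the pipeline is simplified to a linear chain. A fusion-break step (here, the GroupByKey inside a Reshuffle expansion) ends the current stage, and the stable step is treated as safe only if no non-deterministic step is fused upstream of it in the same stage.

```python
from typing import List, NamedTuple


class Step(NamedTuple):
    name: str
    fusion_break: bool = False  # e.g. the GroupByKey inside a Reshuffle
    deterministic: bool = True


def fuse(steps: List[Step]) -> List[List[Step]]:
    """Greedily fuse a linear chain of steps into executable stages.

    A fusion-break step is executed by the runner between stages, so it
    closes the current stage and the following steps start a new one.
    """
    stages, current = [], []
    for step in steps:
        if step.fusion_break:
            if current:
                stages.append(current)
            current = []
        else:
            current.append(step)
    if current:
        stages.append(current)
    return stages


def stable_input_guaranteed(steps: List[Step], stable_name: str) -> bool:
    """The runner can replay a stage's input identically, but a
    non-deterministic step fused upstream of the stable step may emit
    different data on a retry."""
    for stage in fuse(steps):
        names = [step.name for step in stage]
        if stable_name in names:
            upstream = stage[: names.index(stable_name)]
            return all(step.deterministic for step in upstream)
    raise ValueError(f"unknown step: {stable_name!r}")


read = Step("Read")
assign_ids = Step("AssignRandomIds", deterministic=False)
gbk = Step("GroupByKey", fusion_break=True)
drop_keys = Step("DropKeys")
write = Step("StableWrite")

print(stable_input_guaranteed([read, assign_ids, write], "StableWrite"))  # False
print(stable_input_guaranteed([read, assign_ids, gbk, drop_keys, write], "StableWrite"))  # True
```

In these terms, options (a) and (b) are both ways of guaranteeing that a step with `fusion_break=True` can be placed directly before the stable DoFn, regardless of whether Reshuffle keeps that property.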

Just to be very clear: my goal right now is just to give Reshuffle
consistent semantics. Even for the old "stable input + redistribute" use of
Reshuffle, the semantics are inconsistent/undefined and the Java SDK
expansion is wrong. Changing things that have to do with stable input is not
part of what I am trying to change right now. But it is fine to do things
that prepare for that.

Kenn


>  Jan
>
>
> Any votes? Any other options?
>
> Kenn
>
>  Jan
>>
>> [1] https://github.com/apache/beam/issues/24655
>> On 10/5/23 21:01, Robert Burke wrote:
>>
>> Reshuffle/redistribute being a transform has the benefit of allowing
>> existing runners that haven't been updated to recognize the new URNs to
>> rely on an SDK-side implementation, which may be more expensive than what
>> a runner with that awareness could do.
>>
>> Aka: it gives purpose to the fallback implementations.
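The fallback mechanism described here (and the view of a composite as defined by its expansion) can be sketched as a recursive translation: a runner that recognizes a composite's URN executes it natively, and otherwise executes the SDK-provided expansion. This is a plain-Python toy, not the Runner API: transforms are modeled as hypothetical `(urn, subtransforms)` tuples, primitives are assumed to be supported by every runner, and the expansion is simplified to ParDo, GroupByKey, ParDo. The URN strings are assumed to match those in the Beam Runner API proto.

```python
PARDO = "beam:transform:pardo:v1"
GBK = "beam:transform:group_by_key:v1"
RESHUFFLE = "beam:transform:reshuffle:v1"

# Simplified SDK expansion of the composite: key, group, drop keys.
RESHUFFLE_COMPOSITE = (RESHUFFLE, [(PARDO, []), (GBK, []), (PARDO, [])])


def translate(transform, runner_urns):
    """Return the URNs a runner would execute for `transform`.

    `transform` is a (urn, subtransforms) pair; primitives have no
    subtransforms. Known composites run natively; unknown ones fall back
    to their expansion.
    """
    urn, subtransforms = transform
    if urn in runner_urns or not subtransforms:
        return [urn]
    executed = []
    for sub in subtransforms:
        executed.extend(translate(sub, runner_urns))
    return executed


print(translate(RESHUFFLE_COMPOSITE, {RESHUFFLE}))  # ['beam:transform:reshuffle:v1']
print(translate(RESHUFFLE_COMPOSITE, set()))
# ['beam:transform:pardo:v1', 'beam:transform:group_by_key:v1', 'beam:transform:pardo:v1']
```

The fallback path is always available but may cost more, because the runner cannot substitute its own cheaper redistribution; it also means the expansion effectively defines the behavior on runners that do not recognize the URN.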
>>
>> On Thu, Oct 5, 2023, 9:03 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> Another perspective, ignoring runners' custom implementations and
>>> non-Java SDKs, could be that the semantics are perfectly well defined: it is
>>> a composite and its semantics are defined by its implementation in terms of
>>> primitives. It is just that this expansion is not what we want so we should
>>> not use it (and also we shouldn't use "whatever the implementation does" as
>>> a spec for anything we care about).
>>>
>>> On Thu, Oct 5, 2023 at 11:56 AM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>> I totally agree. I am motivated right now by the fact that it is
>>>> already used all over the place but with no consistent semantics. Maybe it
>>>> is simpler to focus on just making the minimal change, which would
>>>> basically be to update the expansion of the Reshuffle in the Java SDK.
>>>>
>>>> Kenn
>>>>
>>>> On Thu, Oct 5, 2023 at 11:39 AM John Casey <theotherj...@google.com>
>>>> wrote:
>>>>
>>>>> Given that this is a hint, I'm not sure redistribute should be a
>>>>> PTransform as opposed to some other way to hint to a runner.
>>>>>
>>>>> I'm not sure what the syntax of that would be, but a semantic no-op
>>>>> transform that the runner may or may not do anything with is odd.
>>>>>
>>>>> On Thu, Oct 5, 2023 at 11:30 AM Kenneth Knowles <k...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> So a high-level suggestion from Robert that I want to highlight as a
>>>>>> top-post:
>>>>>>
>>>>>> Instead of focusing on just fixing the SDKs' and runners' Reshuffle,
>>>>>> this could be an opportunity to introduce Redistribute, which was proposed
>>>>>> in the long-ago thread. The semantics are identical, but it is clearer
>>>>>> that it is *only* a hint about redistributing data and there is no
>>>>>> expectation of a checkpoint.
>>>>>>
>>>>>> This new name may also be an opportunity to maintain update
>>>>>> compatibility (though this may actually be leaving unsafe code in users'
>>>>>> hands) and/or separate @RequiresStableInput/checkpointing uses of
>>>>>> Reshuffle from redistribution-only uses of Reshuffle.
>>>>>>
>>>>>> Any other thoughts on this one high-level bit?
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Thu, Oct 5, 2023 at 11:15 AM Kenneth Knowles <k...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>>
>>>>>>> On Wed, Oct 4, 2023 at 7:45 PM Robert Burke <lostl...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> LGTM.
>>>>>>>>
>>>>>>>> It looks like the Go SDK already adheres to these semantics as well for
>>>>>>>> the reference impl (well, reshuffle/redistribute_randomly; _by_key isn't
>>>>>>>> implemented in the Go SDK), and it only uses the existing unqualified
>>>>>>>> reshuffle URN [0].
>>>>>>>>
>>>>>>>> The original strategy, and then for every element, the original
>>>>>>>> Window, TS, and Pane are all serialized, shuffled, and then deserialized
>>>>>>>> downstream.
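The per-element behavior described for the Go SDK reference implementation (serialize the value with its window, timestamp, and pane, shuffle, then deserialize downstream) can be sketched as a round trip. This is a toy Python model, not the Go SDK code: `WindowedValue`, the JSON encoding, and the string pane labels are assumptions for illustration, and `random.shuffle` stands in for the runner's shuffle.

```python
import json
import random
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class WindowedValue:
    value: str
    window: tuple   # (start_ms, end_ms) of an interval window; toy encoding
    timestamp: int  # event time in milliseconds
    pane: str       # e.g. "ON_TIME" or "LATE"; toy encoding


def encode(wv: WindowedValue) -> str:
    # Serialize the value together with all of its metadata.
    return json.dumps(asdict(wv))


def decode(data: str) -> WindowedValue:
    d = json.loads(data)
    # JSON turns tuples into lists, so restore the window as a tuple.
    return WindowedValue(d["value"], tuple(d["window"]), d["timestamp"], d["pane"])


def shuffle_preserving_metadata(elements, seed=0):
    encoded = [encode(e) for e in elements]
    random.Random(seed).shuffle(encoded)  # stand-in for the real shuffle
    return [decode(data) for data in encoded]


elements = [
    WindowedValue("a", (0, 60000), 1000, "ON_TIME"),
    WindowedValue("b", (60000, 120000), 61000, "LATE"),
]
restored = shuffle_preserving_metadata(elements, seed=3)
print(sorted(restored, key=lambda e: e.value) == elements)  # True
```

Because nothing about an element is rewritten, downstream transforms observe the same windows, timestamps, and panes as they would without the reshuffle.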
>>>>>>>>
>>>>>>>>
>>>>>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go#L65
>>>>>>>>
>>>>>>>>
>>>>>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go#L145
>>>>>>>>
>>>>>>>> Prism at the moment vacuously implements reshuffle by omitting the
>>>>>>>> node and rewriting the inputs and outputs [1], as it's a local runner with
>>>>>>>> single-transform-per-bundle execution, but I was intending to make it a
>>>>>>>> fusion break regardless. Ultimately, prism's "test" variant will default to
>>>>>>>> executing the SDK-dictated reference implementation for the composite(s),
>>>>>>>> and any "fast" or "prod" variant would simply do the current
>>>>>>>> implementation.
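The graph rewrite described for prism (omit the reshuffle node and connect its input directly to its consumers) can be sketched on a DAG represented as producer/consumer edges. This Python function is a hypothetical illustration, not prism's Go implementation in `handlerunner.go`; `elide_nodes` and the edge-list representation are assumptions.

```python
def elide_nodes(edges, nodes_to_elide):
    """Remove pass-through nodes from a DAG of (producer, consumer) edges.

    Every producer feeding an elided node is reconnected to each of that
    node's consumers, so the data flow is unchanged apart from the hop.
    """
    result = list(edges)
    for node in nodes_to_elide:
        producers = [p for p, c in result if c == node]
        consumers = [c for p, c in result if p == node]
        result = [(p, c) for p, c in result if node not in (p, c)]
        result.extend((p, c) for p in producers for c in consumers)
    return result


edges = [("Read", "Reshuffle"), ("Reshuffle", "Map"), ("Reshuffle", "Count")]
print(elide_nodes(edges, ["Reshuffle"]))  # [('Read', 'Map'), ('Read', 'Count')]
```

Making the node a fusion break instead, as intended here, would keep it in the graph and split stages around it rather than removing it.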
>>>>>>>>
>>>>>>>
>>>>>>> Very nice!
>>>>>>>
>>>>>>> And of course I should have linked out to the existing reshuffle URN
>>>>>>> in the proto.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> Robert Burke
>>>>>>>> Beam Go Busybody
>>>>>>>>
>>>>>>>> [0]:
>>>>>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L46C3-L46C50
>>>>>>>> [1]:
>>>>>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go#L82
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On 2023/09/26 15:43:53 Kenneth Knowles wrote:
>>>>>>>> > Hi everyone,
>>>>>>>> >
>>>>>>>> > Recently there was a bug [1] caused by discrepancies between two of
>>>>>>>> > Dataflow's reshuffle implementations. I think the reference implementation
>>>>>>>> > in the Java SDK [2] also does not match. This all led to discussion on the
>>>>>>>> > bug and the pull request [3] about what the actual semantics should be. I
>>>>>>>> > got it wrong, maybe multiple times. So I wrote up a very short document to
>>>>>>>> > finish the discussion:
>>>>>>>> >
>>>>>>>> >     https://s.apache.org/beam-reshuffle
>>>>>>>> >
>>>>>>>> > This is also probably among the simplest imaginable uses of
>>>>>>>> > http://s.apache.org/ptransform-design-doc in case you want to see roughly
>>>>>>>> > how I intended it to be used.
>>>>>>>> >
>>>>>>>> > Kenn
>>>>>>>> >
>>>>>>>> > [1] https://github.com/apache/beam/issues/28219
>>>>>>>> > [2] https://github.com/apache/beam/blob/d52b077ad505c8b50f10ec6a4eb83d385cdaf96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L84
>>>>>>>> > [3] https://github.com/apache/beam/pull/28272
>>>>>>>> >
>>>>>>>>
>>>>>>>
