Yes, I'm aware that Beam is not defined in terms of runner convenience, but in terms of data transform primitives. On the other hand, looking at it from a specific perspective, even though a stateless shuffle does not change the data itself, it changes the distribution of the data with respect to partitioning, which is a property of the data as well. Not only the data itself, but also any metadata about it, might be viewed as something that characterizes a PCollection and as something that can be manipulated. Hence a transform can be given proper data-related semantics, even though it is a no-op with regard to the actual _contents_ of a PCollection. Having said that, my point was that if Redistribute were defined as a fundamental primitive, it would immediately follow that it should not be implemented as GBK-unGBK (at least not without extra care), because that leads to problems with the stateful GBK introducing unexpected side effects, which, as I understand it, was the initial problem that started this thread.
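To make the distinction between "contents" and "distribution" concrete, here is a toy in-memory model in plain Java (not Beam code). Partitions stand in for workers, and the hypothetical redistributeRoundRobin uses round-robin instead of random assignment only so the result is deterministic. The sorted contents are identical before and after; only the partition sizes change.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy model, not Beam code: each inner List is one partition (e.g. one worker).
final class RedistributeSketch {
  // Hypothetical stateless redistribute: moves every element to a new partition;
  // no element is created, dropped, or modified.
  static <T> List<List<T>> redistributeRoundRobin(List<List<T>> partitions, int targetPartitions) {
    List<List<T>> out = new ArrayList<>();
    for (int i = 0; i < targetPartitions; i++) {
      out.add(new ArrayList<>());
    }
    int next = 0;
    for (List<T> partition : partitions) {
      for (T element : partition) {
        out.get(next).add(element);
        next = (next + 1) % targetPartitions;
      }
    }
    return out;
  }

  // The "contents" of the collection: all elements regardless of partition, sorted.
  static <T extends Comparable<T>> List<T> contents(List<List<T>> partitions) {
    List<T> all = new ArrayList<>();
    for (List<T> partition : partitions) {
      all.addAll(partition);
    }
    Collections.sort(all);
    return all;
  }

  // The "distribution": how many elements each partition holds.
  static <T> List<Integer> sizes(List<List<T>> partitions) {
    List<Integer> sizes = new ArrayList<>();
    for (List<T> partition : partitions) {
      sizes.add(partition.size());
    }
    return sizes;
  }

  public static void main(String[] args) {
    List<List<String>> skewed = List.of(List.of("a", "b", "c", "d", "e", "f"), List.of(), List.of());
    List<List<String>> balanced = redistributeRoundRobin(skewed, 3);
    System.out.println(sizes(skewed) + " -> " + sizes(balanced)); // [6, 0, 0] -> [2, 2, 2]
    System.out.println(contents(skewed).equals(contents(balanced))); // true
  }
}
```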

Best,

 Jan

On 10/19/23 20:26, Kenneth Knowles wrote:
Well, I accidentally conflated "stateful" and "persisting", but anyhow,
yes, we aren't aiming to have one Beam primitive for each thing that
is probably a runner primitive.

On Thu, Oct 19, 2023 at 2:25 PM Kenneth Knowles <k...@apache.org> wrote:
On Fri, Oct 13, 2023 at 12:51 PM Jan Lukavský <je...@seznam.cz> wrote:
Hi,

I think nearly everything has already been said in this thread, but... it
is time for Friday discussions. :)

Today I recalled a discussion we had a long time ago, when we were
designing Euphoria (btw, deprecating and removing it is still on my todo list,
I should create a vote thread for that). We had 4 primitives:

  a) non-shuffle, stateless ~ stateless ParDo

  b) shuffle, stateful ~ stateful ParDo, with the ability (under the right circumstances,
e.g. a defined event-time trigger, a defined state merge function, ...) to be performed in a
"combinable way".

  c) shuffle, stateless ~ Reshuffle

  d) non-shuffle, stateful - nope, makes no sense :) - part of the "combinable 
stateful shuffle operation"

  e) union ~ Flatten

Turns out you can build everything bottom up from these.
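As an illustration of building bottom up, here is a toy in-memory model of the four primitives in plain Java. It is not the Euphoria or Beam API; every class and method name is hypothetical. A word count is composed from (a), then (c), then (b), with (e) available to merge inputs first; (d) is not modeled, since it is folded into (b).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Toy model of the Euphoria-style primitives; not the Euphoria or Beam API.
// A List is a collection; a List<List<...>> is a collection split into partitions.
final class EuphoriaPrimitives {
  record KV<K, V>(K key, V value) {}

  // (a) non-shuffle, stateless: element-wise flat-map, like a stateless ParDo.
  static <A, B> List<B> statelessParDo(List<A> input, Function<A, List<B>> fn) {
    List<B> out = new ArrayList<>();
    for (A element : input) {
      out.addAll(fn.apply(element));
    }
    return out;
  }

  // (c) shuffle, stateless: route each element to a partition by key hash; contents unchanged.
  static <K, V> List<List<KV<K, V>>> shuffle(List<KV<K, V>> input, int numPartitions) {
    List<List<KV<K, V>>> partitions = new ArrayList<>();
    for (int i = 0; i < numPartitions; i++) {
      partitions.add(new ArrayList<>());
    }
    for (KV<K, V> kv : input) {
      partitions.get(Math.floorMod(kv.key().hashCode(), numPartitions)).add(kv);
    }
    return partitions;
  }

  // (b) shuffle, stateful, in its "combinable" form: per-key state folded with a merge function.
  // It consumes the output of shuffle, so all values for a key sit in the same partition.
  static <K, V> List<KV<K, V>> statefulPerKey(List<List<KV<K, V>>> partitions, BinaryOperator<V> merge) {
    List<KV<K, V>> out = new ArrayList<>();
    for (List<KV<K, V>> partition : partitions) {
      Map<K, V> state = new HashMap<>();
      for (KV<K, V> kv : partition) {
        state.merge(kv.key(), kv.value(), merge);
      }
      state.forEach((key, value) -> out.add(new KV<>(key, value)));
    }
    return out;
  }

  // (e) union: concatenation, like Flatten.
  @SafeVarargs
  static <T> List<T> union(List<T>... inputs) {
    List<T> out = new ArrayList<>();
    for (List<T> input : inputs) {
      out.addAll(input);
    }
    return out;
  }

  // A composite built bottom-up: word count = (a) then (c) then (b).
  static Map<String, Integer> wordCount(List<String> lines) {
    Function<String, List<KV<String, Integer>>> toOnes = line -> {
      List<KV<String, Integer>> ones = new ArrayList<>();
      for (String word : line.split("\\s+")) {
        ones.add(new KV<>(word, 1));
      }
      return ones;
    };
    List<KV<String, Integer>> counts =
        statefulPerKey(shuffle(statelessParDo(lines, toOnes), 4), Integer::sum);
    Map<String, Integer> result = new HashMap<>();
    for (KV<String, Integer> kv : counts) {
      result.put(kv.key(), kv.value());
    }
    return result;
  }

  public static void main(String[] args) {
    // Two inputs merged with (e) before counting.
    System.out.println(wordCount(union(List.of("a b a"), List.of("b c"))));
  }
}
```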

Now, the not-so-well-defined semantics of Reshuffle (Redistribute) might arise
from the fact that it is not a primitive. Stateless shuffling of data is definitely
a primitive of all runners.
Not Dataflow :-)

But more importantly, Beam primitives are deliberately chosen to be
fundamental data operations, not physical plan steps that a runner
might use. In other words, Beam is decidedly _not_ a library for
building composites that eventually are constructed from runner
primitives. It is more like SQL in that it is a library for building
composites that eventually are constructed from fundamental operations
on data, that every engine (like every RDBMS) will be able to
implement in its own way.

Kenn

So here is the question: should Redistribute be a primitive rather than
being built up from other transforms?

Best,

  Jan

On 10/6/23 21:12, Kenneth Knowles wrote:



On Fri, Oct 6, 2023 at 3:07 PM Jan Lukavský <je...@seznam.cz> wrote:

On 10/6/23 15:11, Kenneth Knowles wrote:



On Fri, Oct 6, 2023 at 3:20 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi,

there is also one other thing to mention in relation to Reshuffle/RequiresStableInput,
and that is that our current implementation of RequiresStableInput can break without
Reshuffle in some corner cases on most portable runners, at least with the Java
GreedyPipelineFuser, see [1]. The only way to work around this currently is inserting
Reshuffle (or any other fusion-breaking transform) directly before the stable DoFn
(Reshuffle is handy, because it does not change the data). I think we should either
somehow fix the issue [1] or include a fusion break as a mandatory requirement for the new
Redistribute transform as well (at least in some variant), or possibly add a new
"hint" for non-optional fusion breaking.
This is actually the bug we have wanted to fix for years - redistribute has 
nothing to do with checkpointing or stable input and Reshuffle incorrectly 
merges the two concepts.

I agree that we can't make any immediate change that would break a runner. I
believe runners that depend on Reshuffle to provide stable input will also
provide stable input after GroupByKey. Since the SDK expansion of Reshuffle
will still contain a GBK, those runners' functionality will be unchanged.

I don't yet have a firm opinion among these approaches:

1. Adjust the Java SDK implementation of Reshuffle (and maybe other SDKs if
needed), with some flag so that users can keep the old, wrong behavior for update
compatibility.
2. Add a Redistribute transform to the SDKs that has the right behavior and 
leave Reshuffle as it is.
1+2. Add the Redistribute transform but also make Reshuffle call it, so 
Reshuffle also gets the new behavior, with the same flag so that users can use 
the old wrong behavior for update compatibility.
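A rough sketch of the delegation in option 1+2, in plain Java with hypothetical names (none of these are Beam APIs). The difference between the two paths is an ASSUMPTION made only for illustration: the legacy GBK-based path is modeled as overwriting one piece of per-element metadata (a pane index), while Redistribute passes elements through untouched.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of option 1+2; none of these names are Beam APIs.
final class ReshuffleOptionsSketch {
  // An element with one piece of metadata; a real element also carries a timestamp and windows.
  record Element(String value, int paneIndex) {}

  // New behavior (Redistribute): elements and their metadata pass through unchanged.
  static List<Element> redistribute(List<Element> input) {
    return new ArrayList<>(input);
  }

  // Stand-in for the old GBK-based expansion. ASSUMPTION, for illustration only: the grouping
  // step replaces the original per-element metadata (here, the pane index) with its own.
  static List<Element> legacyExpansion(List<Element> input) {
    List<Element> out = new ArrayList<>();
    for (Element e : input) {
      out.add(new Element(e.value(), 0));
    }
    return out;
  }

  // Reshuffle calls Redistribute, unless a hypothetical compatibility flag asks for the
  // old expansion so that existing pipelines stay update-compatible.
  static List<Element> reshuffle(List<Element> input, boolean keepOldBehaviorForUpdate) {
    return keepOldBehaviorForUpdate ? legacyExpansion(input) : redistribute(input);
  }

  public static void main(String[] args) {
    List<Element> input = List.of(new Element("x", 2));
    System.out.println(reshuffle(input, false)); // [Element[value=x, paneIndex=2]]
    System.out.println(reshuffle(input, true));  // [Element[value=x, paneIndex=0]]
  }
}
```

The point of the structure is that only one code path (Redistribute) carries the intended semantics, and the flag isolates the legacy path so it can be removed later.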

All of these will leave "Reshuffle for RequiresStableInput" alone for now. The options
that include (2) will move us a little closer to migrating to a "better" future state.

I might not have expressed myself the right way. I understand that Reshuffle having "stable
input" functionality is a non-portable side effect. It would be nice to get rid of it,
and my impression from this thread was that we would try to deprecate Reshuffle and
introduce Redistribute, which will not have such semantics. All of this is fine; the problem
is that we currently (in some corner cases) rely on Reshuffle *even though* the pipeline uses
@RequiresStableInput. That is due to the fact that Reshuffle also ensures fusion
breaking. Fusing a non-deterministic DoFn with a stable DoFn breaks the stable input
property, because runners can ensure stability only at the input of an executable stage.
Therefore we would need to either:

  a) define Redistribute as being an unconditional fusion break boundary, or

  b) define some other transform or hint to be able to enforce fusion breaking

Otherwise I'd be in favor of 2 and deprecation of Reshuffle.
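A toy model of the fusion argument above, in plain Java rather than runner code; every name is hypothetical. The runner is assumed to replay a stage's input faithfully on retry. When the non-deterministic step is fused into the same stage as the stable step, the retry re-runs it and the stable step sees different values; with a fusion break, the non-deterministic output is materialized once and becomes the next stage's (stable) input.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, not runner code: a runner can only make the input of an executable stage
// stable, i.e. replay the same elements when the stage is retried. Names are hypothetical.
final class StableInputSketch {
  // Stand-in for a non-deterministic DoFn (random IDs, wall-clock time, ...):
  // the same input yields a different output on each attempt.
  static List<Integer> nonDeterministicStep(List<Integer> input, int attempt) {
    List<Integer> out = new ArrayList<>();
    for (int x : input) {
      out.add(x * 100 + attempt);
    }
    return out;
  }

  // Fused stage [non-deterministic step -> stable step]. The stage input is replayed
  // faithfully, but the non-deterministic step re-runs, so the stable step sees new values.
  static List<Integer> fusedStageSeenByStableStep(List<Integer> stageInput, int attempt) {
    return nonDeterministicStep(stageInput, attempt);
  }

  // With a fusion break, the non-deterministic output was materialized once and is the input
  // of this stage; every retry replays exactly those elements.
  static List<Integer> brokenStageSeenByStableStep(List<Integer> materializedInput, int attempt) {
    return new ArrayList<>(materializedInput); // the attempt number no longer matters
  }

  public static void main(String[] args) {
    List<Integer> input = List.of(1, 2, 3);

    // Fused: first attempt and retry disagree, so the stable-input property is lost.
    System.out.println(fusedStageSeenByStableStep(input, 1)); // [101, 201, 301]
    System.out.println(fusedStageSeenByStableStep(input, 2)); // [102, 202, 302]

    // Fusion break: the upstream stage ran once; retries downstream see identical input.
    List<Integer> materialized = nonDeterministicStep(input, 1);
    System.out.println(brokenStageSeenByStableStep(materialized, 1)
        .equals(brokenStageSeenByStableStep(materialized, 2))); // true
  }
}
```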

Just to be very clear: my goal right now is just to give Reshuffle consistent
semantics. Even for the old "stable input + redistribute" use of Reshuffle, the
semantics are inconsistent/undefined and the Java SDK expansion is wrong. Changing things
that have to do with stable input is not part of what I am trying to change right now. But
it is fine to do things that prepare for that.

Kenn

  Jan


Any votes? Any other options?

Kenn

  Jan

[1] https://github.com/apache/beam/issues/24655

On 10/5/23 21:01, Robert Burke wrote:

Reshuffle/redistribute being a transform has the benefit of allowing existing
runners that haven't been updated to be aware of the new URNs to rely on an SDK-side
implementation, which may be more expensive than what the runner is able to do
with that awareness.

Aka: it gives purpose to the fallback implementations.
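A minimal sketch of that fallback logic, in plain Java with hypothetical names; it is not how any particular runner is structured, and the URN string is made up. A runner that recognizes the composite's URN uses its own (possibly cheaper) implementation; otherwise it executes the SDK-provided expansion, here modeled as random key, group by key, drop key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.function.UnaryOperator;

// Hypothetical runner-side dispatch, not a Beam API.
final class CompositeDispatch {
  static final String REDISTRIBUTE_URN = "example:transform:redistribute:v1"; // made-up URN

  record Composite<T>(String urn, UnaryOperator<List<T>> sdkExpansion) {}

  // Fallback expansion: assign a random key, group by it (the shuffle), then drop the keys.
  static <T> List<T> expandAsGroupByKey(List<T> input) {
    Random random = new Random(0);
    Map<Integer, List<T>> groups = new TreeMap<>();
    for (T element : input) {
      groups.computeIfAbsent(random.nextInt(8), k -> new ArrayList<>()).add(element);
    }
    List<T> out = new ArrayList<>();
    groups.values().forEach(out::addAll);
    return out;
  }

  // Use the runner's native implementation if it knows the URN, else the SDK expansion.
  static <T> List<T> execute(Composite<T> transform, Map<String, UnaryOperator<List<T>>> runnerNative,
      List<T> input, List<String> trace) {
    UnaryOperator<List<T>> nativeImpl = runnerNative.get(transform.urn());
    if (nativeImpl != null) {
      trace.add("native " + transform.urn());
      return nativeImpl.apply(input);
    }
    trace.add("sdk expansion " + transform.urn());
    return transform.sdkExpansion().apply(input);
  }

  public static void main(String[] args) {
    Composite<String> redistribute =
        new Composite<>(REDISTRIBUTE_URN, CompositeDispatch::expandAsGroupByKey);
    List<String> input = List.of("a", "b", "c");
    List<String> trace = new ArrayList<>();
    // A runner unaware of the URN falls back to the SDK expansion.
    execute(redistribute, Map.of(), input, trace);
    // A runner that knows the URN may do something cheaper, e.g. a local runner passing data through.
    execute(redistribute, Map.of(REDISTRIBUTE_URN, UnaryOperator.<List<String>>identity()), input, trace);
    System.out.println(trace);
  }
}
```

Both paths must produce the same elements; only the cost and physical layout may differ.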

On Thu, Oct 5, 2023, 9:03 AM Kenneth Knowles <k...@apache.org> wrote:
Another perspective, ignoring runners' custom implementations and non-Java SDKs, could be
that the semantics are perfectly well defined: it is a composite and its semantics are
defined by its implementation in terms of primitives. It is just that this expansion is
not what we want, so we should not use it (and also we shouldn't use "whatever the
implementation does" as a spec for anything we care about).

On Thu, Oct 5, 2023 at 11:56 AM Kenneth Knowles <k...@apache.org> wrote:
I totally agree. I am motivated right now by the fact that it is already used 
all over the place but with no consistent semantics. Maybe it is simpler to 
focus on just making the minimal change, which would basically be to update the 
expansion of the Reshuffle in the Java SDK.

Kenn

On Thu, Oct 5, 2023 at 11:39 AM John Casey <theotherj...@google.com> wrote:
Given that this is a hint, I'm not sure redistribute should be a PTransform as 
opposed to some other way to hint to a runner.

I'm not sure of what the syntax of that would be, but a semantic no-op 
transform that the runner may or may not do anything with is odd.

On Thu, Oct 5, 2023 at 11:30 AM Kenneth Knowles <k...@apache.org> wrote:
So a high level suggestion from Robert that I want to highlight as a top-post:

Instead of focusing on just fixing the SDKs' and runners' Reshuffle, this could
be an opportunity to introduce Redistribute, which was proposed in the long-ago
thread. The semantics are identical, but it is clearer that it is only a hint
about redistributing data and there is no expectation of a checkpoint.

This new name may also be an opportunity to maintain update compatibility
(though this may actually be leaving unsafe code in users' hands) and/or
separate @RequiresStableInput/checkpointing uses of Reshuffle from 
redistribution-only uses of Reshuffle.

Any other thoughts on this one high level bit?

Kenn

On Thu, Oct 5, 2023 at 11:15 AM Kenneth Knowles <k...@apache.org> wrote:

On Wed, Oct 4, 2023 at 7:45 PM Robert Burke <lostl...@apache.org> wrote:
LGTM.

It looks like the Go SDK already adheres to these semantics as well for the
reference impl (well, reshuffle/redistribute_randomly; _by_key isn't implemented
in the Go SDK), and it only uses the existing unqualified reshuffle URN [0].

The original windowing strategy is preserved, and for every element the original Window, TS, and
Pane are all serialized, shuffled, and then deserialized downstream.

https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go#L65

https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go#L145
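For readers on the Java side, here is a rough Java rendering of the idea described above (not a port of the linked Go code). The value is encoded together with its window, timestamp, and pane, shuffled under a random key, and decoded downstream, so each element re-emerges with its own metadata rather than whatever the grouping step would assign. The record fields and byte layout are hypothetical and are not the Go SDK's actual encoding.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

// Hypothetical sketch: fields and byte layout are illustrative, not the Go SDK's encoding.
final class MetadataPreservingReshuffle {
  record WindowedValue(String value, String window, long timestampMillis, int paneIndex) {}

  // Serialize the value together with its window, timestamp and pane.
  static byte[] encode(WindowedValue wv) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeUTF(wv.value());
      out.writeUTF(wv.window());
      out.writeLong(wv.timestampMillis());
      out.writeInt(wv.paneIndex());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  static WindowedValue decode(byte[] encoded) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
      return new WindowedValue(in.readUTF(), in.readUTF(), in.readLong(), in.readInt());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Random key -> group by key (the shuffle) -> drop key and decode. The grouping step only
  // handles opaque bytes; downstream, each element is rebuilt from its own decoded metadata.
  static List<WindowedValue> reshuffle(List<WindowedValue> input, int numKeys, long seed) {
    Random random = new Random(seed);
    Map<Integer, List<byte[]>> grouped = new TreeMap<>();
    for (WindowedValue wv : input) {
      grouped.computeIfAbsent(random.nextInt(numKeys), k -> new ArrayList<>()).add(encode(wv));
    }
    List<WindowedValue> output = new ArrayList<>();
    for (List<byte[]> group : grouped.values()) {
      for (byte[] encoded : group) {
        output.add(decode(encoded));
      }
    }
    return output;
  }

  public static void main(String[] args) {
    List<WindowedValue> input = List.of(
        new WindowedValue("a", "[0, 60000)", 1_000L, 0),
        new WindowedValue("b", "[0, 60000)", 2_000L, 1));
    List<WindowedValue> output = reshuffle(input, 4, 42L);
    // Element order may change; each element's window, timestamp and pane must not.
    System.out.println(output.containsAll(input) && input.containsAll(output));
  }
}
```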

Prism at the moment vacuously implements reshuffle by omitting the node and rewriting the inputs and
outputs [1], as it's a local runner with a single transform per bundle execution, but I was intending to make
it a fusion break regardless. Ultimately prism's "test" variant will default to executing the SDK-dictated
reference implementation for the composite(s), and any "fast" or "prod" variant
would simply do the current implementation.

Very nice!

And of course I should have linked out to the existing reshuffle URN in the 
proto.

Kenn



Robert Burke
Beam Go Busybody

[0]: 
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L46C3-L46C50
[1]: 
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go#L82



On 2023/09/26 15:43:53 Kenneth Knowles wrote:
Hi everyone,

Recently there was a bug [1] caused by discrepancies between two of
Dataflow's reshuffle implementations. I think the reference implementation
in the Java SDK [2] also does not match. This all led to discussion on the
bug and the pull request [3] about what the actual semantics should be. I
got it wrong, maybe multiple times. So I wrote up a very short document to
finish the discussion:

     https://s.apache.org/beam-reshuffle

This is also probably among the simplest imaginable uses of
http://s.apache.org/ptransform-design-doc in case you want to see roughly
how I intended it to be used.

Kenn

[1] https://github.com/apache/beam/issues/28219
[2]
https://github.com/apache/beam/blob/d52b077ad505c8b50f10ec6a4eb83d385cdaf96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L84
[3] https://github.com/apache/beam/pull/28272
