The Go SDK handles the URN as unkeyed. That is, reshuffling a PCollection<KV> ignores the keys and produces a PCollection<KV<int,KV>> keyed by random ints. This splits a single user key across multiple partitions, exactly as though the input were unkeyed.
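To make the partitioning difference concrete, here is a minimal sketch in plain Python. It is not Beam SDK code. It models a reshuffle as routing elements to `n` partitions by hashing a key, and it contrasts two cases:

- routing by the user key;
- routing by a fresh random int key, which is the Go SDK behaviour described above.

Every name in it (`partition_of`, `keyed_reshuffle`, `random_key_reshuffle`, `partitions_holding`) is hypothetical. Windows, timestamps, and bundling are ignored.

```python
import random
import zlib
from collections import defaultdict


def partition_of(key, n):
    """Deterministically hash any key onto one of n partitions."""
    return zlib.crc32(repr(key).encode("utf-8")) % n


def keyed_reshuffle(elements, n):
    """Route (user_key, value) pairs by the user key, so each key stays in one partition."""
    parts = defaultdict(list)
    for user_key, value in elements:
        parts[partition_of(user_key, n)].append((user_key, value))
    return dict(parts)


def random_key_reshuffle(elements, n, rng):
    """Pair each element with a fresh random int key and route by that key alone.

    The element's own (user_key, value) is carried along untouched, so the user
    key has no influence on which partition it lands in.
    """
    parts = defaultdict(list)
    for element in elements:
        random_key = rng.randrange(2**31)
        parts[partition_of(random_key, n)].append(element)
    return dict(parts)


def partitions_holding(parts, user_key):
    """Count the partitions that contain at least one element with user_key."""
    return sum(1 for part in parts.values() if any(k == user_key for k, _ in part))


elements = [("user-a", i) for i in range(100)]
print(partitions_holding(keyed_reshuffle(elements, 4), "user-a"))  # 1
print(partitions_holding(random_key_reshuffle(elements, 4, random.Random(1)), "user-a"))
```

With the keyed variant, all 100 `user-a` elements share one partition. With the random-key variant they are almost surely spread over several partitions, which is why the result is indistinguishable from an unkeyed reshuffle.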
Doing anything with the user key specifically would seem to me to defeat the point of a reshuffle, vs. just using a GBK, which would align keys to bundles in its output.

On Wed, Oct 6, 2021, 10:54 AM Ke Wu <[email protected]> wrote:

>> The only usage of the keyed Reshuffle in the Java SDK is for writing
>> files with a single key, and the use case there would benefit from being
>> replaced with GroupIntoBatches instead.
>>
>
> I think there are more use cases for keyed reshuffle, e.g. in the Samza
> runner, it is also used when we rekey elements. In addition, since state
> is partitioned by key, it is important to reshuffle after a PCollection
> is assigned a different key, so that elements with the same new key
> end up in the same partition.
>
>> I believe the intent was for the existing reshuffle URN to represent the
>> keyed variant. This was since the unkeyed reshuffle was a composite built
>> on top of the keyed reshuffle in the Java SDK. The existing overrides in
>> Flink/Spark/Samza confirm this.
>>
>
> I believe so, because the Samza/Flink/Spark Reshuffle translators are all
> authored in Java and expect a keyed Reshuffle<K, V>.
>
>> I believe the intent was for the existing reshuffle URN to represent the
>> keyed variant.
>>
> And from the Python side I thought the intent was for the reshuffle
> URN to represent the unkeyed variant, as the keyed one isn't anything
> novel.
>
>
> This is exactly what is confusing: the same URN currently represents
> keyed reshuffle in the Java SDK but unkeyed reshuffle in the Python SDK.
> @Luke do you think it makes sense to have two separate URNs representing
> the two different reshuffles? Unkeyed reshuffle is still expected to be a
> composite transform built on keyed reshuffle, and runners can decide which
> (keyed/unkeyed) reshuffle they want to translate.
>
> Best,
> Ke
>
> On Oct 6, 2021, at 10:38 AM, Reuven Lax <[email protected]> wrote:
>
> I think it's used with the destination as a key, no?
> In various places
> Reshuffle is used as a stand-in for RequiresStableInput.
>
> On Wed, Oct 6, 2021 at 10:07 AM Luke Cwik <[email protected]> wrote:
>
>> I believe the intent was for the existing reshuffle URN to represent the
>> keyed variant. This was since the unkeyed reshuffle was a composite built
>> on top of the keyed reshuffle in the Java SDK. The existing overrides in
>> Flink/Spark/Samza confirm this.
>>
>> Thinking about this more, I wish we had gone only with the unkeyed variant,
>> as I don't know how much benefit users get from having their records
>> grouped by the key they choose, and it also greatly limits the runner's
>> ability to optimize how the data is materialized.
>>
>> The only usage of the keyed Reshuffle in the Java SDK is for writing
>> files with a single key, and the use case there would benefit from being
>> replaced with GroupIntoBatches instead.
>>
>>
>> On Mon, Oct 4, 2021 at 6:31 PM Robert Burke <[email protected]> wrote:
>>
>>> I can handle the Go SDK change once the URN is decided. I'm cleaning up
>>> a change to add the combine_global URN in the Go SDK, so this can slip in
>>> alongside it.
>>>
>>> On Mon, Oct 4, 2021, 3:57 PM Ke Wu <[email protected]> wrote:
>>>
>>>> Created https://issues.apache.org/jira/browse/BEAM-12999
>>>>
>>>> On Oct 4, 2021, at 3:37 PM, Robert Bradshaw <[email protected]>
>>>> wrote:
>>>>
>>>> Thanks. Happy to help with Python/Go. Do you want to create a JIRA?
>>>>
>>>> On Mon, Oct 4, 2021 at 3:33 PM Ke Wu <[email protected]> wrote:
>>>>
>>>>
>>>> Let me add two new URNs representing reshuffle via random key and
>>>> reshuffle using key. I will share the PR here later, and would need some
>>>> help with the Python/Go SDK changes too, since I am not very familiar with them.
>>>>
>>>> Best,
>>>> Ke
>>>>
>>>>
>>>> On Oct 4, 2021, at 3:11 PM, Robert Bradshaw <[email protected]>
>>>> wrote:
>>>>
>>>> On Mon, Oct 4, 2021 at 3:08 PM Jan Lukavský <[email protected]> wrote:
>>>>
>>>>
>>>> On 10/4/21 11:43 PM, Robert Bradshaw wrote:
>>>>
>>>> Oh, yes.
>>>>
>>>> Java Reshuffle.of() = Python ReshufflePerKey()
>>>> Java Reshuffle.viaRandomKey() == Python Reshuffle()
>>>>
>>>> We generally try to avoid this kind of discrepancy. It could make
>>>> sense to rename Reshuffle.of() to Reshuffle.viaKey().
>>>>
>>>>
>>>> I'd suggest Reshuffle.usingKey(), but I'm not a native speaker, so that
>>>> might be opinionated.
>>>>
>>>>
>>>> usingKey does sound better. (And, FWIW, usingRandomKey() sounds better
>>>> to me than viaRandomKey(), but it's probably not worth changing, so the
>>>> question becomes whether to be stilted or consistent.)
>>>>
>>>> More importantly, could we undeprecate Reshuffle
>>>> (in the Java SDK)? I believe it was deprecated for the wrong reasons: yes, it
>>>> has undocumented and non-portable side effects, but it still makes sense
>>>> for various use cases (e.g. fan-out, or SDF).
>>>>
>>>>
>>>> +1
>>>>
>>>>
>>>> On Mon, Oct 4, 2021 at 2:33 PM Ke Wu <[email protected]> wrote:
>>>>
>>>> I should have said that the discrepancy lives in the SDK, not in classic vs.
>>>> portable.
>>>>
>>>> Correct me if I am wrong: the Reshuffle transform in the Java SDK requires the
>>>> input to be KV [1], while Reshuffle in Python [2] and Go [3] does not.
>>>>
>>>>
>>>> [1]
>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L53
>>>> [2]
>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730
>>>> [3]
>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/gbk.go#L122
>>>>
>>>> On Oct 4, 2021, at 12:09 PM, Robert Bradshaw <[email protected]>
>>>> wrote:
>>>>
>>>> Reshuffle is not keyed; there is a separate reshuffle-per-key for
>>>> that.
>>>> This is true for both Java and Python. This shouldn't depend on
>>>> classic vs. portable mode. It sounds like there's an issue in
>>>> translation.
>>>>
>>>> On Mon, Oct 4, 2021 at 11:18 AM Ke Wu <[email protected]> wrote:
>>>>
>>>>
>>>> Hello All,
>>>>
>>>> Recent Samza runner test failures in python/xlang [1][2] reveal an
>>>> interesting fact: the Reshuffle transform in a classic pipeline requires the
>>>> input to be KV, while in a portable pipeline it does not, because Reshuffle in
>>>> portable mode has an extra step that appends a random key [3].
>>>>
>>>> This suggests that Reshuffle in classic mode is, sort of, equivalent to
>>>> ReshufflePerKey in portable mode rather than to Reshuffle itself. A couple of
>>>> questions on this:
>>>>
>>>> 1. Is such an SDK/API discrepancy expected?
>>>> 2. If yes, what is the advised approach for runners to implement
>>>> translators for such transforms?
>>>> 3. If no, is this something we can improve?
>>>>
>>>> Best,
>>>> Ke
>>>>
>>>>
>>>> [1] https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/288/
>>>> [2] https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/285/
>>>> [3]
>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730
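The thread makes two points that this sketch illustrates:

- the SDK mapping: Java `Reshuffle.of()` corresponds to Python `ReshufflePerKey()`, and Java `Reshuffle.viaRandomKey()` corresponds to Python `Reshuffle()`;
- the unkeyed reshuffle is a composite built on top of the keyed one.

It shows one way a runner might accept both proposed URNs while only implementing the keyed variant directly. It is a plain-Python model, not Beam code, and rests on these assumptions:

- the keyed reshuffle is modelled as group-by-key-then-ungroup;
- windows and timestamps are ignored;
- the two URN strings are hypothetical placeholders, since the thread does not spell out the new URNs.

```python
import random
from collections import defaultdict

# Hypothetical placeholder URNs; these are NOT the real Beam URN strings.
KEYED_RESHUFFLE_URN = "example:reshuffle:using_key"
RANDOM_KEY_RESHUFFLE_URN = "example:reshuffle:via_random_key"


def keyed_reshuffle(pairs):
    """Keyed reshuffle (Java Reshuffle.of() / Python ReshufflePerKey()).

    Groups (key, value) pairs by key and re-emits them, so all elements with
    the same key come out of the same group.
    """
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return [(key, value) for key, values in groups.items() for value in values]


def random_key_reshuffle(elements, rng):
    """Unkeyed reshuffle (Java Reshuffle.viaRandomKey() / Python Reshuffle()).

    Built as a composite over the keyed variant: attach a random key,
    run the keyed reshuffle, then drop the random key again.
    """
    keyed = [(rng.randrange(2**31), element) for element in elements]
    return [element for _, element in keyed_reshuffle(keyed)]


def translate(urn, elements, rng=None):
    """Dispatch on the URN; only the keyed variant is implemented directly."""
    if urn == KEYED_RESHUFFLE_URN:
        return keyed_reshuffle(elements)
    if urn == RANDOM_KEY_RESHUFFLE_URN:
        return random_key_reshuffle(elements, rng or random.Random())
    raise ValueError(f"unsupported URN: {urn}")


print(translate(KEYED_RESHUFFLE_URN, [("k1", 1), ("k2", 2), ("k1", 3)]))
# [('k1', 1), ('k1', 3), ('k2', 2)]
print(sorted(translate(RANDOM_KEY_RESHUFFLE_URN, ["a", "b", "c"], random.Random(0))))
# ['a', 'b', 'c']
```

Keeping the two URNs distinct, as proposed above, lets a runner pick either path. A runner could translate the random-key URN natively, or expand it into the keyed one as this sketch does.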
