That explains it. Thank you Robert and all!

-Binh
On Thu, Mar 2, 2023 at 4:51 PM Robert Bradshaw via user <user@beam.apache.org> wrote:

> Whenever state is used, the runner will arrange such that the same
> keys will all go to the same worker, which often involves injecting a
> shuffle-like operation if the keys are spread out among many workers
> in the input. (An alternative implementation could involve storing the
> state in a distributed transactional store with the appropriate
> locks.) There is no need for you to do anything before calling the
> Deduplicate transform.
>
> On Thu, Mar 2, 2023 at 4:34 PM Binh Nguyen Van <binhn...@gmail.com> wrote:
> >
> > Thanks Reuven,
> >
> > I got the idea of the state is per key and keys are distributed across workers but I am trying to understand where/how the distribution part is implemented so that elements with the same keys will go to the same worker. Do I need to do this before calling `Deduplicate` transform? If not then where is it being implemented?
> >
> > Thanks
> > -Binh
> >
> >
> > On Thu, Mar 2, 2023 at 12:57 PM Reuven Lax via user <user@beam.apache.org> wrote:
> >>
> >> State is per-key, and keys are distributed across workers. Two workers should not be working on the same state.
> >>
> >> On Thu, Mar 2, 2023 at 10:48 AM Binh Nguyen Van <binhn...@gmail.com> wrote:
> >>>
> >>> Thank you Ankur,
> >>>
> >>> This is the current source code of Deduplicate transform.
> >>>
> >>> Boolean seen = seenState.read();
> >>> // Seen state is either set or not set so if it has been set then it must be true.
> >>> if (seen == null) {
> >>>   // We don't want the expiry timer to hold up watermarks.
> >>>   expiryTimer.offset(duration).withNoOutputTimestamp().setRelative();
> >>>   seenState.write(true);
> >>>   receiver.output(element);
> >>> }
> >>>
> >>> Could you please explain the synchronization for the following scenario?
> >>>
> >>> There are two workers.
> >>> Both workers read the same state at the same time and the state was not set yet. In this case, both will get null in the response (I believe)
> >>> Both of them will try to set the state and send the output out.
> >>>
> >>> What will happen in this scenario?
> >>>
> >>> Thank you
> >>> -Binh
> >>>
> >>>
> >>> On Thu, Mar 2, 2023 at 10:29 AM Ankur Goenka <ankurgoe...@gmail.com> wrote:
> >>>>
> >>>> Hi Binh, The Deduplicate transform uses state api to do the de-duplication which should do the needful operations to work across multiple concurrent workers.
> >>>>
> >>>> Thanks,
> >>>> Ankur
> >>>>
> >>>> On Thu, 2 Mar 2023 at 10:18, Binh Nguyen Van <binhn...@gmail.com> wrote:
> >>>>>
> >>>>> Hi,
> >>>>>
> >>>>> I am writing a pipeline and want to apply deduplication. I look at Deduplicate transform that Beam provides and wonder about its usage. Do I need to shuffle input collection by key before calling this transformation? I look at its source code and it doesn’t do any shuffle so wonder how it works when let’s say there are duplicates and the duplicated elements are processed concurrently on multiple workers.
> >>>>>
> >>>>> Thank you
> >>>>> -Binh
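
To make the point of the thread concrete, here is a minimal sketch in plain Java of why routing every key to a single worker removes the race described above. It is a toy model, not Beam's implementation and not the Beam API: the names `KeyedDedupSketch`, `Worker`, `workerFor`, and `deduplicate` are all hypothetical, and simple hash-modulo routing is assumed as a stand-in for whatever shuffle-like operation a real runner injects.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of key-partitioned state. All names here are hypothetical;
// this is NOT how any Beam runner is actually implemented.
class KeyedDedupSketch {

    // A worker owns the "seen" state for exactly the keys routed to it.
    static final class Worker {
        private final Map<String, Boolean> seenState = new HashMap<>();
        private final List<String> output = new ArrayList<>();

        // Mirrors the check in the quoted snippet: emit only on first sight.
        void process(String key) {
            Boolean seen = seenState.get(key);
            if (seen == null) {
                seenState.put(key, true);
                output.add(key);
            }
        }
    }

    // Assumed routing rule standing in for the runner's shuffle-like step:
    // the same key always maps to the same worker index.
    static int workerFor(String key, int numWorkers) {
        return Math.floorMod(key.hashCode(), numWorkers);
    }

    // Routes each element to its owning worker, then gathers the outputs.
    static List<String> deduplicate(List<String> input, int numWorkers) {
        List<Worker> workers = new ArrayList<>();
        for (int i = 0; i < numWorkers; i++) {
            workers.add(new Worker());
        }
        for (String key : input) {
            workers.get(workerFor(key, numWorkers)).process(key);
        }
        List<String> result = new ArrayList<>();
        for (Worker w : workers) {
            result.addAll(w.output);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> input = List.of("a", "b", "a", "c", "b", "a");
        System.out.println(deduplicate(input, 2));
    }
}
```

Because `workerFor` is deterministic, two workers never hold state for the same key, so the "both read null" scenario cannot arise in this model and no lock is needed. The order of the combined output depends on the routing and should not be relied on.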