That explains it. Thank you Robert and all!

-Binh


On Thu, Mar 2, 2023 at 4:51 PM Robert Bradshaw via user <
user@beam.apache.org> wrote:

> Whenever state is used, the runner arranges for all elements with the
> same key to go to the same worker, which often involves injecting a
> shuffle-like operation if the keys are spread out among many workers
> in the input. (An alternative implementation could involve storing the
> state in a distributed transactional store with the appropriate
> locks.) There is no need for you to do anything before calling the
> Deduplicate transform.
>
> On Thu, Mar 2, 2023 at 4:34 PM Binh Nguyen Van <binhn...@gmail.com> wrote:
> >
> > Thanks Reuven,
> >
> > I understand that state is per key and that keys are distributed across
> > workers, but I am trying to understand where/how the distribution part is
> > implemented so that elements with the same key go to the same worker.
> > Do I need to do this before calling the `Deduplicate` transform? If not,
> > where is it implemented?
> >
> > Thanks
> > -Binh
> >
> >
> > On Thu, Mar 2, 2023 at 12:57 PM Reuven Lax via user <user@beam.apache.org> wrote:
> >>
> >> State is per-key, and keys are distributed across workers. Two workers should not be working on the same state.
> >>
> >> On Thu, Mar 2, 2023 at 10:48 AM Binh Nguyen Van <binhn...@gmail.com> wrote:
> >>>
> >>> Thank you Ankur,
> >>>
> >>> This is the current source code of Deduplicate transform.
> >>>
> >>>       Boolean seen = seenState.read();
> >>>       // Seen state is either set or not set so if it has been set then it must be true.
> >>>       if (seen == null) {
> >>>         // We don't want the expiry timer to hold up watermarks.
> >>>         expiryTimer.offset(duration).withNoOutputTimestamp().setRelative();
> >>>         seenState.write(true);
> >>>         receiver.output(element);
> >>>       }
> >>>
> >>> Could you please explain the synchronization for the following scenario?
> >>>
> >>> There are two workers.
> >>> Both workers read the same state at the same time and the state was not set yet. In this case, both will get null in the response (I believe).
> >>> Both of them will try to set the state and send the output out.
> >>>
> >>> What will happen in this scenario?
> >>>
> >>> Thank you
> >>> -Binh
> >>>
> >>>
> >>> On Thu, Mar 2, 2023 at 10:29 AM Ankur Goenka <ankurgoe...@gmail.com> wrote:
> >>>>
> >>>> Hi Binh, the Deduplicate transform uses the state API to do the de-duplication, which should handle whatever is needed to work across multiple concurrent workers.
> >>>>
> >>>> Thanks,
> >>>> Ankur
> >>>>
> >>>> On Thu, 2 Mar 2023 at 10:18, Binh Nguyen Van <binhn...@gmail.com> wrote:
> >>>>>
> >>>>> Hi,
> >>>>>
> >>>>> I am writing a pipeline and want to apply deduplication. I looked at the Deduplicate transform that Beam provides and am wondering about its usage. Do I need to shuffle the input collection by key before calling this transform? I looked at its source code and it doesn't do any shuffle, so I wonder how it works when, say, there are duplicates and the duplicated elements are processed concurrently on multiple workers.
> >>>>>
> >>>>> Thank you
> >>>>> -Binh
>
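
The routing Robert describes (all elements with the same key end up on the same worker, so no two workers ever touch the same per-key state) can be illustrated with a toy model. This is only a sketch under stated assumptions, not Beam's or any runner's implementation: the class name KeyedDedupSketch, the hash-modulo routing rule in workerFor, and the in-memory "seen" sets standing in for per-key state are all hypothetical, and a real runner also handles timers, expiry, persistence and rebalancing.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of key-partitioned state. Not Beam code; all names are hypothetical.
class KeyedDedupSketch {

    // Assumed routing rule: a stable hash of the key picks the owning worker,
    // so every occurrence of a key is sent to the same worker.
    static int workerFor(String key, int numWorkers) {
        return Math.floorMod(key.hashCode(), numWorkers);
    }

    // Processes the keys in order and returns the first occurrence of each key.
    static List<String> dedup(List<String> keys, int numWorkers) {
        // One private "seen" set per worker stands in for that worker's per-key state.
        List<Set<String>> seenPerWorker = new ArrayList<>();
        for (int i = 0; i < numWorkers; i++) {
            seenPerWorker.add(new HashSet<>());
        }

        List<String> output = new ArrayList<>();
        for (String key : keys) {
            // The "shuffle": look up the single worker that owns this key.
            Set<String> seen = seenPerWorker.get(workerFor(key, numWorkers));
            // Only the owning worker reads and writes this key's state, so the
            // check-then-set below never races with another worker.
            if (seen.add(key)) {
                output.add(key);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // Prints [a, b, c]
        System.out.println(dedup(List.of("a", "b", "a", "c", "b"), 3));
    }
}
```

Because the duplicate "a" and the original "a" are routed to the same worker, the scenario with two workers both reading an unset state for the same key does not arise in this model.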
