Thanks Reuven,

I understand that state is per key and that keys are distributed across workers, but I am trying to understand where and how that distribution is implemented so that elements with the same key go to the same worker. Do I need to do this myself before calling the `Deduplicate` transform? If not, where is it implemented?
Thanks
-Binh

On Thu, Mar 2, 2023 at 12:57 PM Reuven Lax via user <user@beam.apache.org>
wrote:

> State is per-key, and keys are distributed across workers. Two workers
> should not be working on the same state.
>
> On Thu, Mar 2, 2023 at 10:48 AM Binh Nguyen Van <binhn...@gmail.com>
> wrote:
>
>> Thank you Ankur,
>>
>> This is the current source code of the Deduplicate transform:
>>
>>   Boolean seen = seenState.read();
>>   // Seen state is either set or not set so if it has been set then it must be true.
>>   if (seen == null) {
>>     // We don't want the expiry timer to hold up watermarks.
>>     expiryTimer.offset(duration).withNoOutputTimestamp().setRelative();
>>     seenState.write(true);
>>     receiver.output(element);
>>   }
>>
>> Could you please explain the synchronization for the following scenario?
>>
>> - There are two workers.
>> - Both workers read the same state at the same time, and the state has
>>   not been set yet. In this case, both will get null in the response (I
>>   believe).
>> - Both of them will try to set the state and send the output.
>>
>> What will happen in this scenario?
>>
>> Thank you
>> -Binh
>>
>> On Thu, Mar 2, 2023 at 10:29 AM Ankur Goenka <ankurgoe...@gmail.com>
>> wrote:
>>
>>> Hi Binh,
>>>
>>> The Deduplicate transform uses the state API to do the de-duplication,
>>> which should perform the operations needed to work across multiple
>>> concurrent workers.
>>>
>>> Thanks,
>>> Ankur
>>>
>>> On Thu, 2 Mar 2023 at 10:18, Binh Nguyen Van <binhn...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am writing a pipeline and want to apply deduplication. I looked at
>>>> the Deduplicate transform that Beam provides and am wondering about its
>>>> usage. Do I need to shuffle the input collection by key before calling
>>>> this transform? I looked at its source code and it doesn't do any
>>>> shuffle, so I wonder how it works when, say, there are duplicates and the
>>>> duplicated elements are processed concurrently on multiple workers.
>>>>
>>>> Thank you
>>>> -Binh
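The race Binh describes, and Reuven's answer that "keys are distributed across workers", can be illustrated with a small, self-contained simulation. This is a hypothetical sketch in plain Java, not Beam code: `KeyedDedupSketch`, `workerFor`, `runWorker`, and `dedupe` are names invented for illustration, hashing a key to a worker index stands in for whatever partitioning a real runner performs, and each element is assumed to be its own key. As we understand the Beam model, it is the runner, not user code, that routes all elements with the same key to a single worker before a stateful `ParDo` such as the one inside `Deduplicate`. The sketch only shows why, under that routing, the read-then-write on `seenState` in the quoted code cannot race for a given key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical simulation of key-partitioned deduplication; not Beam's implementation.
class KeyedDedupSketch {

    // Stand-in for the runner's key routing: a given key always maps to the same worker.
    static int workerFor(String key, int numWorkers) {
        return Math.floorMod(key.hashCode(), numWorkers);
    }

    // One worker: owns the "seen" state for the keys routed to it and handles its
    // elements one at a time, so the read-then-write below never races on a key.
    static List<String> runWorker(List<String> elements) {
        Map<String, Boolean> seenState = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String element : elements) {
            Boolean seen = seenState.get(element);  // analogous to seenState.read()
            if (seen == null) {
                seenState.put(element, true);       // analogous to seenState.write(true)
                out.add(element);                   // analogous to receiver.output(element)
            }
        }
        return out;
    }

    // Assumption for this sketch: each element is its own key.
    static List<String> dedupe(List<String> input, int numWorkers) {
        // "Shuffle": route every element to the worker that owns its key.
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numWorkers; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String element : input) {
            partitions.get(workerFor(element, numWorkers)).add(element);
        }

        // Workers run concurrently, but no two workers ever receive the same key.
        ExecutorService pool = Executors.newFixedThreadPool(numWorkers);
        try {
            List<Future<List<String>>> results = new ArrayList<>();
            for (List<String> partition : partitions) {
                Callable<List<String>> task = () -> runWorker(partition);
                results.add(pool.submit(task));
            }
            List<String> output = new ArrayList<>();
            for (Future<List<String>> result : results) {
                output.addAll(result.get());
            }
            return output;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints [b, a, c]: worker 0 owns "b", worker 1 owns "a" and "c"
        System.out.println(dedupe(List.of("a", "b", "a", "c", "b", "a"), 2));
    }
}
```

In this model, parallelism comes only from different keys landing on different workers; duplicates of one key are always serialized on the worker that owns it, which is why the two-workers-read-null scenario does not arise for a single key.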