Thank you, Ankur. This is the current source code of the Deduplicate transform:

    Boolean seen = seenState.read();
    // Seen state is either set or not set so if it has been set then it must be true.
    if (seen == null) {
      // We don't want the expiry timer to hold up watermarks.
      expiryTimer.offset(duration).withNoOutputTimestamp().setRelative();
      seenState.write(true);
      receiver.output(element);
    }

Could you please explain the synchronization for the following scenario?

- There are two workers.
- Both workers read the same state at the same time and the state was not
  set yet. In this case, both will get null in the response (I believe).
- Both of them will try to set the state and send the output out.

What will happen in this scenario?

Thank you
-Binh

On Thu, Mar 2, 2023 at 10:29 AM Ankur Goenka <ankurgoe...@gmail.com> wrote:

> Hi Binh,
>
> The Deduplicate transform uses state api to do the
> de-duplication which should do the needful operations to work across
> multiple concurrent workers.
>
> Thanks,
> Ankur
>
> On Thu, 2 Mar 2023 at 10:18, Binh Nguyen Van <binhn...@gmail.com> wrote:
>
>> Hi,
>>
>> I am writing a pipeline and want to apply deduplication. I look at
>> Deduplicate transform that Beam provides and wonder about its usage. Do
>> I need to shuffle input collection by key before calling this
>> transformation? I look at its source code and it doesn’t do any shuffle so
>> wonder how it works when let’s say there are duplicates and the duplicated
>> elements are processed concurrently on multiple workers.
>>
>> Thank you
>> -Binh