Thank you Ankur,

This is the current source code of the Deduplicate transform:

      Boolean seen = seenState.read();
      // Seen state is either set or not set, so if it has been set
      // then it must be true.
      if (seen == null) {
        // We don't want the expiry timer to hold up watermarks.
        expiryTimer.offset(duration).withNoOutputTimestamp().setRelative();
        seenState.write(true);
        receiver.output(element);
      }
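
To make the check-then-write logic above easier to discuss, here is a minimal sketch in plain Java. It is not Beam code: `DedupSketch`, its in-memory map, and its `process` method are hypothetical stand-ins for the per-key `seenState`, and the expiry timer is left out. It only shows what happens when elements for a key are handled one at a time; it does not model two workers touching the same state at once.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the quoted logic; not part of Beam.
class DedupSketch {
    // One "seen" flag per key, standing in for the per-key seenState.
    private final Map<String, Boolean> seenState = new HashMap<>();
    // Collects emitted elements, standing in for receiver.output(...).
    private final List<String> output = new ArrayList<>();

    // Emits the element only the first time its key is seen.
    void process(String key) {
        Boolean seen = seenState.get(key);
        // The flag is either absent or true, so null means "first occurrence".
        if (seen == null) {
            seenState.put(key, true);
            output.add(key);
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        DedupSketch sketch = new DedupSketch();
        for (String key : new String[] {"a", "b", "a", "c", "b"}) {
            sketch.process(key);
        }
        System.out.println(sketch.output()); // prints [a, b, c]
    }
}
```

Processed sequentially like this, each key is emitted exactly once. My question below is about what guarantees that when the reads are not sequential.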

Could you please explain the synchronization for the following scenario?

   - There are two workers.
   - Both workers read the same state at the same time, and the state has
   not been set yet. In this case, I believe both will get null in the
   response.
   - Both of them will then try to set the state and emit the element.

What will happen in this scenario?

Thank you
-Binh

On Thu, Mar 2, 2023 at 10:29 AM Ankur Goenka <ankurgoe...@gmail.com> wrote:

> Hi Binh, the Deduplicate transform uses the state API to do the
> de-duplication, which should take care of what is needed to work across
> multiple concurrent workers.
>
> Thanks,
> Ankur
>
> On Thu, 2 Mar 2023 at 10:18, Binh Nguyen Van <binhn...@gmail.com> wrote:
>
>> Hi,
>>
>> I am writing a pipeline and want to apply deduplication. I looked at the
>> Deduplicate transform that Beam provides and wonder about its usage. Do
>> I need to shuffle the input collection by key before applying this
>> transform? I looked at its source code and it doesn’t do any shuffle, so
>> I wonder how it works when, say, there are duplicates and the duplicated
>> elements are processed concurrently on multiple workers.
>>
>> Thank you
>> -Binh
>>
>
