Thanks Reuven,

I got the idea of the state is per key and keys are distributed across
workers but I am trying to understand where/how the distribution part is
implemented so that elements with the same keys will go to the same worker.
Do I need to do this before calling `Deduplicate` transform? If not then
where is it being implemented?

Thanks
-Binh


On Thu, Mar 2, 2023 at 12:57 PM Reuven Lax via user <user@beam.apache.org>
wrote:

> State is per-key, and keys are distributed across workers. Two workers
> should not be working on the same state.
>
> On Thu, Mar 2, 2023 at 10:48 AM Binh Nguyen Van <binhn...@gmail.com>
> wrote:
>
>> Thank you Ankur,
>>
>> This is the current source code of Deduplicate transform.
>>
>>       Boolean seen = seenState.read();
>>       // Seen state is either set or not set so if it has been set then it 
>> must be true.
>>       if (seen == null) {
>>         // We don't want the expiry timer to hold up watermarks.
>>         expiryTimer.offset(duration).withNoOutputTimestamp().setRelative();
>>         seenState.write(true);
>>         receiver.output(element);
>>       }
>>
>> Could you please explain the synchronization for the following scenario?
>>
>>    - There are two workers.
>>    - Both workers read the same state at the same time and the state was
>>    not set yet. In this case, both will get null in the response (I
>>    believe)
>>    - Both of them will try to set the state and send the output out.
>>
>> What will happen in this scenario?
>>
>> Thank you
>> -Binh
>>
>> On Thu, Mar 2, 2023 at 10:29 AM Ankur Goenka <ankurgoe...@gmail.com>
>> wrote:
>>
>>> Hi Binh, The Deduplicate transform uses state api to do the
>>> de-duplication which should do the needful operations to work across
>>> multiple concurrent workers.
>>>
>>> Thanks,
>>> Ankur
>>>
>>> On Thu, 2 Mar 2023 at 10:18, Binh Nguyen Van <binhn...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am writing a pipeline and want to apply deduplication. I look at
>>>> Deduplicate transform that Beam provides and wonder about its usage.
>>>> Do I need to shuffle input collection by key before calling this
>>>> transformation? I look at its source code and it doesn’t do any shuffle so
>>>> wonder how it works when let’s say there are duplicates and the duplicated
>>>> elements are processed concurrently on multiple workers.
>>>>
>>>> Thank you
>>>> -Binh
>>>>
>>>

Reply via email to