Deduplicate usage

2023-03-02 Thread Binh Nguyen Van
Hi, I am writing a pipeline and want to apply deduplication. I looked at the Deduplicate transform that Beam provides and am wondering how to use it. Do I need to shuffle the input collection by key before calling this transform? I looked at its source code and it doesn’t do any shuffle, so I wonder how it works.

Re: Deduplicate usage

2023-03-02 Thread Ankur Goenka
Hi Binh, The Deduplicate transform uses the State API to do the deduplication, which should handle the operations needed to work across multiple concurrent workers. Thanks, Ankur

Re: Deduplicate usage

2023-03-02 Thread Binh Nguyen Van
Thank you Ankur, this is the current source code of the Deduplicate transform:

    Boolean seen = seenState.read();
    // Seen state is either set or not set so if it has been set then it must be true.
    if (seen == null) {
      // We don't want the expiry timer to hold up watermarks.
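To make the quoted fragment concrete, here is a minimal single-process sketch of the same "seen" check. It assumes a plain `HashMap` stands in for Beam's per-key `ValueState<Boolean>` and, for simplicity, treats each element as its own key. The class and method names are hypothetical, not Beam APIs, and the sketch leaves out the expiry timer that the real transform uses to clear state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-process simulation of a per-key "seen" flag (not Beam code).
class SeenStateSketch {
    // Stands in for per-key ValueState<Boolean>: an absent entry means "not seen yet".
    private final Map<String, Boolean> seenState = new HashMap<>();

    /** Returns true if the element should be emitted, i.e. its key has not been seen before. */
    boolean process(String key) {
        Boolean seen = seenState.get(key);
        if (seen == null) {
            // First occurrence: record it and emit the element.
            seenState.put(key, true);
            return true;
        }
        // Key already seen: drop the duplicate.
        return false;
    }

    /** Runs every element through one instance, as if all keys lived on a single worker. */
    static List<String> dedup(List<String> input) {
        SeenStateSketch fn = new SeenStateSketch();
        List<String> out = new ArrayList<>();
        for (String element : input) {
            if (fn.process(element)) {
                out.add(element);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(dedup(List.of("a", "b", "a", "c", "b"))); // [a, b, c]
    }
}
```

Note that this only works because one map sees every key; that assumption is exactly what the rest of the thread is about.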

Re: Beam SQL Alias issue while using With Clause

2023-03-02 Thread Andrew Pilloud via user
Hi Talat, I managed to turn your test case into a reproduction against Calcite. It looks like there is a bug affecting tables that contain one or more single-element structs and no multi-element structs. I've sent the details to the Calcite mailing list here: https://lists.apache.org/thread/tlr9hsmx09b

Re: Deduplicate usage

2023-03-02 Thread Reuven Lax via user
State is per-key, and keys are distributed across workers. Two workers should not be working on the same state.
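The idea that each key's state has exactly one owner can be pictured with a toy routing function. This sketch assumes hash partitioning purely as an illustration; real runners decide key assignment internally, and `KeyRoutingSketch`, `workerFor`, and `assign` are hypothetical names, not Beam APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of deterministic key-to-worker assignment (not Beam code).
class KeyRoutingSketch {
    /** Maps a key to one of numWorkers workers; the same key always maps to the same worker. */
    static int workerFor(String key, int numWorkers) {
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), numWorkers);
    }

    /** Groups keys by the worker that would own their state. */
    static Map<Integer, List<String>> assign(List<String> keys, int numWorkers) {
        Map<Integer, List<String>> byWorker = new TreeMap<>();
        for (String key : keys) {
            byWorker.computeIfAbsent(workerFor(key, numWorkers), w -> new ArrayList<>()).add(key);
        }
        return byWorker;
    }

    public static void main(String[] args) {
        // Both "user-1" entries end up in the same worker's list.
        System.out.println(assign(List.of("user-1", "user-2", "user-1", "user-3"), 3));
    }
}
```

Because the assignment is a pure function of the key, no two workers ever hold state for the same key, which is the property Reuven describes.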

Successful Inserts for Storage Write API?

2023-03-02 Thread Matthew Ouyang
Thank you to Ahmed and Reuven for the tip on WriteResult::getFailedStorageApiInserts. When I tried to get the successful inserts through the Storage Write API, I received an error message saying that "Retrieving successful inserts is only supported for streaming inserts. Make sure withSuccessfulI

Re: Successful Inserts for Storage Write API?

2023-03-02 Thread Reuven Lax via user
Are you trying to do this in order to use Wait.on? getSuccessfulInserts is not currently supported for the Storage Write API.

Re: Beam SQL Alias issue while using With Clause

2023-03-02 Thread Talat Uyarer via user
Hi Andrew, Thank you so much for your help. Sorry to hear you changed teams :( I can handle Calcite upgrades if there is a fix. I was working on a Calcite upgrade, but then we started having so many issues that I stopped. Talat

Re: Deduplicate usage

2023-03-02 Thread Binh Nguyen Van
Thanks Reuven, I understand that state is per key and that keys are distributed across workers, but I am trying to understand where and how the distribution is implemented so that elements with the same key go to the same worker. Do I need to do this before calling the `Deduplicate` transform? If

Re: Deduplicate usage

2023-03-02 Thread Robert Bradshaw via user
Whenever state is used, the runner arranges for all elements with the same key to go to the same worker, which often involves injecting a shuffle-like operation if the keys are spread out among many workers in the input. (An alternative implementation could involve storing the state in a distributed
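The following hypothetical simulation (plain Java, not Beam code) contrasts deduplicating with worker-local state before and after a key-based shuffle. It assumes hash partitioning for the shuffle and uses a `HashSet` per worker in place of per-key state; it illustrates why a runner inserts the shuffle-like step, not how any particular runner implements it.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simulation: why keyed state needs all of a key's elements on one worker.
class ShuffleThenDedupSketch {
    /** Each worker deduplicates only against its own local "seen" set. */
    static List<String> dedupPerWorker(List<List<String>> partitions) {
        List<String> out = new ArrayList<>();
        for (List<String> partition : partitions) {
            Set<String> seen = new HashSet<>(); // local state, invisible to other workers
            for (String key : partition) {
                if (seen.add(key)) { // add() returns false if the key was already present
                    out.add(key);
                }
            }
        }
        return out;
    }

    /** Re-partitions elements so every occurrence of a key lands on the same worker. */
    static List<List<String>> shuffleByKey(List<List<String>> partitions, int numWorkers) {
        List<List<String>> shuffled = new ArrayList<>();
        for (int i = 0; i < numWorkers; i++) {
            shuffled.add(new ArrayList<>());
        }
        for (List<String> partition : partitions) {
            for (String key : partition) {
                shuffled.get(Math.floorMod(key.hashCode(), numWorkers)).add(key);
            }
        }
        return shuffled;
    }

    public static void main(String[] args) {
        // "a" and "b" each appear on two different input workers.
        List<List<String>> input = List.of(List.of("a", "b"), List.of("a", "c", "b"));
        System.out.println(dedupPerWorker(input).size());                  // 5: duplicates survive
        System.out.println(dedupPerWorker(shuffleByKey(input, 2)).size()); // 3: one per distinct key
    }
}
```

With this input, deduplicating in place leaves 5 elements because "a" and "b" sit on two workers, while deduplicating after the shuffle leaves 3, one per distinct key.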

Re: Deduplicate usage

2023-03-02 Thread Binh Nguyen Van
That explains it. Thank you Robert and all! -Binh