Hi,
I am writing a pipeline and want to apply deduplication. I looked at the
Deduplicate transform that Beam provides and am wondering about its usage. Do I
need to shuffle the input collection by key before calling this transformation?
I looked at its source code and it doesn't do any shuffle, so I wonder how it
works.
Hi Binh,
The Deduplicate transform uses the state API to do the de-duplication,
which should take care of whatever is needed to work across multiple
concurrent workers.
Thanks,
Ankur
On Thu, 2 Mar 2023 at 10:18, Binh Nguyen Van wrote:
> Hi,
>
> I am writing a pipeline and want to apply deduplication. I lo
Thank you Ankur,
This is the current source code of Deduplicate transform.
Boolean seen = seenState.read();
// Seen state is either set or not set so if it has been set then it must be true.
if (seen == null) {
// We don't want the expiry timer to hold up watermarks.
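The excerpt above is cut off, so the following is only a rough plain-Java sketch of the idea it describes, not Beam's actual code: a per-key "seen" marker that is either set or absent, plus an expiry that clears it. The class `SeenStateSketch`, its method names, and the assumption that the marker expires a fixed duration after the key is first seen are all hypothetical, chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class SeenStateSketch {
    // Hypothetical stand-in for per-key state: key -> time (ms) at which
    // the "seen" marker expires. An absent entry means "not seen".
    private final Map<String, Long> expiryByKey = new HashMap<>();
    private final long durationMillis;

    public SeenStateSketch(long durationMillis) {
        this.durationMillis = durationMillis;
    }

    // Returns true if the element should be emitted, false if it is a duplicate.
    public boolean process(String key, long nowMillis) {
        Long expiry = expiryByKey.get(key);
        if (expiry != null && nowMillis >= expiry) {
            // Simulates the expiry timer firing: the marker is cleared.
            expiryByKey.remove(key);
            expiry = null;
        }
        if (expiry == null) {
            // First time this key is seen (or seen again after expiry):
            // set the marker and let the element through.
            expiryByKey.put(key, nowMillis + durationMillis);
            return true;
        }
        // Marker is set, so this key was already emitted within the window.
        return false;
    }

    public static void main(String[] args) {
        SeenStateSketch dedup = new SeenStateSketch(10);
        System.out.println(dedup.process("k", 0));  // first occurrence
        System.out.println(dedup.process("k", 5));  // duplicate within the window
        System.out.println(dedup.process("k", 10)); // marker has expired
    }
}
```

In a real pipeline the runner owns the state and fires the timer; here the caller passes the current time explicitly so the behaviour is easy to follow.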
Hi Talat,
I managed to turn your test case into something against Calcite. It
looks like there is a bug affecting tables that contain one or more
single element structs and no multi element structs. I've sent the
details to the Calcite mailing list here.
https://lists.apache.org/thread/tlr9hsmx09b
State is per-key, and keys are distributed across workers. Two workers
should not be working on the same state.
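To make the per-key point concrete, here is a small plain-Java simulation. It does not use Beam, and `PerKeyStateSketch`, `Worker`, and `workerFor` are hypothetical names. It assumes keys are routed to workers by hash, which is only one way a runner might distribute them.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class PerKeyStateSketch {
    // Hypothetical stand-in for a worker: it owns the "seen" state
    // only for the keys that are routed to it.
    static final class Worker {
        private final Set<String> seenKeys = new HashSet<>();

        // Returns true the first time this worker observes the key.
        boolean firstTimeSeen(String key) {
            return seenKeys.add(key);
        }
    }

    // Deterministic routing (an assumption for this sketch): the same key
    // always maps to the same worker index.
    static int workerFor(String key, int numWorkers) {
        return Math.floorMod(key.hashCode(), numWorkers);
    }

    // Routes each key to its worker and keeps only first occurrences.
    static List<String> deduplicate(List<String> keys, int numWorkers) {
        List<Worker> workers = new ArrayList<>();
        for (int i = 0; i < numWorkers; i++) {
            workers.add(new Worker());
        }
        List<String> output = new ArrayList<>();
        for (String key : keys) {
            Worker owner = workers.get(workerFor(key, numWorkers));
            if (owner.firstTimeSeen(key)) {
                output.add(key);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // prints [a, b, c]
        System.out.println(deduplicate(List.of("a", "b", "a", "c", "b"), 3));
    }
}
```

Because routing depends only on the key, no two workers ever hold state for the same key, so each worker can decide "seen or not" locally.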
On Thu, Mar 2, 2023 at 10:48 AM Binh Nguyen Van wrote:
> Thank you Ankur,
>
> This is the current source code of Deduplicate transform.
>
> Boolean seen = seenState.read();
>
Thank you to Ahmed and Reuven for the tip on
WriteResult::getFailedStorageApiInserts.
When I tried to get the successful inserts through the Storage Write API, I
received an error message saying that "Retrieving successful inserts is
only supported for streaming inserts. Make sure
withSuccessfulI
Are you trying to do this in order to use Wait.on? getSuccessfulInserts is
not currently supported for Storage Write API.
On Thu, Mar 2, 2023 at 1:44 PM Matthew Ouyang wrote:
> Thank you to Ahmed and Reuven for the tip on
> WriteResult::getFailedStorageApiInserts.
>
> When I tried to get the suc
Hi Andrew,
Thank you so much for your help. Sorry to hear you changed teams :( I can
handle Calcite upgrades if there is a fix. I was working on a Calcite upgrade,
but then we started having so many issues. That's why I stopped.
Talat
On Thu, Mar 2, 2023 at 11:56 AM Andrew Pilloud wrote:
Thanks Reuven,
I understand that state is per key and that keys are distributed across
workers, but I am trying to understand where/how the distribution part is
implemented so that elements with the same key go to the same worker.
Do I need to do this before calling the `Deduplicate` transform? If
Whenever state is used, the runner will arrange such that the same
keys will all go to the same worker, which often involves injecting a
shuffle-like operation if the keys are spread out among many workers
in the input. (An alternative implementation could involve storing the
state in a distributed
That explains it. Thank you Robert and all!
-Binh
On Thu, Mar 2, 2023 at 4:51 PM Robert Bradshaw via user <
user@beam.apache.org> wrote:
> Whenever state is used, the runner will arrange such that the same
> keys will all go to the same worker, which often involves injecting a
> shuffle-like op