[Java - Beam Schema] Manually Generating a Beam Schema for a POJO class

2020-06-26 Thread Kaymak, Tobias
I have the following class definition: public class EnrichedArticle implements Serializable { // ArticleEnvelope is generated via Protobuf private ArticleProto.ArticleEnvelope article; // Asset is a Java POJO private List assets; public EnrichedArticle(ArticleProto.ArticleEnvelope arti

Caching data inside DoFn

2020-06-26 Thread Praveen K Viswanathan
Hi All - I have a DoFn which generates data (KV pair) for each element that it is processing. It also has to read from that KV for other elements based on a key which means, the KV has to retain all the data that's getting added to it while processing every element. I was thinking about the "slow-c

Re: Caching data inside DoFn

2020-06-26 Thread Luke Cwik
Use a stateful DoFn and buffer the elements in a bag state. You'll want to use a key that contains enough data to match your join condition you are trying to match. For example, if you're trying to match on a customerId then you would do something like: element 1 -> ParDo(extract customer id) -> KV -
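A minimal sketch of the bag-state idea, assuming the input has already been keyed by the join attribute (for example by an upstream ParDo that emits KV<customerId, element>). The class name `MatchByKeyFn`, the String value type and the "previous|current" pairing are hypothetical choices for illustration, not part of the original reply.

```java
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

/**
 * Hypothetical stateful DoFn: buffers every value seen for a key and, for each
 * new value, emits one pair per value buffered earlier under the same key.
 */
public class MatchByKeyFn extends DoFn<KV<String, String>, KV<String, String>> {

  // Bag state is scoped per key and window; the coder for String is inferred.
  @StateId("seen")
  private final StateSpec<BagState<String>> seenSpec = StateSpecs.bag();

  @ProcessElement
  public void processElement(
      @Element KV<String, String> element,
      @StateId("seen") BagState<String> seen,
      OutputReceiver<KV<String, String>> out) {
    // Read everything buffered so far for this key and pair it with the new value.
    for (String previous : seen.read()) {
      out.output(KV.of(element.getKey(), previous + "|" + element.getValue()));
    }
    // Buffer the current value so later elements with the same key can see it.
    seen.add(element.getValue());
  }
}
```

It would be applied with `ParDo.of(new MatchByKeyFn())` on a keyed `PCollection<KV<String, String>>`. As sketched, the bag is never cleared, so in a long-lived (e.g. global) window it keeps growing; clearing it from a timer is one option if that matters.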

Re: Getting PC: @ 0x7f7490e08602 (unknown) raise and Check failure stack trace: ***

2020-06-26 Thread Luke Cwik
It seems as though we have seen this failure before for Dataflow and it was caused because the side input tags needed to be unique in a streaming pipeline. It looked like this used to be a common occurrence in the Python SDK[1, 2] because it generated tags that weren't unique enough. I would open

Re: [Java - Beam Schema] Manually Generating a Beam Schema for a POJO class

2020-06-26 Thread Brian Hulette
Hi Tobias, You should be able to annotate the EnrichedArticle class with a @DefaultSchema annotation and Beam will infer a schema for it. You would need to make some tweaks to the class though to be compatible with the built-in schema providers: you could make the members public and use JavaFiel
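A minimal sketch of the annotation-based approach, assuming the truncated suggestion refers to Beam's `JavaFieldSchema` provider. The holder class `ArticleSchemas` and the fields `articleId`, `title`, `url` and `sizeBytes` are hypothetical, and the protobuf `ArticleEnvelope` member from the question is replaced by plain fields to keep the example self-contained.

```java
import java.util.List;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

/** Hypothetical, simplified versions of the classes from the question. */
public class ArticleSchemas {

  /** Plain POJO: public fields plus a no-arg constructor let JavaFieldSchema read and build it. */
  @DefaultSchema(JavaFieldSchema.class)
  public static class Asset {
    public String url;      // hypothetical field
    public long sizeBytes;  // hypothetical field

    public Asset() {}
  }

  /** Stand-in for EnrichedArticle; the protobuf member is replaced by plain fields. */
  @DefaultSchema(JavaFieldSchema.class)
  public static class EnrichedArticle {
    public String articleId;    // hypothetical field
    public String title;        // hypothetical field
    public List<Asset> assets;  // a list of another schema type becomes a nested array field

    public EnrichedArticle() {}
  }
}
```

With the annotation in place, a `PCollection<ArticleSchemas.EnrichedArticle>` should get a schema-based coder without extra registration. If the members cannot be made public, `JavaBeanSchema` (which works through getters and setters) is the other built-in POJO provider.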

Processing data of different sizes

2020-06-26 Thread André Missaglia
Hi everyone I'm building a pipeline where I group the elements and then execute a CPU-intensive function on each group. This function performs a statistical analysis over the elements, only to return a single value on the end. But because each group has a different amount of elements, some groups

Re: Getting PC: @ 0x7f7490e08602 (unknown) raise and Check failure stack trace: ***

2020-06-26 Thread Mohil Khare
Thanks a lot Luke for following up on this and opening a Dataflow support case. It would be good to know how streamingEngine solved the problem. I will really appreciate it if you can share a link to the support case once you open it (if it is possible). Thanks and Regards Mohil

Re: Getting PC: @ 0x7f7490e08602 (unknown) raise and Check failure stack trace: ***

2020-06-26 Thread Luke Cwik
Sorry, I didn't open a support case. You should open the support case.

Re: Getting PC: @ 0x7f7490e08602 (unknown) raise and Check failure stack trace: ***

2020-06-26 Thread Mohil Khare
Sure, not a problem. I will open one. Thanks Mohil

Re: Processing data of different sizes

2020-06-26 Thread Luke Cwik
There is currently no way to tell the runner how long something is expected to take. Splitting the groups into even-sized groups with an aggregation that happens further in the pipeline of those partial groups will work best. Alternatively, you could run a different pipeline for "different" sized
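One way to express "compute on partial groups, aggregate later" in Beam is a `CombineFn`, whose partial results can be merged, together with `Combine.perKey(...).withHotKeyFanout(n)`, which spreads a large key over several intermediate keys before the final merge. This is a sketch under a strong assumption: the statistical analysis must be decomposable into mergeable partial results. Population variance (Welford's update plus the pairwise merge of Chan et al.) is used here as a hypothetical stand-in, and `PartialVariance`, `VarianceFn` and `perKeyVariance` are illustrative names.

```java
import java.io.Serializable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/** Hypothetical example: per-key population variance computed from mergeable partials. */
public class PartialVariance {

  /** Partial result for a sub-group: count, running mean, sum of squared deviations (M2). */
  public static class Accum implements Serializable {
    long count;
    double mean;
    double m2;
  }

  public static class VarianceFn extends Combine.CombineFn<Double, Accum, Double> {
    @Override
    public Accum createAccumulator() {
      return new Accum();
    }

    @Override
    public Accum addInput(Accum acc, Double x) {
      // Welford's online update for a single value.
      acc.count++;
      double delta = x - acc.mean;
      acc.mean += delta / acc.count;
      acc.m2 += delta * (x - acc.mean);
      return acc;
    }

    @Override
    public Accum mergeAccumulators(Iterable<Accum> accums) {
      // Pairwise merge of partial results; delta and the old count are used before updating.
      Accum merged = new Accum();
      for (Accum a : accums) {
        if (a.count == 0) {
          continue;
        }
        long n = merged.count + a.count;
        double delta = a.mean - merged.mean;
        merged.mean += delta * a.count / n;
        merged.m2 += a.m2 + delta * delta * merged.count * a.count / n;
        merged.count = n;
      }
      return merged;
    }

    @Override
    public Double extractOutput(Accum acc) {
      return acc.count == 0 ? Double.NaN : acc.m2 / acc.count;
    }

    @Override
    public Coder<Accum> getAccumulatorCoder(CoderRegistry registry, Coder<Double> inputCoder) {
      // Accum is Serializable, so Java serialization is enough for this sketch.
      return SerializableCoder.of(Accum.class);
    }
  }

  /** Per-key variance; each key's values are pre-combined across `fanout` intermediate keys. */
  public static PCollection<KV<String, Double>> perKeyVariance(
      PCollection<KV<String, Double>> input, int fanout) {
    return input.apply(
        Combine.<String, Double, Double>perKey(new VarianceFn()).withHotKeyFanout(fanout));
  }
}
```

For example, accumulating 1.0 and 2.0 in one partial, 3.0 and 4.0 in another, and merging them yields 1.25, the same as processing all four values in one group. If the analysis cannot be written as mergeable partials, this approach does not apply and the other options from the reply remain.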

Re: Processing data of different sizes

2020-06-26 Thread Reuven Lax
How are you grouping the elements today?

Re: Getting PC: @ 0x7f7490e08602 (unknown) raise and Check failure stack trace: ***

2020-06-26 Thread Mohil Khare
I have opened the following ticket: https://issues.apache.org/jira/browse/BEAM-10339 Please let me know if there is some other place where I need to open a support ticket. Thank you Mohil

Re: Caching data inside DoFn

2020-06-26 Thread Praveen K Viswanathan
Thank you Luke. I will work on implementing my use case with Stateful ParDo itself and come back if I have any questions. Appreciate your help.

Apply Wait.on() pattern after AvroIO, KinesisIO writes

2020-06-26 Thread Sunny, Mani Kolbe
Hello, I am looking to implement the Wait.on() pattern to do something after the writes for each window are done. There are two outputs: one writing using AvroIO and the other using KinesisIO. The problem is that the writes don't return a PCollection, which is required to construct Wait.on(). Is there a way around this?