[Question] Best practices for branching pipeline.

2023-07-21 Thread Ruben Vargas
Hello, I'm starting to use Beam and would like to know whether there is a recommended pattern for the following: I have messages coming from Kafka, and I would like to apply two different transformations and merge them into a single result at the end. I attached an image that describes the

Re: [Question] Best practices for branching pipeline.

2023-07-26 Thread Ruben Vargas
Hello? Any advice on how to do what I described? I could only find examples for bounded data, not for streaming. Also, could I get an invite to Slack? Thank you very much! On Fri, July 21, 2023 at 9:34, Ruben Vargas <ruben.var...@metova.com> wrote:

Re: [Question] Best practices for branching pipeline.

2023-08-10 Thread Ruben Vargas
> @ProcessElement > public void processElement(@Element KafkaRecord record, > OutputReceiver<MergedType> receiver) { > Type1 part1 = something(record); > Type2 part2 = somethingElse(record); > MergedType merged = merge(part1, part2); > receiver.output(merged); > } > > }

Issue with growing state/checkpoint size

2023-08-29 Thread Ruben Vargas
Hello, I'm experiencing an issue with my Beam pipeline. I have a pipeline in which I split the work into different branches and then join them using CoGroupByKey; each message has its own unique key. For the join, I used a session window, discarding the messages after the trigger fires. I'm using Flink R

Re: Issue with growing state/checkpoint size

2023-08-29 Thread Ruben Vargas
WS KDA, which uses the Flink > runner. The source was Kinesis streams. > > It looks like join operations are not very efficient in terms of size > management when run on Flink. > > We had to rewrite our pipeline to avoid these joins. > > Thanks > Sachin > > > On Tue, 29

Re: Issue with growing state/checkpoint size

2023-08-29 Thread Ruben Vargas
29 AM Sachin Mittal wrote: > So for the smaller collection, which does not grow in size for > certain keys, we stored the data in Redis, and instead of a Beam join, in our > DoFn we just did a lookup and got the data we needed. > > > On Tue, 29 Aug 2023 at 8:50 PM, Ruben Varg

Re: Issue with growing state/checkpoint size

2023-09-01 Thread Ruben Vargas
f join operation unbounded. > In my case we knew the cardinality and it was not very high, so we could go > with a lookup-based approach using Redis to enrich the stream and avoid > joins. > > > > On Wed, Aug 30, 2023 at 5:04 AM Ruben Vargas > wrote: > >> Thanks for
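A minimal plain-Java sketch of the lookup-based enrichment Sachin describes, assuming the reference data is small and bounded. An in-memory `Map` stands in for Redis, and `LookupEvent`, `EnrichedEvent`, and `LookupEnricher` are hypothetical names; in a real pipeline this logic would sit inside a DoFn's `@ProcessElement` method with a Redis client. Unlike a CoGroupByKey join, nothing is buffered per key while waiting for the other side to arrive.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stream element carrying a key into the reference data.
final class LookupEvent {
    final String id;
    final String customerId;

    LookupEvent(String id, String customerId) {
        this.id = id;
        this.customerId = customerId;
    }
}

// Hypothetical enriched output: the event ID plus the looked-up attribute.
final class EnrichedEvent {
    final String id;
    final String customerName;

    EnrichedEvent(String id, String customerName) {
        this.id = id;
        this.customerName = customerName;
    }
}

final class LookupEnricher {
    // Stand-in for an external store such as Redis (assumed small and bounded).
    private final Map<String, String> referenceStore;

    LookupEnricher(Map<String, String> referenceStore) {
        this.referenceStore = referenceStore;
    }

    // Enriches one element with a point lookup; no per-key join state is kept.
    Optional<EnrichedEvent> enrich(LookupEvent event) {
        String name = referenceStore.get(event.customerId);
        if (name == null) {
            // Missing reference data: the caller decides (drop, default, or dead-letter).
            return Optional.empty();
        }
        return Optional.of(new EnrichedEvent(event.id, name));
    }
}
```

The trade-off is that the lookup reflects the store's current contents rather than a windowed join result, which is acceptable only when, as in Sachin's case, the lookup side is small and slowly changing.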

[Question] Side Input pattern

2023-09-12 Thread Ruben Vargas
Hello everyone, I have a question. My pipeline has a side input that fetches some configuration from an API endpoint every 30 seconds. My question is this: I have something similar to what is shown in the side input patterns documentation: PCollectionView> map = p.apply(Generate

Re: [Question] Side Input pattern

2023-09-15 Thread Ruben Vargas
(for performance > reasons) but may periodically re-fetch it (the exact cadence probably > depends on the runner implementation). > > On Tue, Sep 12, 2023 at 10:34 PM Ruben Vargas > wrote: > >> Hello Everyone >> >> I have a question, I have on my pipeline one s

DLQ Implementation

2024-03-27 Thread Ruben Vargas
Hello all, maybe a silly question: are there any suggestions for implementing a DLQ (dead-letter queue) in my Beam pipeline? Currently I'm using this library, https://github.com/tosun-si/asgarde, which is not bad; the only issue I found is that it is sometimes hard to use with GroupIntoBatches or other native transf
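A plain-Java sketch of the dead-letter idea, not asgarde's API: each element is processed inside a try/catch, and failures are routed, together with their error, to a separate collection instead of failing the whole step. `DeadLetter`, `RoutedResult`, and `DeadLetterRouter` are hypothetical names. In Beam the same shape is usually expressed as a multi-output ParDo (`ParDo.of(...).withOutputTags(mainTag, TupleTagList.of(deadLetterTag))`), with the dead-letter output written to its own sink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical dead-letter record: the failed input plus the error that caused it.
final class DeadLetter<T> {
    final T input;
    final String error;

    DeadLetter(T input, String error) {
        this.input = input;
        this.error = error;
    }
}

// Successful outputs and dead-lettered inputs, kept side by side.
final class RoutedResult<T, R> {
    final List<R> outputs = new ArrayList<>();
    final List<DeadLetter<T>> deadLetters = new ArrayList<>();
}

final class DeadLetterRouter {
    // Applies fn to every input; runtime exceptions are captured per element.
    static <T, R> RoutedResult<T, R> route(List<T> inputs, Function<T, R> fn) {
        RoutedResult<T, R> result = new RoutedResult<>();
        for (T input : inputs) {
            try {
                result.outputs.add(fn.apply(input));
            } catch (RuntimeException e) {
                // Keep the original input so it can be inspected or replayed later.
                result.deadLetters.add(new DeadLetter<>(input, e.toString()));
            }
        }
        return result;
    }
}
```

Because the routing lives inside a single element-wise step, it composes with downstream transforms such as GroupIntoBatches: only the main output is fed to them.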

KV with AutoValueSchema

2024-04-04 Thread Ruben Vargas
Hello guys, I have a question: is it possible to use KV together with AutoValueSchema objects? I'm having trouble when I try to use them together. I have an object like the following: @AutoValue @DefaultSchema(AutoValueSchema.class) public abstract class SharedCoreEvent { @JsonProperty("subscriptionI

Re: KV with AutoValueSchema

2024-04-04 Thread Ruben Vargas
e a previous PCollection of type > SharedCoreEvent, or is the SharedCoreEvent created in ProcessEvents? > > On Thu, Apr 4, 2024 at 2:12 PM Ruben Vargas > wrote: > >> Hello guys >> >> I have a question, is it possible to use KV along with AutoValueSchema >>

How to handle Inheritance with AutoValueSchema

2024-04-08 Thread Ruben Vargas
Hello guys, I have a PCollection of "Session" objects; inside each one there is a list of events. The events have different types, EventA, EventB, EventC and so on, all of which extend Event, which contains the common fields. According to the AutoValue documentation, inheritance fr

Re: KV with AutoValueSchema

2024-04-09 Thread Ruben Vargas
ypeDescriptors.longs())) > .setCoder(KvCoder.of(LongCoder.of(), coreEvents.getCoder())) > .apply(Reshuffle.of()) > ... etc. > > > On Thu, Apr 4, 2024 at 8:19 PM Ruben Vargas wrote: >> >> ProcessEvents receives a Session object as input and

Any recommendation for key for GroupIntoBatches

2024-04-12 Thread Ruben Vargas
Hello guys, maybe this question was already answered, but I cannot find it and would like some more input on this topic. I have some messages that don't have any particular key candidate except the ID, but I don't want to use it because the idea is to group multiple IDs into the same batch. This is my
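One common approach, sketched here as an assumption rather than the answer given in this thread: derive the key from a bounded set of shard numbers, e.g. `floorMod(id.hashCode(), numShards)`, so that many IDs share a key and can be batched together, while `numShards` caps how many keys (and so how much parallelism) GroupIntoBatches sees. `ShardKeys` is a hypothetical helper; in a pipeline it would typically be applied with `WithKeys.of(...)` before `GroupIntoBatches.ofSize(...)`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ShardKeys {
    // Maps an element ID onto one of numShards keys; deterministic for a given ID.
    static int shardKey(String id, int numShards) {
        if (numShards <= 0) {
            throw new IllegalArgumentException("numShards must be positive");
        }
        // floorMod keeps the result in [0, numShards) even for negative hash codes.
        return Math.floorMod(id.hashCode(), numShards);
    }

    // Counts how many IDs land on each shard key, to check the spread.
    static Map<Integer, Integer> histogram(List<String> ids, int numShards) {
        Map<Integer, Integer> counts = new HashMap<>();
        for (String id : ids) {
            counts.merge(shardKey(id, numShards), 1, Integer::sum);
        }
        return counts;
    }
}
```

A hash of the ID keeps the key stable if an element is retried; a random shard number per element also works but gives a different key on each attempt.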

Re: Any recommendation for key for GroupIntoBatches

2024-04-12 Thread Ruben Vargas
d to potential loss of data. That is why state is used, or at least that is my understanding. But maybe there is a way to do this with state? >> On Fri, Apr 12, 2024 at 21.23 Ruben Vargas wrote: >>> >>> Hello guys >>> >>> Maybe this question was alrea

Re: Any recommendation for key for GroupIntoBatches

2024-04-15 Thread Ruben Vargas
t on top of > https://beam.apache.org/documentation/io/built-in/webapis/ > > On Fri, Apr 12, 2024 at 4:38 PM Ruben Vargas wrote: >> >> On Fri, Apr 12, 2024 at 2:17 PM Jaehyeon Kim wrote: >> > >> > Here is an example from a book that I'm reading n

Re: Any recommendation for key for GroupIntoBatches

2024-04-15 Thread Ruben Vargas
sform to handle request retries and exponential backoff > optional caching of request and response associations > optional metrics > > > On Mon, Apr 15, 2024 at 10:38 AM Ruben Vargas wrote: >> >> That one looks interesting >> >> What is not clear to me is wha

Re: Any recommendation for key for GroupIntoBatches

2024-04-15 Thread Ruben Vargas
Is there a way to do batching in that transformation? I'm assuming not for now, or maybe by using it in conjunction with GroupIntoBatches. On Mon, Apr 15, 2024 at 9:29 AM Ruben Vargas wrote: > > Interesting > > I think the cache feature could be interesting for some use cases I have.

Re: Any recommendation for key for GroupIntoBatches

2024-04-15 Thread Ruben Vargas
: > > I am not sure you still need to do batching, since Web API can handle caching. > > If you really need it, I think GroupIntoBatches is a good way to go. > > On Mon, Apr 15, 2024 at 11:30 AM Ruben Vargas wrote: >> >> Is there a way to do batching in that transformation?

Parallelism of a side input

2024-06-07 Thread Ruben Vargas
Hello guys, one question: I have a side input that fetches an endpoint every 30 minutes. I pretty much copied the example here, https://beam.apache.org/documentation/patterns/side-inputs/, but added some logic to fetch the endpoint and parse the payload. My question is: is it possible to control the pa

Re: Parallelism of a side input

2024-06-12 Thread Ruben Vargas
Even if the operator parallelism for > that step is technically, say, eight, your effective parallelism will > be exactly one. > > [1] > https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/Reshuffle.html > > On Fri, Jun 7, 2024 at 2:13 PM Ruben Vargas > wr

How windowing is implemented on Flink runner

2024-06-12 Thread Ruben Vargas
Hello guys, maybe a silly question, but in the Flink runner, does the window implementation use Flink's windowing? Does that mean the runner will have the same performance issues as Flink itself? See this: https://issues.apache.org/jira/browse/FLINK-7001 I'm asking because of that issue; it mentions di

Re: How windowing is implemented on Flink runner

2024-06-12 Thread Ruben Vargas
I suspected as much but wasn't sure! Thanks for the clarification! On Wed, Jun 12, 2024 at 1:42 PM Robert Bradshaw via user wrote: > > Beam implements Windowing itself (via state and timers) rather than > deferring to Flink's implementation. > > On Wed, Jun 12, 2024 at 11:
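A toy plain-Java model of what "windowing via state and timers" means, not Beam's actual code: each element is assigned to a fixed window (start = timestamp minus timestamp mod size, assuming zero offset), buffered in per-window state, and released when an event-time "timer" fires because the watermark has reached the window end. `ToyFixedWindows` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ToyFixedWindows {
    private final long sizeMillis;
    // "State": buffered values per window start, ordered so windows fire in time order.
    private final TreeMap<Long, List<String>> buffers = new TreeMap<>();

    ToyFixedWindows(long sizeMillis) {
        this.sizeMillis = sizeMillis;
    }

    // Start of the fixed window containing the timestamp (zero offset assumed).
    long windowStart(long timestampMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Buffers the element in its window's state.
    void onElement(long timestampMillis, String value) {
        buffers.computeIfAbsent(windowStart(timestampMillis), k -> new ArrayList<>()).add(value);
    }

    // "Timer": fires every window whose end is at or before the watermark, then clears its state.
    Map<Long, List<String>> advanceWatermark(long watermarkMillis) {
        Map<Long, List<String>> fired = new TreeMap<>();
        while (!buffers.isEmpty()) {
            Map.Entry<Long, List<String>> first = buffers.firstEntry();
            if (first.getKey() + sizeMillis > watermarkMillis) {
                break; // earliest remaining window is still open
            }
            fired.put(first.getKey(), first.getValue());
            buffers.pollFirstEntry();
        }
        return fired;
    }

    int bufferedWindows() {
        return buffers.size();
    }
}
```

In this model, buffered state is released only when the watermark passes a window's end, so a stalled watermark or windows that stay open keep state growing.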

Re: Exactly once KafkaIO with flink runner

2024-06-18 Thread Ruben Vargas
unaligned checkpoints? What is the > checkpointing interval and the volume of data coming in from the source? > With EOS, data is committed after the checkpoint; until then, the data is > buffered in state, which makes the sink more resource-intensive. > > Jan > > On 6/18/24
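A simplified model of Jan's point, not KafkaIO's implementation: an exactly-once sink holds every record written since the last completed checkpoint in state and commits them only when the checkpoint completes. `ToyTransactionalSink` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class ToyTransactionalSink {
    // Records written since the last completed checkpoint, held in "state".
    private final List<String> pending = new ArrayList<>();
    // Records made visible downstream after a checkpoint completed.
    private final List<String> committed = new ArrayList<>();

    void write(String record) {
        pending.add(record);
    }

    // Called when a checkpoint completes: only now is the buffered data committed.
    void onCheckpointComplete() {
        committed.addAll(pending);
        pending.clear();
    }

    int pendingCount() {
        return pending.size();
    }

    List<String> committedRecords() {
        return Collections.unmodifiableList(committed);
    }
}
```

Under this model the buffered volume is roughly the input rate multiplied by the checkpoint interval, which is why Jan asks about both; if checkpoints stop completing, the pending buffer never drains.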

Re: Exactly once KafkaIO with flink runner

2024-06-18 Thread Ruben Vargas
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String- > > On 6/18/24 09:32, Ruben Vargas wrote: > > Hello Lukavsky > > Thanks for your reply! > > I thought it was due to backpressure, but I increased the

Re: Exactly once KafkaIO with flink runner

2024-06-18 Thread Ruben Vargas
his up! On Tue, Jun 18, 2024 at 8:44 AM Ruben Vargas wrote: > > Hello Jan > > Thanks for the suggestions. > > Is there any benefit to using aligned vs. unaligned? > > > In the end I found one problem that was preventing Flink from doing > the checkpointing. It was a DoFn functi

Re: Exactly once KafkaIO with flink runner

2024-06-19 Thread Ruben Vargas
s). > > Jan > > [1] > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/ > > On 6/19/24 05:15, Ruben Vargas wrote: > > Hello guys > > > > Now I was able to get past that error. > > > > I had to set the cons

Re: Parallelism of a side input

2024-06-20 Thread Ruben Vargas
The only bad thing about this approach is that, at least in the Flink runner, it consumes a task slot :( On Wed, Jun 12, 2024 at 9:38 a.m., Robert Bradshaw <rober...@google.com> wrote: > On Wed, Jun 12, 2024 at 7:56 AM Ruben Vargas > wrote: > > > > The approach look

Re: Exactly once KafkaIO with flink runner

2024-06-20 Thread Ruben Vargas
! Regards. On Wed, Jun 19, 2024 at 11:09 PM Ruben Vargas wrote: > > Hello again > > Thank you for all the suggestions. > > Unfortunately, if I set more shards than partitions, it throws this exception: > > "message": > "PipelineBuilder-debug

Re: Exactly once KafkaIO with flink runner

2024-06-20 Thread Ruben Vargas
The image was not correctly attached; sending it again. Sorry. Thanks On Thu, Jun 20, 2024 at 9:25 PM Ruben Vargas wrote: > > Hello guys, me again > > I was trying to debug the issue with the backpressure and I noticed > that even if I set shards = 16, not all tasks are recei

Re: Exactly once KafkaIO with flink runner

2024-06-24 Thread Ruben Vargas
eam.sdk.util.UserCodeException: java.lang.RuntimeException: java.lang.reflect.InvocationTargetException\n\tat Do I need to change any configuration to do that? Thanks > > b) increase maxParallelism (default 128, maximum 32768), as it might > influence the assignment of keys to downstream w
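Why 16 shard keys need not reach 16 subtasks: as we understand Flink's keyed state, each key is hashed into one of `maxParallelism` key groups, and key groups are assigned to subtasks in contiguous ranges using `keyGroup * parallelism / maxParallelism`. The sketch below models only that range step. The hashing of keys into key groups (a murmur hash in Flink) is left out, and the key-group IDs in the test are made up. `KeyGroupAssignment` is a hypothetical name.

```java
final class KeyGroupAssignment {
    // Maps a key group to the parallel subtask that owns it.
    // Key groups are split into contiguous ranges, one range per subtask.
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        if (keyGroupId < 0 || keyGroupId >= maxParallelism) {
            throw new IllegalArgumentException("keyGroupId out of range");
        }
        return keyGroupId * parallelism / maxParallelism;
    }

    // Counts how many of the given key groups (one per key) land on each subtask.
    static int[] load(int[] keyGroups, int maxParallelism, int parallelism) {
        int[] perSubtask = new int[parallelism];
        for (int kg : keyGroups) {
            perSubtask[operatorIndexForKeyGroup(maxParallelism, parallelism, kg)]++;
        }
        return perSubtask;
    }
}
```

With the default maxParallelism of 128 and parallelism 16, each subtask owns 8 consecutive key groups, so a few keys whose key groups fall in the same range pile onto one subtask while others get nothing. Changing maxParallelism changes which key group each key hashes to, and therefore the assignment, but with only 16 keys collisions remain possible.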

How Kafka reader works

2024-07-08 Thread Ruben Vargas
Hello guys, I'm struggling to understand how the KafkaIO reader handles the initial offset. I enabled *commitOffsetsInFinalize*, which I understand commits the offsets to Kafka after each checkpoint finishes. My question is: if I'm using a stable *group.id* and for so
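A plain-Java model of the start-offset precedence as we understand it, offered as an assumption to check against the KafkaIO docs rather than its actual code: offsets restored from pipeline state (a checkpoint or savepoint) win; otherwise the offset committed for the `group.id` (which *commitOffsetsInFinalize* keeps up to date) is used; otherwise the consumer's `auto.offset.reset` policy decides. `StartOffsetResolver` and `ResetPolicy` are hypothetical names.

```java
import java.util.OptionalLong;

final class StartOffsetResolver {
    // Mirrors the two common auto.offset.reset values.
    enum ResetPolicy { EARLIEST, LATEST }

    // Picks where one partition's reader starts, in assumed order of precedence.
    static long resolve(OptionalLong checkpointedOffset,
                        OptionalLong committedGroupOffset,
                        ResetPolicy resetPolicy,
                        long earliestOffset,
                        long latestOffset) {
        if (checkpointedOffset.isPresent()) {
            return checkpointedOffset.getAsLong(); // restoring from pipeline state
        }
        if (committedGroupOffset.isPresent()) {
            return committedGroupOffset.getAsLong(); // offset committed for group.id
        }
        // No prior position anywhere: fall back to the reset policy.
        return resetPolicy == ResetPolicy.EARLIEST ? earliestOffset : latestOffset;
    }
}
```

Under this model, starting a fresh job (no state) with a stable `group.id` resumes from the last committed offset, which is the case the committed offsets are meant to cover.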

State corrupted when update

2024-07-08 Thread Ruben Vargas
Hello guys, I'm using the Flink runner. I had to update my application to fix some minor bugs (incorrect assignment of the output fields). Then, when I tried to update my pipeline with the latest code, I got this error: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeExcept