Yes, a very high and non-deterministic cardinality can make the stored state of the join operation unbounded. In my case we knew the cardinality, and it was not very high, so we could go with a lookup-based approach using Redis to enrich the stream and avoid joins.
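
Roughly, the enrichment looked like the sketch below. It is a minimal example, not our production code: the Jedis client, the "side:" key scheme, and the string value format are all assumptions for illustration.

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

// Enrichment without a join: each element looks up its side data in Redis,
// so the runner holds no join state at all.
public class RedisEnrichFn extends DoFn<KV<String, String>, KV<String, String>> {
  private final String redisHost;
  private transient JedisPool pool;

  public RedisEnrichFn(String redisHost) {
    this.redisHost = redisHost;
  }

  @Setup
  public void setup() {
    pool = new JedisPool(redisHost, 6379);
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    KV<String, String> element = c.element();
    try (Jedis jedis = pool.getResource()) {
      // "side:" is an assumed key scheme for the pre-loaded lookup data.
      String sideData = jedis.get("side:" + element.getKey());
      if (sideData != null) {
        c.output(KV.of(element.getKey(), element.getValue() + "|" + sideData));
      } else {
        c.output(element); // no side data for this key: pass through unchanged
      }
    }
  }

  @Teardown
  public void teardown() {
    if (pool != null) {
      pool.close();
    }
  }
}

Since the lookup data is bounded and lives in Redis rather than in the runner's state backend, the checkpoint size stays flat regardless of how many messages flow through.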
On Wed, Aug 30, 2023 at 5:04 AM Ruben Vargas <ruben.var...@metova.com> wrote:

> Thanks for the reply and the advice.
>
> One more thing: do you know if the key-space cardinality impacts this?
> I'm assuming it does, but in my case all the messages from the sources
> have a unique ID, which makes my key-space huge, and that is not under
> my control.
>
> On Tue, Aug 29, 2023 at 9:29 AM Sachin Mittal <sjmit...@gmail.com> wrote:
>
>> For the smaller collection, whose size does not grow for a given key,
>> we stored the data in Redis, and instead of a Beam join, our DoFn just
>> did the lookup to get the data we needed.
>>
>> On Tue, 29 Aug 2023 at 8:50 PM, Ruben Vargas <ruben.var...@metova.com>
>> wrote:
>>
>>> Hello,
>>>
>>> Thanks for the reply. Did you follow any particular strategy to avoid
>>> joins when you rewrote your pipeline?
>>>
>>> On Tue, Aug 29, 2023 at 9:15 AM Sachin Mittal <sjmit...@gmail.com>
>>> wrote:
>>>
>>>> Yes, we faced the same issue when trying to run a pipeline involving
>>>> a join of two collections. It was deployed using AWS KDA, which uses
>>>> the Flink runner; the source was Kinesis streams.
>>>>
>>>> It looks like join operations are not very efficient in terms of
>>>> state-size management when run on Flink.
>>>>
>>>> We had to rewrite our pipeline to avoid these joins.
>>>>
>>>> Thanks,
>>>> Sachin
>>>>
>>>> On Tue, 29 Aug 2023 at 7:00 PM, Ruben Vargas <ruben.var...@metova.com>
>>>> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I am experiencing an issue with my Beam pipeline.
>>>>>
>>>>> I have a pipeline in which I split the work into different branches
>>>>> and then join them using CoGroupByKey; each message has its own
>>>>> unique key.
>>>>>
>>>>> For the join, I use a session window and discard the messages after
>>>>> the trigger fires.
>>>>>
>>>>> I'm using the Flink runner, deployed as a Kinesis Data Analytics
>>>>> application, but I'm experiencing unbounded growth of the checkpoint
>>>>> data size. In the Flink console, the following task has the largest
>>>>> checkpoint:
>>>>>
>>>>> join_results/GBK -> ToGBKResult ->
>>>>> join_results/ConstructCoGbkResultFn/ParMultiDo(ConstructCoGbkResult) -> V
>>>>>
>>>>> Any advice?
>>>>>
>>>>> Thank you very much!
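
P.S. For anyone finding this thread later, the join pattern described above looks roughly like the sketch below. The gap duration, trigger, and tuple tags are assumptions for illustration, not the actual pipeline.

import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Duration;

public class SessionJoinSketch {
  // Tags identifying each branch in the CoGbkResult (names are illustrative).
  static final TupleTag<String> BRANCH_A = new TupleTag<String>();
  static final TupleTag<String> BRANCH_B = new TupleTag<String>();

  static PCollection<KV<String, CoGbkResult>> join(
      PCollection<KV<String, String>> branchA,
      PCollection<KV<String, String>> branchB) {
    // Session window with an assumed 5-minute gap; panes are discarded
    // after the trigger fires, as described in the original message.
    Window<KV<String, String>> window =
        Window.<KV<String, String>>into(
                Sessions.withGapDuration(Duration.standardMinutes(5)))
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(2)))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes();

    return KeyedPCollectionTuple.of(BRANCH_A, branchA.apply("WindowA", window))
        .and(BRANCH_B, branchB.apply("WindowB", window))
        .apply(CoGroupByKey.create());
  }
}

The catch with this pattern is that a unique key per message means every element opens its own session window, so the runner has to hold per-key window state until the gap duration (plus allowed lateness) passes, which is where the checkpoint growth shows up.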