Hi Till,

How would we do the initial hydration of the Product and Account data, since it
is currently in a relational DB? Do we have to copy the data over to Kafka first
and then consume it from there?
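Just to check that I have understood the co-map suggestion correctly, here is a
rough sketch of the enrichment operator I have in mind. Position and Product are
simplified placeholder POJOs with a getProductKey() accessor, and I have used a
RichCoFlatMapFunction rather than a plain CoMapFunction so that positions for
which no product is in state yet can simply be dropped instead of being emitted
half-enriched:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

// Keeps the latest Product per product key in keyed state and joins each
// incoming Position against it.
public class ProductEnricher
        extends RichCoFlatMapFunction<Position, Product, Tuple2<Position, Product>> {

    private transient ValueState<Product> productState;

    @Override
    public void open(Configuration parameters) {
        productState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("product", Product.class));
    }

    @Override
    public void flatMap1(Position position, Collector<Tuple2<Position, Product>> out)
            throws Exception {
        Product product = productState.value();
        if (product != null) {
            out.collect(Tuple2.of(position, product));
        }
        // else: no product in state yet; this is exactly the initial
        // hydration gap I am asking about above.
    }

    @Override
    public void flatMap2(Product product, Collector<Tuple2<Position, Product>> out)
            throws Exception {
        productState.update(product);
    }
}

// Wiring, assuming both streams carry a String product key:
// DataStream<Tuple2<Position, Product>> enriched = positions
//         .connect(productUpdates)
//         .keyBy(p -> p.getProductKey(), p -> p.getProductKey())
//         .flatMap(new ProductEnricher());

The second sketch at the very bottom of this mail shows the kind of one-off copy
job I was wondering about for the initial hydration.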
Regards,
Harsh

On Tue, Jul 24, 2018 at 09:22 Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Harshvardhan,
>
> I agree with Ankit that this problem could actually be solved quite
> elegantly with Flink's state. If you can ingest the product/account
> information changes as a stream, you can keep the latest version of it in
> Flink state by using a co-map function [1, 2]. One input of the co-map
> function would be the product/account update stream, which updates the
> respective entries in Flink's state, and the other input stream is the one
> to be enriched. When receiving input from this stream, one would look up
> the latest information contained in the operator's state and join it with
> the incoming event.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/
> [2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.html
>
> Cheers,
> Till
>
> On Tue, Jul 24, 2018 at 2:15 PM Harshvardhan Agrawal
> <harshvardhan.ag...@gmail.com> wrote:
>
>> Hi,
>>
>> Thanks for your responses.
>>
>> There is no fixed interval at which the data is updated. It's more that
>> onboarding a new product, or a change in any mandates, will trigger the
>> reference data to change.
>>
>> It's not just the enrichment we are doing here. Once we have enriched
>> the data we will be performing a bunch of aggregations using the
>> enriched data.
>>
>> Which approach would you recommend?
>>
>> Regards,
>> Harshvardhan
>>
>> On Tue, Jul 24, 2018 at 04:04 Jain, Ankit <ankit.j...@here.com> wrote:
>>
>>> How often is the product db updated? Based on that, you can store
>>> product metadata as state in Flink, maybe set up the state on cluster
>>> startup and then update it daily, etc.
>>>
>>> Also, just based on this feature, Flink doesn't seem to add a lot of
>>> value on top of Kafka. As Jorn said below, you can very well store all
>>> the events in an external store and then periodically run a cron to
>>> enrich later, since your processing doesn't seem to require absolute
>>> real time.
>>>
>>> Thanks
>>> Ankit
>>>
>>> From: Jörn Franke <jornfra...@gmail.com>
>>> Date: Monday, July 23, 2018 at 10:10 PM
>>> To: Harshvardhan Agrawal <harshvardhan.ag...@gmail.com>
>>> Cc: <user@flink.apache.org>
>>> Subject: Re: Implement Joins with Lookup Data
>>>
>>> For the first one (lookup of single entries) you could use a NoSQL db
>>> (e.g. a key-value store); a relational database will not scale.
>>>
>>> Depending on when you need to do the enrichment, you could also first
>>> store the data and enrich it later as part of a batch process.
>>>
>>> On 24. Jul 2018, at 05:25, Harshvardhan Agrawal
>>> <harshvardhan.ag...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> We are using Flink for financial data enrichment and aggregations. We
>>> have Positions data that we are currently receiving from Kafka. We want
>>> to enrich that data with reference data like Product and Account
>>> information that is present in a relational database. From my
>>> understanding of Flink so far, I think there are two ways to achieve
>>> this:
>>>
>>> 1) First Approach:
>>> a) Get positions from Kafka and key by product key.
>>> b) Perform a lookup in the database for each key and obtain a
>>> Tuple2<Position, Product>.
>>>
>>> 2) Second Approach:
>>> a) Get positions from Kafka and key by product key.
>>> b) Window the keyed stream into, say, 15-second windows.
>>> c) For each window, collect the unique product keys and perform a
>>> single lookup.
>>> d) Somehow join Positions and Products.
>>>
>>> In the first approach we will be making a lot of calls to the DB and
>>> the solution is very chatty. It is hard to scale because the database
>>> storing the reference data might not be very responsive.
>>>
>>> In the second approach, I wish to join the WindowedStream with the
>>> SingleOutputStreamOperator, but it turns out I can't join a windowed
>>> stream, so I am not quite sure how to do that.
>>>
>>> I wanted an opinion on what the right thing to do is: should I go with
>>> the first approach or the second one? If the second one, how can I
>>> implement the join?
>>>
>>> --
>>> Regards,
>>> Harshvardhan Agrawal
>>>
>> --
>> Regards,
>> Harshvardhan
>
--
Regards,
Harshvardhan
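PS: In case it makes my question above clearer, this is roughly the kind of
one-off bootstrap job I was wondering whether we need: read the current Product
table over JDBC and write every row into the same Kafka topic that the streaming
job consumes for product updates. The connection string, table, column and topic
names below are made up, and the serialization is deliberately simplistic:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// One-off copy of the Product table into the "product-updates" topic so that
// the Flink job's keyed state can be hydrated before (or while) positions
// start flowing in.
public class ProductBootstrap {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (Connection conn = DriverManager.getConnection("jdbc:...", "user", "password");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT product_key, product_name FROM product");
             KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {

            while (rs.next()) {
                String key = rs.getString("product_key");
                // Placeholder serialization; in practice we would use the same
                // format as the regular product update events.
                String value = key + "," + rs.getString("product_name");
                producer.send(new ProducerRecord<>("product-updates", key, value));
            }
            producer.flush();
        }
    }
}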