Yes, using Kafka could be a solution: you initialize the topic with the initial values and then feed the changes into the same topic, from which you consume.
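The state-based enrichment discussed below could then look roughly like this (just a sketch, untested; Position, Product, EnrichedPosition and getProductKey() are placeholders for your actual types):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

// Keeps the latest Product per product key in keyed state (flatMap2) and
// enriches each incoming Position with it (flatMap1).
public class EnrichmentFunction
        extends RichCoFlatMapFunction<Position, Product, EnrichedPosition> {

    private transient ValueState<Product> latestProduct;

    @Override
    public void open(Configuration parameters) {
        latestProduct = getRuntimeContext().getState(
                new ValueStateDescriptor<>("latest-product", Product.class));
    }

    @Override
    public void flatMap1(Position position, Collector<EnrichedPosition> out) throws Exception {
        Product product = latestProduct.value();
        if (product != null) {
            out.collect(new EnrichedPosition(position, product));
        }
        // else: decide whether to drop, buffer, or emit un-enriched while the
        // reference data for this key has not arrived yet
    }

    @Override
    public void flatMap2(Product product, Collector<EnrichedPosition> out) throws Exception {
        // reference-data update stream: just remember the latest version per key
        latestProduct.update(product);
    }
}

// Wiring: key both streams by the product key so they share the same state partition.
// DataStream<EnrichedPosition> enriched = positions
//         .connect(products)
//         .keyBy(p -> p.getProductKey(), prod -> prod.getProductKey())
//         .flatMap(new EnrichmentFunction());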
On Tue, Jul 24, 2018 at 3:58 PM Harshvardhan Agrawal <harshvardhan.ag...@gmail.com> wrote:

> Hi Till,
>
> How would we do the initial hydration of the Product and Account data
> since it’s currently in a relational DB? Do we have to copy the data over
> to Kafka and then use it from there?
>
> Regards,
> Harsh
>
> On Tue, Jul 24, 2018 at 09:22 Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hi Harshvardhan,
>>
>> I agree with Ankit that this problem could actually be solved quite
>> elegantly with Flink's state. If you can ingest the product/account
>> information changes as a stream, you can keep the latest version of it in
>> Flink state by using a co-map function [1, 2]. One input of the co-map
>> function would be the product/account update stream, which updates the
>> respective entries in Flink's state; the other input stream is the one
>> to be enriched. When receiving input from this stream, one would look up
>> the latest information contained in the operator's state and join it with
>> the incoming event.
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/
>> [2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.html
>>
>> Cheers,
>> Till
>>
>> On Tue, Jul 24, 2018 at 2:15 PM Harshvardhan Agrawal <harshvardhan.ag...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Thanks for your responses.
>>>
>>> There is no fixed interval for the data being updated. It’s more that
>>> onboarding a new product, or any change in mandates, will trigger the
>>> reference data to change.
>>>
>>> It’s not just the enrichment we are doing here. Once we have enriched
>>> the data we will be performing a bunch of aggregations using the enriched
>>> data.
>>>
>>> Which approach would you recommend?
>>>
>>> Regards,
>>> Harshvardhan
>>>
>>> On Tue, Jul 24, 2018 at 04:04 Jain, Ankit <ankit.j...@here.com> wrote:
>>>
>>>> How often is the product DB updated? Based on that, you could store
>>>> product metadata as state in Flink, maybe set up the state on cluster
>>>> startup and then update it daily, etc.
>>>>
>>>> Also, just based on this feature, Flink doesn’t seem to add a lot of
>>>> value on top of Kafka. As Jörn said below, you could very well store all
>>>> the events in an external store and then periodically run a cron job to
>>>> enrich them later, since your processing doesn’t seem to require absolute
>>>> real time.
>>>>
>>>> Thanks
>>>> Ankit
>>>>
>>>> From: Jörn Franke <jornfra...@gmail.com>
>>>> Date: Monday, July 23, 2018 at 10:10 PM
>>>> To: Harshvardhan Agrawal <harshvardhan.ag...@gmail.com>
>>>> Cc: <user@flink.apache.org>
>>>> Subject: Re: Implement Joins with Lookup Data
>>>>
>>>> For the first one (lookup of single entries) you could use a NoSQL DB
>>>> (e.g. a key-value store) - a relational database will not scale.
>>>>
>>>> Depending on when you need to do the enrichment, you could also first
>>>> store the data and enrich it later as part of a batch process.
>>>>
>>>> On 24. Jul 2018, at 05:25, Harshvardhan Agrawal <harshvardhan.ag...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> We are using Flink for financial data enrichment and aggregations. We
>>>> have Positions data that we are currently receiving from Kafka. We want to
>>>> enrich that data with reference data like Product and Account information
>>>> that is present in a relational database.
>>>> From my understanding of Flink so far, I think there are two ways to
>>>> achieve this:
>>>>
>>>> 1) First Approach:
>>>> a) Get positions from Kafka and key by product key.
>>>> b) Perform a lookup from the database for each key and obtain
>>>> Tuple2<Position, Product>.
>>>>
>>>> 2) Second Approach:
>>>> a) Get positions from Kafka and key by product key.
>>>> b) Window the keyed stream into, say, 15-second windows.
>>>> c) For each window, get the unique product keys and perform a single
>>>> lookup.
>>>> d) Somehow join Positions and Products.
>>>>
>>>> In the first approach we will be making a lot of calls to the DB and
>>>> the solution is very chatty. It is hard to scale because the database
>>>> storing the reference data might not be very responsive.
>>>>
>>>> In the second approach, I wanted to join the WindowedStream with the
>>>> SingleOutputStream, but it turns out I can't join a windowed stream, so I
>>>> am not quite sure how to do that.
>>>>
>>>> I wanted an opinion on what the right thing to do is. Should I go with
>>>> the first approach or the second one? If the second one, how can I
>>>> implement the join?
>>>>
>>>> --
>>>> Regards,
>>>> Harshvardhan Agrawal
>>>>
>>> --
>>> Regards,
>>> Harshvardhan
>>>
>> --
> Regards,
> Harshvardhan
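If the per-record database lookup from the first approach above is still needed (for example for the initial hydration), Flink's Async I/O API (AsyncDataStream / RichAsyncFunction) keeps those calls from blocking the pipeline. A rough, untested sketch; DatabaseClient and queryProduct() stand in for whatever asynchronous client the database offers, and Position/Product are placeholder types:

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Looks up the Product for each Position without blocking the operator thread.
public class AsyncProductLookup
        extends RichAsyncFunction<Position, Tuple2<Position, Product>> {

    private transient DatabaseClient client; // placeholder for an async-capable DB client

    @Override
    public void open(Configuration parameters) {
        client = new DatabaseClient(); // connection details omitted in this sketch
    }

    @Override
    public void asyncInvoke(Position position,
                            ResultFuture<Tuple2<Position, Product>> resultFuture) {
        // supplyAsync uses the common pool here; a dedicated executor is preferable in practice
        CompletableFuture
                .supplyAsync(() -> client.queryProduct(position.getProductKey()))
                .thenAccept(product ->
                        resultFuture.complete(Collections.singleton(Tuple2.of(position, product))));
    }
}

// Usage: at most 100 in-flight lookups, each timing out after 5 seconds.
// DataStream<Tuple2<Position, Product>> enriched =
//         AsyncDataStream.unorderedWait(positions, new AsyncProductLookup(), 5, TimeUnit.SECONDS, 100);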