Hi Ashish,

We are planning a similar use case, and I was wondering if you could share the amount of resources you have allocated for this flow?
Thanks,
Michael

On Tue, Jul 24, 2018, 18:57 ashish pok <ashish...@yahoo.com> wrote:

> BTW,
>
> We got around the bootstrap problem for a similar use case by using a “nohup” topic as the input stream. Our CICD pipeline currently passes an initialize option to the app IF there is a need to bootstrap, waits X minutes, and then takes a savepoint and restarts the app normally, listening to the right topic(s). I believe there is work underway to handle this gracefully using Side Inputs as well. Other than determining the X minutes needed for initialization to complete, we haven't had any issues with this solution - we have over 40 million state refreshes daily and close to 200 Mbps of input streams being joined to states.
>
> Hope this helps!
>
> - Ashish
>
> On Tuesday, July 24, 2018, 11:37 AM, Elias Levy <fearsome.lucid...@gmail.com> wrote:
>
> Alas, this suffers from the bootstrap problem. At the moment Flink does not allow you to pause a source (the positions), so you can't fully consume the accounts or products and preload them to perform the join before the positions start flowing. Additionally, Flink SQL does not support materializing an upsert table for the accounts or products to perform the join, so you have to develop your own KeyedProcessFunction, maintain the state, and perform the join on your own if you only want to join against the latest value for each key.
>
> On Tue, Jul 24, 2018 at 7:27 AM Till Rohrmann <trohrm...@apache.org> wrote:
>
> Yes, using Kafka, which you initialize with the initial values and then feed changes to the Kafka topic from which you consume, could be a solution.
>
> On Tue, Jul 24, 2018 at 3:58 PM Harshvardhan Agrawal <harshvardhan.ag...@gmail.com> wrote:
>
> Hi Till,
>
> How would we do the initial hydration of the Product and Account data since it's currently in a relational DB? Do we have to copy the data over to Kafka and then use it?
>
> Regards,
> Harsh
>
> On Tue, Jul 24, 2018 at 09:22 Till Rohrmann <trohrm...@apache.org> wrote:
>
> Hi Harshvardhan,
>
> I agree with Ankit that this problem could actually be solved quite elegantly with Flink's state. If you can ingest the product/account information changes as a stream, you can keep the latest version of it in Flink state by using a co-map function [1, 2]. One input of the co-map function would be the product/account update stream, which updates the respective entries in Flink's state, and the other input stream is the one to be enriched. When receiving input from this stream, one would look up the latest information contained in the operator's state and join it with the incoming event.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/
> [2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.html
>
> Cheers,
> Till
>
> On Tue, Jul 24, 2018 at 2:15 PM Harshvardhan Agrawal <harshvardhan.ag...@gmail.com> wrote:
>
> Hi,
>
> Thanks for your responses.
>
> There is no fixed interval at which the data is updated. It's more that whenever you onboard a new product, or there are any mandate changes, the reference data will change.
>
> It's not just the enrichment we are doing here. Once we have enriched the data we will be performing a bunch of aggregations using the enriched data.
>
> Which approach would you recommend?
>
> Regards,
> Harshvardhan
>
> On Tue, Jul 24, 2018 at 04:04 Jain, Ankit <ankit.j...@here.com> wrote:
>
> How often is the product db updated?
> Based on that you can store product metadata as state in Flink, maybe set up the state on cluster startup and then update it daily, etc.
>
> Also, just based on this feature, Flink doesn't seem to add a lot of value on top of Kafka. As Jörn said below, you can very well store all the events in an external store and then periodically run a cron job to enrich them later, since your processing doesn't seem to require absolute real time.
>
> Thanks
> Ankit
>
> From: Jörn Franke <jornfra...@gmail.com>
> Date: Monday, July 23, 2018 at 10:10 PM
> To: Harshvardhan Agrawal <harshvardhan.ag...@gmail.com>
> Cc: <user@flink.apache.org>
> Subject: Re: Implement Joins with Lookup Data
>
> For the first one (lookup of single entries) you could use a NoSQL db (e.g. a key-value store) - a relational database will not scale.
>
> Depending on when you need to do the enrichment, you could also first store the data and enrich it later as part of a batch process.
>
> On 24. Jul 2018, at 05:25, Harshvardhan Agrawal <harshvardhan.ag...@gmail.com> wrote:
>
> Hi,
>
> We are using Flink for financial data enrichment and aggregations. We have Positions data that we are currently receiving from Kafka. We want to enrich that data with reference data like Product and Account information that is present in a relational database. From my understanding of Flink so far, there are two ways to achieve this:
>
> 1) First Approach:
> a) Get positions from Kafka and key by product key.
> b) Perform a lookup from the database for each key and then obtain Tuple2<Position, Product>.
>
> 2) Second Approach:
> a) Get positions from Kafka and key by product key.
> b) Window the keyed stream into, say, 15-second windows.
> c) For each window, get the unique product keys and perform a single lookup.
> d) Somehow join Positions and Products.
>
> In the first approach we will be making a lot of calls to the DB and the solution is very chatty. It's hard to scale this because the database storing the reference data might not be very responsive.
>
> In the second approach, I wish to join the WindowedStream with the SingleOutputStream, and it turns out I can't join a windowed stream. So I am not quite sure how to do that.
>
> I wanted an opinion on what the right thing to do is. Should I go with the first approach or the second one? If the second one, how can I implement the join?
>
> --
> Regards,
> Harshvardhan Agrawal
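For reference, below is a minimal sketch of the keyed-state enrichment that Till and Elias describe above: both streams are keyed by the product id, connected, and a RichCoFlatMapFunction keeps the latest reference record per key in Flink keyed state. The Position and Product POJOs, the productId field, and the EnrichmentFunction name are illustrative placeholders rather than anything from the thread, so treat this as a sketch of the pattern, not a drop-in implementation.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class PositionEnrichmentJob {

    // Placeholder POJOs standing in for the real Position and Product types.
    public static class Position { public long productId; public double quantity; }
    public static class Product  { public long productId; public String name; }

    // 'positions' comes from Kafka; 'products' is the reference-data change stream
    // (e.g. DB changes pushed to a Kafka topic, as discussed above).
    public static DataStream<Tuple2<Position, Product>> enrich(
            DataStream<Position> positions, DataStream<Product> products) {
        return positions
                .keyBy(p -> p.productId)
                .connect(products.keyBy(p -> p.productId))
                .flatMap(new EnrichmentFunction());
    }

    public static class EnrichmentFunction
            extends RichCoFlatMapFunction<Position, Product, Tuple2<Position, Product>> {

        // Latest reference data for the current key, kept in Flink's keyed state.
        private transient ValueState<Product> latestProduct;

        @Override
        public void open(Configuration parameters) {
            latestProduct = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("latest-product", Product.class));
        }

        @Override
        public void flatMap1(Position position, Collector<Tuple2<Position, Product>> out)
                throws Exception {
            Product product = latestProduct.value();
            if (product != null) {
                out.collect(Tuple2.of(position, product));
            }
            // else: no reference data seen yet for this key (the bootstrap problem
            // discussed above) -- buffer the position in state or route it to a side
            // output instead, depending on the requirements.
        }

        @Override
        public void flatMap2(Product product, Collector<Tuple2<Position, Product>> out)
                throws Exception {
            // Keep only the latest version per key, as Elias suggests.
            latestProduct.update(product);
        }
    }
}

The same pattern works with a CoProcessFunction if timers are needed, for example to expire stale reference data or to flush positions that were buffered while waiting for their product to arrive. If the per-event database lookup of the first approach is used instead, Flink's Async I/O operator (AsyncDataStream with a RichAsyncFunction) is the usual way to keep those lookups from becoming a throughput bottleneck.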