Hi Harsh,

> What I don't get is, how would this work when I have more than 2 datasets involved?

If you can ingest the product/account/rate information changes as a stream, I think there are two ways to enrich the positions:

- One way is to connect multiple times: Positions connect Account connect Product connect Rate.
- Alternatively, we can unify the schema and union Account, Product and Rate before connecting: Positions connect (Account union Product union Rate).

Best,
Hequn
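A minimal sketch of the first option ("connect multiple times"), not taken from the thread itself: the POJO types (Position, Account, Product, Rate), their key fields, and the enricher classes are all hypothetical placeholders. Each enricher stage would be a RichCoFlatMapFunction that keeps the latest reference record in keyed state and is assumed to return the position with the looked-up data attached.

```java
// positions: DataStream<Position> consumed from Kafka (hypothetical type)
// accounts, products, rates: reference-data change streams (hypothetical types)

// Stage 1: attach account information, keyed by the account id on both sides.
DataStream<Position> withAccount = positions
    .connect(accounts)
    .keyBy((Position p) -> p.accountId, (Account a) -> a.accountId)
    .flatMap(new AccountEnricher());   // hypothetical RichCoFlatMapFunction

// Stage 2: attach product information, re-keyed by the product id.
DataStream<Position> withProduct = withAccount
    .connect(products)
    .keyBy((Position p) -> p.productId, (Product pr) -> pr.productId)
    .flatMap(new ProductEnricher());   // hypothetical RichCoFlatMapFunction

// Stage 3: attach the exchange rate, re-keyed by the currency.
DataStream<Position> enriched = withProduct
    .connect(rates)
    .keyBy((Position p) -> p.currency, (Rate r) -> r.currency)
    .flatMap(new RateEnricher());      // hypothetical RichCoFlatMapFunction
```

The union variant would instead map all three reference streams to one common update type, union them, and do a single connect; that only works if the positions and the unified reference stream can be keyed by the same join key.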
On Thu, Aug 23, 2018 at 9:46 AM Harshvardhan Agrawal <harshvardhan.ag...@gmail.com> wrote:

> Hi Hequn,
>
> We considered that, but unfortunately we have a lot of reference data and we would need an enormous amount of memory to hold it. As a proof of concept I added a Guava cache and that did improve performance, but it can't hold all of our reference data. We have a lot of use cases where we want to join position data with Account, Product, Exchange Rate, etc. The joins can easily be across several datasets in order to obtain the final enriched information.
>
> Now if I were to keep an external cache, say something like Ignite, I would need some service that constantly hits the cache for every position, which makes the pipeline super chatty. Hence we thought going with the windowing approach would help us control that chattiness.
>
> I like Till's solution of connecting streams and using CoFlatMap. I can also see an example on the data Artisans website (http://training.data-artisans.com/exercises/eventTimeJoin.html#). What I don't get is how this would work when I have more than 2 datasets involved. In my case, say I wanted to enrich Positions using the Account, Product and Exchange Rate datasets.
>
> Regards,
> Harsh
>
> On Sun, Aug 19, 2018 at 10:22 PM Hequn Cheng <chenghe...@gmail.com> wrote:
>
>> Hi Harshvardhan,
>>
>> Have you ever considered adding a cache for the database lookups, so that you don't have to add so many pipelines and don't have to do the window distinct? The cache can be an LRU cache with a size and expiry time specified. If your data is limited, it can also be an all-data cache that is refreshed, say, every 2 hours, according to your requirements.
>>
>> Adding a cache can not only simplify your pipeline but also improve the job performance.
>>
>> Best,
>> Hequn
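A minimal sketch of the LRU-cache idea, assuming a Guava LoadingCache inside a RichMapFunction. Position, Product, and the ProductDao database helper are hypothetical names, not from the thread; the lookup only hits the database on a cache miss.

```java
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

import java.util.concurrent.TimeUnit;

// Enriches each position with its product, caching DB lookups per task slot.
public class CachedProductEnricher extends RichMapFunction<Position, Tuple2<Position, Product>> {

    private transient LoadingCache<String, Product> cache;
    private transient ProductDao dao;   // hypothetical DB access helper

    @Override
    public void open(Configuration parameters) {
        dao = new ProductDao();         // open the DB connection once per task
        cache = CacheBuilder.newBuilder()
                .maximumSize(100_000)                  // LRU size bound
                .expireAfterWrite(2, TimeUnit.HOURS)   // expiry / refresh interval
                .build(new CacheLoader<String, Product>() {
                    @Override
                    public Product load(String productId) throws Exception {
                        return dao.findById(productId);  // only hit the DB on a miss
                    }
                });
    }

    @Override
    public Tuple2<Position, Product> map(Position position) throws Exception {
        Product product = cache.get(position.getProductId());
        return Tuple2.of(position, product);
    }
}
```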
>> On Mon, Aug 20, 2018 at 5:42 AM, Harshvardhan Agrawal <harshvardhan.ag...@gmail.com> wrote:
>>
>>> Hello Everyone,
>>>
>>> Sorry for the delayed response. This is what I am thinking of doing: we are planning to create 2 pipelines. The first one only enriches the position data with product information. The second pipeline will use the enriched positions and get all the account information for performing aggregations.
>>>
>>> *First Pipeline*:
>>> 1) Get the positions from Kafka and window the data into tumbling windows of 30 seconds.
>>> 2) Perform a rolling aggregation that collects all the unique product keys in a set.
>>> 3) At the end of the window, a process function queries an external service, performing a single lookup for all the unique products we have seen in the window.
>>> 4) Persist the enriched positions to Kafka topic T1. A sink process reads from this Kafka topic (T1), writes to an underlying DB and persists to another Kafka topic (T2) for the second pipeline to read from.
>>>
>>> *Second Pipeline*:
>>> 1) Read from topic T2, which contains the enriched positions.
>>> 2) For each position, get the account information and look up all the parent and child accounts associated with that account.
>>> 3) Once we have all the accounts, look up all the enriched positions that were created by the first pipeline for those accounts.
>>> 4) Perform the final aggregation to, say, calculate the Net Asset Value for the account.
>>> 5) Persist the output to the DB.
>>>
>>> Regards,
>>> Harsh
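A minimal sketch of what steps 1-3 of the first pipeline could look like, not taken from the thread: it buffers a window of positions, collects the unique product keys, and performs one batched lookup per window. Position, Product and ProductServiceClient are hypothetical names, and windowAll runs the lookup with parallelism 1, which may need to be revisited for throughput.

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// positions: DataStream<Position> consumed from Kafka (hypothetical type)
DataStream<Tuple2<Position, Product>> enriched = positions
    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(30)))
    .process(new ProcessAllWindowFunction<Position, Tuple2<Position, Product>, TimeWindow>() {
        @Override
        public void process(Context ctx, Iterable<Position> elements,
                            Collector<Tuple2<Position, Product>> out) throws Exception {
            // 1) Collect the unique product keys seen in this window.
            Set<String> productKeys = new HashSet<>();
            for (Position p : elements) {
                productKeys.add(p.getProductId());
            }
            // 2) One batched call to the external reference-data service per window.
            Map<String, Product> products = ProductServiceClient.lookup(productKeys);
            // 3) Emit the enriched positions.
            for (Position p : elements) {
                out.collect(Tuple2.of(p, products.get(p.getProductId())));
            }
        }
    });
```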
>>> On Wed, Jul 25, 2018 at 6:52 PM ashish pok <ashish...@yahoo.com> wrote:
>>>
>>>> Hi Michael,
>>>>
>>>> We are currently using 15 TMs with 4 cores and 4 slots each, and 10GB of memory on each TM. We have 15 partitions on Kafka for the main stream and 6 for the context/smaller stream. Heap is around 50%, GC is about 150ms and CPU loads are low. We may be able to reduce resources on this if need be.
>>>>
>>>> Thanks,
>>>>
>>>> - Ashish
>>>>
>>>> On Wednesday, July 25, 2018, 4:07 AM, Michael Gendelman <gen...@gmail.com> wrote:
>>>>
>>>> Hi Ashish,
>>>>
>>>> We are planning for a similar use case and I was wondering if you could share the amount of resources you have allocated for this flow?
>>>>
>>>> Thanks,
>>>> Michael
>>>>
>>>> On Tue, Jul 24, 2018, 18:57 ashish pok <ashish...@yahoo.com> wrote:
>>>>
>>>> BTW,
>>>>
>>>> We got around the bootstrap problem for a similar use case by using a "nohup" topic as the input stream. Our CI/CD pipeline currently passes an initialize option to the app IF there is a need to bootstrap, waits for X minutes, then takes a savepoint and restarts the app normally, listening to the right topic(s). I believe there is work underway to handle this gracefully using Side Inputs as well. Other than determining the X minutes needed for initialization to complete, we haven't had any issues with this solution - we have over 40 million state refreshes daily and close to 200Mbps of input streams being joined to state.
>>>>
>>>> Hope this helps!
>>>>
>>>> - Ashish
>>>>
>>>> On Tuesday, July 24, 2018, 11:37 AM, Elias Levy <fearsome.lucid...@gmail.com> wrote:
>>>>
>>>> Alas, this suffers from the bootstrap problem. At the moment Flink does not allow you to pause a source (the positions), so you can't fully consume and preload the accounts or products to perform the join before the positions start flowing. Additionally, Flink SQL does not support materializing an upsert table for the accounts or products to perform the join, so you have to develop your own KeyedProcessFunction, maintain the state, and perform the join on your own if you only want to join against the latest value for each key.
>>>>
>>>> On Tue, Jul 24, 2018 at 7:27 AM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>
>>>> Yes, using a Kafka topic which you initialize with the initial values, then feed changes to and consume from, could be a solution.
>>>>
>>>> On Tue, Jul 24, 2018 at 3:58 PM Harshvardhan Agrawal <harshvardhan.ag...@gmail.com> wrote:
>>>>
>>>> Hi Till,
>>>>
>>>> How would we do the initial hydration of the Product and Account data, since it's currently in a relational DB? Do we have to copy the data over to Kafka and then use it from there?
>>>>
>>>> Regards,
>>>> Harsh
>>>>
>>>> On Tue, Jul 24, 2018 at 09:22 Till Rohrmann <trohrm...@apache.org> wrote:
>>>>
>>>> Hi Harshvardhan,
>>>>
>>>> I agree with Ankit that this problem could actually be solved quite elegantly with Flink's state. If you can ingest the product/account information changes as a stream, you can keep the latest version of it in Flink state by using a co-map function [1, 2]. One input of the co-map function would be the product/account update stream, which updates the respective entries in Flink's state; the other input stream is the one to be enriched. When receiving input from this stream, one would look up the latest information contained in the operator's state and join it with the incoming event.
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/
>>>> [2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.html
>>>>
>>>> Cheers,
>>>> Till
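A minimal sketch of this style of enrichment, here written as a RichCoFlatMapFunction with keyed state rather than a CoMapFunction, since a flat map can suppress output when no reference data exists yet. Position, Product, their getters, and the productUpdates stream are hypothetical names, not from the thread.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

// Keeps the latest Product per key in Flink state and joins incoming Positions against it.
public class ProductEnricher
        extends RichCoFlatMapFunction<Position, Product, Tuple2<Position, Product>> {

    private transient ValueState<Product> latestProduct;

    @Override
    public void open(Configuration parameters) {
        latestProduct = getRuntimeContext().getState(
                new ValueStateDescriptor<>("latest-product", Product.class));
    }

    // Input 1: the stream to be enriched.
    @Override
    public void flatMap1(Position position, Collector<Tuple2<Position, Product>> out) throws Exception {
        Product product = latestProduct.value();
        if (product != null) {
            out.collect(Tuple2.of(position, product));
        }
        // else: the reference record has not arrived yet (the bootstrap problem discussed
        // above); one could buffer the position in ListState instead of dropping it.
    }

    // Input 2: the product update stream; just keep the newest version per key.
    @Override
    public void flatMap2(Product product, Collector<Tuple2<Position, Product>> out) throws Exception {
        latestProduct.update(product);
    }
}

// Wiring sketch: both inputs keyed by the product id so the state above is per product.
// DataStream<Tuple2<Position, Product>> enriched = positions
//     .connect(productUpdates)
//     .keyBy((Position p) -> p.getProductId(), (Product pr) -> pr.getId())
//     .flatMap(new ProductEnricher());
```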
>>>> On Tue, Jul 24, 2018 at 2:15 PM Harshvardhan Agrawal <harshvardhan.ag...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> Thanks for your responses.
>>>>
>>>> There is no fixed interval for the data being updated. It's more that whenever you onboard a new product, or there are any mandate changes, the reference data will change.
>>>>
>>>> It's not just the enrichment we are doing here. Once we have enriched the data we will be performing a bunch of aggregations using the enriched data.
>>>>
>>>> Which approach would you recommend?
>>>>
>>>> Regards,
>>>> Harshvardhan
>>>>
>>>> On Tue, Jul 24, 2018 at 04:04 Jain, Ankit <ankit.j...@here.com> wrote:
>>>>
>>>> How often is the product DB updated? Based on that you can store the product metadata as state in Flink - maybe set up the state on cluster startup and then update it daily, etc.
>>>>
>>>> Also, just based on this feature, Flink doesn't seem to add a lot of value on top of Kafka. As Jörn said below, you can very well store all the events in an external store and then periodically run a cron job to enrich them later, since your processing doesn't seem to require absolute real time.
>>>>
>>>> Thanks,
>>>> Ankit
>>>>
>>>> *From: *Jörn Franke <jornfra...@gmail.com>
>>>> *Date: *Monday, July 23, 2018 at 10:10 PM
>>>> *To: *Harshvardhan Agrawal <harshvardhan.ag...@gmail.com>
>>>> *Cc: *<user@flink.apache.org>
>>>> *Subject: *Re: Implement Joins with Lookup Data
>>>>
>>>> For the first one (lookup of single entries) you could use a NoSQL DB (e.g. a key-value store) - a relational database will not scale.
>>>>
>>>> Depending on when you need to do the enrichment, you could also first store the data and enrich it later as part of a batch process.
>>>>
>>>> On 24. Jul 2018, at 05:25, Harshvardhan Agrawal <harshvardhan.ag...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> We are using Flink for financial data enrichment and aggregations. We have Positions data that we are currently receiving from Kafka. We want to enrich that data with reference data like Product and Account information that is present in a relational database. From my understanding of Flink so far, there are two ways to achieve this:
>>>>
>>>> 1) First Approach:
>>>> a) Get positions from Kafka and key by product key.
>>>> b) Perform a lookup from the database for each key and then obtain Tuple2<Position, Product>.
>>>>
>>>> 2) Second Approach:
>>>> a) Get positions from Kafka and key by product key.
>>>> b) Window the keyed stream into tumbling windows of, say, 15 seconds each.
>>>> c) For each window, get the unique product keys and perform a single lookup.
>>>> d) Somehow join Positions and Products.
>>>>
>>>> In the first approach we will be making a lot of calls to the DB and the solution is very chatty. It's hard to scale because the database storing the reference data might not be very responsive.
>>>>
>>>> In the second approach, I wish to join the WindowedStream with the SingleOutputStream, but it turns out I can't join a windowed stream, so I am not quite sure how to do that.
>>>>
>>>> I wanted an opinion on what the right thing to do is. Should I go with the first approach or the second one? If the second one, how can I implement the join?
>>>>
>>>> --
>>>> *Regards,
>>>> Harshvardhan Agrawal*
>>>>
>>>> --
>>>> Regards,
>>>> Harshvardhan
>>>>
>>>> --
>>>> Regards,
>>>> Harshvardhan
>
> --
> *Regards,
> Harshvardhan Agrawal*
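Tying Elias's suggestion above back to the original question, a minimal sketch of a hand-rolled "latest value per key" join. Elias mentions a KeyedProcessFunction; with two inputs, a CoProcessFunction on connected keyed streams is the closest fit. It keeps the newest Product per key and buffers Positions that arrive before their Product (the bootstrap case). All type and field names are hypothetical, not from the thread.

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Manual upsert-style join: state is scoped to the key shared by both inputs (e.g. product id).
public class LatestValueJoin
        extends CoProcessFunction<Position, Product, Tuple2<Position, Product>> {

    private transient ValueState<Product> latest;
    private transient ListState<Position> pending;

    @Override
    public void open(Configuration parameters) {
        latest = getRuntimeContext().getState(
                new ValueStateDescriptor<>("latest-product", Product.class));
        pending = getRuntimeContext().getListState(
                new ListStateDescriptor<>("pending-positions", Position.class));
    }

    @Override
    public void processElement1(Position position, Context ctx,
                                Collector<Tuple2<Position, Product>> out) throws Exception {
        Product product = latest.value();
        if (product != null) {
            out.collect(Tuple2.of(position, product));
        } else {
            pending.add(position);   // reference data not seen yet: buffer instead of dropping
        }
    }

    @Override
    public void processElement2(Product product, Context ctx,
                                Collector<Tuple2<Position, Product>> out) throws Exception {
        latest.update(product);
        for (Position p : pending.get()) {   // flush anything that was waiting for this key
            out.collect(Tuple2.of(p, product));
        }
        pending.clear();
    }
}
```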