Hi Harsh,

> What I don't get is, how would this work when I have more than 2 datasets
involved?
If you can ingest the product/account/rate information changes as a stream,
I think there are two ways to enrich the positions.

   - One way is to connect multiple times: Positions connect Account, then
   connect Product, then connect Rate (see the sketch after this list).
   - Alternatively, we can unify the schema and union Account, Product and
   Rate before connecting: Positions connect (Account union Product union Rate).
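
For the first option, here is a rough sketch with the DataStream API (just a
sketch: the Position/Product/Account/Rate types, their key getters and the
enricher functions are placeholders for your own classes; each enricher is a
RichCoFlatMapFunction that keeps the latest reference record in keyed state,
as Till described):

    DataStream<PositionWithProduct> withProduct = positions
            .keyBy(Position::getProductKey)
            .connect(productUpdates.keyBy(Product::getProductKey))
            .flatMap(new ProductEnricher());   // keeps the latest Product per key in state

    DataStream<PositionWithAccount> withAccount = withProduct
            .keyBy(PositionWithProduct::getAccountKey)
            .connect(accountUpdates.keyBy(Account::getAccountKey))
            .flatMap(new AccountEnricher());   // adds the Account information

    DataStream<EnrichedPosition> enriched = withAccount
            .keyBy(PositionWithAccount::getCurrency)
            .connect(rateUpdates.keyBy(Rate::getCurrency))
            .flatMap(new RateEnricher());      // adds the exchange rate

For the second option, you would first map Account, Product and Rate onto one
common update type (key plus payload) and union those streams, so that a
single connect and a single enrichment function handle all three kinds of
reference data.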

Best, Hequn

On Thu, Aug 23, 2018 at 9:46 AM Harshvardhan Agrawal <
harshvardhan.ag...@gmail.com> wrote:

> Hi Hequn,
>
> We considered that, but unfortunately we have a lot of reference data and
> we would need an enormous amount of memory to hold it. As a proof of
> concept I added a Guava cache and that did improve performance, but it
> couldn't hold all of our reference data. We have a lot of use cases where
> we want to join position data with Account, Product, Exchange Rate, etc.
> The joins can easily be across several datasets in order to obtain the
> final enriched information.
> Now, if I were to keep an external cache, say something like Ignite, I would
> need some service that constantly hits the cache for every position, which
> makes the pipeline super chatty. Hence we thought going with the windowing
> approach would help us control that chattiness.
>
> I like Till's solution of connecting streams and using CoFlatMap. I can
> also see an example on the data Artisans website (
> http://training.data-artisans.com/exercises/eventTimeJoin.html#). What I
> don't get is, how would this work when I have more than 2 datasets
> involved. In my case say I wanted to enrich Positions using Account,
> Product and Exchange Rate datasets.
>
> Regards,
> Harsh
>
> On Sun, Aug 19, 2018 at 10:22 PM Hequn Cheng <chenghe...@gmail.com> wrote:
>
>> Hi Harshvardhan,
>>
>> Have you ever considered adding a cache for the database lookups, so that
>> you don't have to add so many pipelines and don't have to do the window
>> distinct?
>> The cache can be an LRU cache with a maximum size and expiry time specified.
>> If your data is limited, it can also be an all-data cache. The all-data
>> cache can be refreshed periodically, say every 2 hours, according to your
>> requirements.
>>
>> Adding a cache can not only simplify your pipeline but also improve job
>> performance.
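>>
>> As a rough sketch of such a cached enrichment function with Guava
>> (ProductService, Product, Position and EnrichedPosition are placeholder
>> names for your own lookup client and types):
>>
>> // Uses com.google.common.cache.CacheBuilder/CacheLoader/LoadingCache and
>> // org.apache.flink.api.common.functions.RichMapFunction.
>> public class CachedProductEnricher extends RichMapFunction<Position, EnrichedPosition> {
>>
>>     private transient LoadingCache<String, Product> cache;
>>
>>     @Override
>>     public void open(Configuration parameters) {
>>         cache = CacheBuilder.newBuilder()
>>                 .maximumSize(100_000)                 // bound the memory footprint
>>                 .expireAfterWrite(2, TimeUnit.HOURS)  // pick up reference data changes
>>                 .build(new CacheLoader<String, Product>() {
>>                     @Override
>>                     public Product load(String productKey) {
>>                         return ProductService.lookup(productKey); // DB hit only on a cache miss
>>                     }
>>                 });
>>     }
>>
>>     @Override
>>     public EnrichedPosition map(Position position) throws Exception {
>>         return new EnrichedPosition(position, cache.get(position.getProductKey()));
>>     }
>> }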
>>
>> Best, Hequn
>>
>>
>> On Mon, Aug 20, 2018 at 5:42 AM, Harshvardhan Agrawal <
>> harshvardhan.ag...@gmail.com> wrote:
>>
>>> Hello Everyone,
>>>
>>> Sorry for the delayed response.
>>> Here is what we are thinking of doing: creating two pipelines. The first
>>> one only enriches the position data with product information. The second
>>> pipeline will use the enriched positions and get all the account
>>> information needed to perform the aggregations.
>>>
>>> *First Pipeline*:
>>> 1) Get the positions from Kafka and window data into tumbling windows of
>>> 30 seconds.
>>> 2) We perform a rolling aggregation that basically collects all the
>>> unique product keys in a set.
>>> 3) At the end of the window, a process function queries an external
>>> service, performing a single lookup for all the unique products we have
>>> seen in the window (see the sketch after this list).
>>> 4) Persist the enriched positions to Kafka topic T1. A sink process reads
>>> from this Kafka topic (T1), writes to an underlying DB and persists to
>>> another Kafka topic (T2) for the second pipeline to read from.
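>>>
>>> For steps 1-3, a rough sketch (ProductClient, Product, Position and
>>> EnrichedPosition are placeholder names; the non-parallel windowAll and the
>>> processing-time windows are used here only for illustration, so that there
>>> is a single batch lookup per window):
>>>
>>> public class BatchedProductLookup
>>>         extends ProcessAllWindowFunction<Position, EnrichedPosition, TimeWindow> {
>>>
>>>     private transient ProductClient client;
>>>
>>>     @Override
>>>     public void open(Configuration parameters) {
>>>         client = new ProductClient();   // connection to the external product service
>>>     }
>>>
>>>     @Override
>>>     public void process(Context context, Iterable<Position> elements,
>>>                         Collector<EnrichedPosition> out) {
>>>         Set<String> productKeys = new HashSet<>();
>>>         List<Position> buffered = new ArrayList<>();
>>>         for (Position p : elements) {
>>>             productKeys.add(p.getProductKey());
>>>             buffered.add(p);
>>>         }
>>>         // One lookup for all unique product keys seen in this window.
>>>         Map<String, Product> products = client.lookupAll(productKeys);
>>>         for (Position p : buffered) {
>>>             out.collect(new EnrichedPosition(p, products.get(p.getProductKey())));
>>>         }
>>>     }
>>> }
>>>
>>> wired in roughly as:
>>>
>>> positions
>>>     .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(30)))
>>>     .process(new BatchedProductLookup())
>>>     .addSink(enrichedPositionsKafkaSink);   // topic T1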
>>>
>>> *Second Pipeline*
>>> 1) Reads from topic T2, which contains the enriched positions.
>>> 2) For each position, we get the account information and look up all the
>>> parent and child accounts associated with that account.
>>> 3) Once we have all the accounts, we look up all the enriched positions
>>> that were created by the first pipeline for those accounts.
>>> 4) We perform the final aggregation to, say, calculate the Net Asset Value
>>> for the account.
>>> 5) Persist the output to the DB.
>>>
>>> Regards,
>>> Harsh
>>>
>>> On Wed, Jul 25, 2018 at 6:52 PM ashish pok <ashish...@yahoo.com> wrote:
>>>
>>>> Hi Michael,
>>>>
>>>> We are currently using 15 TMs with 4 cores and 4 slots each, 10GB of
>>>> memory on each TM. We have 15 partitions on Kafka for stream and 6 for
>>>> context/smaller stream. Heap is around 50%, GC is about 150ms and CPU loads
>>>> are low. We may be able to reduce resources on this if need be.
>>>>
>>>> Thanks,
>>>>
>>>>
>>>> - Ashish
>>>>
>>>> On Wednesday, July 25, 2018, 4:07 AM, Michael Gendelman <
>>>> gen...@gmail.com> wrote:
>>>>
>>>> Hi Ashish,
>>>>
>>>> We are planning for a similar use case and I was wondering if you can
>>>> share the amount of resources you have allocated for this flow?
>>>>
>>>> Thanks,
>>>> Michael
>>>>
>>>> On Tue, Jul 24, 2018, 18:57 ashish pok <ashish...@yahoo.com> wrote:
>>>>
>>>> BTW,
>>>>
>>>> We got around the bootstrap problem for a similar use case using a "nohup"
>>>> topic as the input stream. Our CICD pipeline currently passes an initialize
>>>> option to the app IF there is a need to bootstrap, waits for X minutes
>>>> before taking a savepoint, and then restarts the app normally, listening to
>>>> the right topic(s). I believe there is work underway to handle this
>>>> gracefully using Side Input as well. Other than determining the X minutes
>>>> needed for initialization to complete, we haven't had any issues with this
>>>> solution - we have over 40 million state refreshes daily and close to
>>>> 200Mbps of input streams being joined to state.
>>>>
>>>> Hope this helps!
>>>>
>>>>
>>>>
>>>> - Ashish
>>>>
>>>> On Tuesday, July 24, 2018, 11:37 AM, Elias Levy <
>>>> fearsome.lucid...@gmail.com> wrote:
>>>>
>>>> Alas, this suffers from the bootstrap problem.  At the moment Flink does
>>>> not allow you to pause a source (the positions), so you can't fully consume
>>>> and preload the accounts or products to perform the join before the
>>>> positions start flowing.  Additionally, Flink SQL does not support
>>>> materializing an upsert table for the accounts or products to perform the
>>>> join, so you have to develop your own KeyedProcessFunction, maintain the
>>>> state, and perform the join on your own if you only want to join against
>>>> the latest value for each key.
>>>>
>>>> On Tue, Jul 24, 2018 at 7:27 AM Till Rohrmann <trohrm...@apache.org>
>>>> wrote:
>>>>
>>>> Yes, using Kafka which you initialize with the initial values and then
>>>> feed changes to the Kafka topic from which you consume could be a solution.
>>>>
>>>> On Tue, Jul 24, 2018 at 3:58 PM Harshvardhan Agrawal <
>>>> harshvardhan.ag...@gmail.com> wrote:
>>>>
>>>> Hi Till,
>>>>
>>>> How would we do the initial hydration of the Product and Account data
>>>> since it’s currently in a relational DB? Do we have to copy over data to
>>>> Kafka and then use them?
>>>>
>>>> Regards,
>>>> Harsh
>>>>
>>>> On Tue, Jul 24, 2018 at 09:22 Till Rohrmann <trohrm...@apache.org>
>>>> wrote:
>>>>
>>>> Hi Harshvardhan,
>>>>
>>>> I agree with Ankit that this problem could actually be solved quite
>>>> elegantly with Flink's state. If you can ingest the product/account
>>>> information changes as a stream, you can keep the latest version of it in
>>>> Flink state by using a co-map function [1, 2]. One input of the co-map
>>>> function would be the product/account update stream, which updates the
>>>> respective entries in Flink's state, and the other input stream is the one
>>>> to be enriched. When receiving input from this stream, one would look up
>>>> the latest information contained in the operator's state and join it with
>>>> the incoming event.
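>>>>
>>>> As a rough sketch of such a function, using the flat-map variant so that
>>>> positions arriving before their reference data can be held back instead of
>>>> emitted empty (Position, Product and EnrichedPosition are placeholder types):
>>>>
>>>> import org.apache.flink.api.common.state.ValueState;
>>>> import org.apache.flink.api.common.state.ValueStateDescriptor;
>>>> import org.apache.flink.configuration.Configuration;
>>>> import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
>>>> import org.apache.flink.util.Collector;
>>>>
>>>> // Keyed by product key on both inputs, e.g.:
>>>> // positions.keyBy(...).connect(productUpdates.keyBy(...)).flatMap(new ProductEnricher())
>>>> public class ProductEnricher
>>>>         extends RichCoFlatMapFunction<Position, Product, EnrichedPosition> {
>>>>
>>>>     // Latest Product for the current key.
>>>>     private transient ValueState<Product> latestProduct;
>>>>
>>>>     @Override
>>>>     public void open(Configuration parameters) {
>>>>         latestProduct = getRuntimeContext().getState(
>>>>                 new ValueStateDescriptor<>("latest-product", Product.class));
>>>>     }
>>>>
>>>>     @Override
>>>>     public void flatMap1(Position position, Collector<EnrichedPosition> out) throws Exception {
>>>>         Product product = latestProduct.value();
>>>>         if (product != null) {
>>>>             out.collect(new EnrichedPosition(position, product));
>>>>         }
>>>>         // else: reference data not seen yet; buffer the position in state or
>>>>         // send it to a side output instead of silently dropping it.
>>>>     }
>>>>
>>>>     @Override
>>>>     public void flatMap2(Product product, Collector<EnrichedPosition> out) throws Exception {
>>>>         latestProduct.update(product);   // the update stream only refreshes the state
>>>>     }
>>>> }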
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/
>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.html
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Jul 24, 2018 at 2:15 PM Harshvardhan Agrawal <
>>>> harshvardhan.ag...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> Thanks for your responses.
>>>>
>>>> There is no fixed interval for the data being updated. It’s more that
>>>> onboarding a new product, or any change in mandates, will trigger the
>>>> reference data to change.
>>>>
>>>> It’s not just the enrichment we are doing here. Once we have enriched
>>>> the data we will be performing a bunch of aggregations using the enriched
>>>> data.
>>>>
>>>> Which approach would you recommend?
>>>>
>>>> Regards,
>>>> Harshvardhan
>>>>
>>>> On Tue, Jul 24, 2018 at 04:04 Jain, Ankit <ankit.j...@here.com> wrote:
>>>>
>>>> How often is the product DB updated? Based on that, you can store
>>>> product metadata as state in Flink, maybe set up the state on cluster
>>>> startup and then update it daily, etc.
>>>>
>>>>
>>>>
>>>> Also, just based on this feature, Flink doesn’t seem to add a lot of
>>>> value on top of Kafka. As Jörn said below, you can very well store all the
>>>> events in an external store and then periodically run a cron job to enrich
>>>> them later, since your processing doesn’t seem to require absolute real time.
>>>>
>>>>
>>>>
>>>> Thanks
>>>>
>>>> Ankit
>>>>
>>>>
>>>>
>>>> From: Jörn Franke <jornfra...@gmail.com>
>>>> Date: Monday, July 23, 2018 at 10:10 PM
>>>> To: Harshvardhan Agrawal <harshvardhan.ag...@gmail.com>
>>>> Cc: <user@flink.apache.org>
>>>> Subject: Re: Implement Joins with Lookup Data
>>>>
>>>>
>>>>
>>>> For the first one (lookup of single entries) you could use a NoSQL DB
>>>> (e.g. a key-value store) - a relational database will not scale.
>>>>
>>>>
>>>>
>>>> Depending on when you need to do the enrichment you could also first
>>>> store the data and enrich it later as part of a batch process.
>>>>
>>>>
>>>> On 24. Jul 2018, at 05:25, Harshvardhan Agrawal <
>>>> harshvardhan.ag...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>>
>>>>
>>>> We are using Flink for financial data enrichment and aggregations. We
>>>> have Positions data that we are currently receiving from Kafka. We want to
>>>> enrich that data with reference data like Product and Account information
>>>> that is present in a relational database. From my understanding of Flink
>>>> so far, there are two ways to achieve this:
>>>>
>>>>
>>>>
>>>> 1) First Approach:
>>>>
>>>> a) Get positions from Kafka and key by product key.
>>>>
>>>> b) Perform a lookup from the database for each key and then obtain a
>>>> Tuple2<Position, Product>.
>>>>
>>>>
>>>>
>>>> 2) Second Approach:
>>>>
>>>> a) Get positions from Kafka and key by product key.
>>>>
>>>> b) Window the keyed stream into tumbling windows of, say, 15 seconds each.
>>>>
>>>> c) For each window get the unique product keys and perform a single
>>>> lookup.
>>>>
>>>> d) Somehow join Positions and Products
>>>>
>>>>
>>>>
>>>> In the first approach we will be making a lot of calls to the DB and
>>>> the solution is very chatty. It's hard to scale this because the database
>>>> storing the reference data might not be very responsive.
>>>>
>>>>
>>>>
>>>> In the second approach, I wish to join the WindowedStream with the
>>>> SingleOutputStream, and it turns out I can't join a windowed stream. So I
>>>> am not quite sure how to do that.
>>>>
>>>>
>>>>
>>>> I wanted an opinion on what the right thing to do is. Should I go with
>>>> the first approach or the second one? If the second one, how can I
>>>> implement the join?
>>>>
>>>>
>>>>
>>>> --
>>>>
>>>>
>>>> Regards,
>>>> Harshvardhan Agrawal
>>>>
>>>> --
>>>> Regards,
>>>> Harshvardhan
>>>>
>>>> --
>>>> Regards,
>>>> Harshvardhan
>>>>
>>>>
>>>
>>>
>>
>
> --
>
>
> Regards,
> Harshvardhan Agrawal
>
