Hi Till,

How would we do the initial hydration of the Product and Account data since
it’s currently in a relational DB? Do we have to copy over data to Kafka
and then use them?

Regards,
Harsh

On Tue, Jul 24, 2018 at 09:22 Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Harshvardhan,
>
> I agree with Ankit that this problem could actually be solved quite
> elegantly with Flink's state. If you can ingest the product/account
> information changes as a stream, you can keep the latest version of it in
> Flink state by using a co-map function [1, 2]. One input of the co-map
> function would be the product/account update stream which updates the
> respective entries in Flink's state and the other input stream is the one
> to be enriched. When receiving input from this stream one would lookup the
> latest information contained in the operator's state and join it with the
> incoming event.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.html
>
> Cheers,
> Till
>
> On Tue, Jul 24, 2018 at 2:15 PM Harshvardhan Agrawal <
> harshvardhan.ag...@gmail.com> wrote:
>
>> Hi,
>>
>> Thanks for your responses.
>>
>> There is no fixed interval for the data being updated. It’s more like
>> whenever you onboard a new product or there are any mandates that change
>> will trigger the reference data to change.
>>
>> It’s not just the enrichment we are doing here. Once we have enriched the
>> data we will be performing a bunch of aggregations using the enriched data.
>>
>> Which approach would you recommend?
>>
>> Regards,
>> Harshvardhan
>>
>> On Tue, Jul 24, 2018 at 04:04 Jain, Ankit <ankit.j...@here.com> wrote:
>>
>>> How often is the product db updated? Based on that you can store product
>>> metadata as state in Flink, maybe setup the state on cluster startup and
>>> then update daily etc.
>>>
>>>
>>>
>>> Also, just based on this feature, flink doesn’t seem to add a lot of
>>> value on top of Kafka. As Jorn said below, you can very well store all the
>>> events in an external store and then periodically run a cron to enrich
>>> later since your processing doesn’t seem to require absolute real time.
>>>
>>>
>>>
>>> Thanks
>>>
>>> Ankit
>>>
>>>
>>>
>>> *From: *Jörn Franke <jornfra...@gmail.com>
>>> *Date: *Monday, July 23, 2018 at 10:10 PM
>>> *To: *Harshvardhan Agrawal <harshvardhan.ag...@gmail.com>
>>> *Cc: *<user@flink.apache.org>
>>> *Subject: *Re: Implement Joins with Lookup Data
>>>
>>>
>>>
>>> For the first one (lookup of single entries) you could use a NoSQL db
>>> (eg key value store) - a relational database will not scale.
>>>
>>>
>>>
>>> Depending on when you need to do the enrichment you could also first
>>> store the data and enrich it later as part of a batch process.
>>>
>>>
>>> On 24. Jul 2018, at 05:25, Harshvardhan Agrawal <
>>> harshvardhan.ag...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>>
>>>
>>> We are using Flink for financial data enrichment and aggregations. We
>>> have Positions data that we are currently receiving from Kafka. We want to
>>> enrich that data with reference data like Product and Account information
>>> that is present in a relational database. From my understanding of Flink so
>>> far I think there are two ways to achieve this. Here are two ways to do it:
>>>
>>>
>>>
>>> 1) First Approach:
>>>
>>> a) Get positions from Kafka and key by product key.
>>>
>>> b) Perform lookup from the database for each key and then obtain
>>> Tuple2<Position, Product>
>>>
>>>
>>>
>>> 2) Second Approach:
>>>
>>> a) Get positions from Kafka and key by product key.
>>>
>>> b) Window the keyed stream into say 15 seconds each.
>>>
>>> c) For each window get the unique product keys and perform a single
>>> lookup.
>>>
>>> d) Somehow join Positions and Products
>>>
>>>
>>>
>>> In the first approach we will be making a lot of calls to the DB and the
>>> solution is very chatty. Its hard to scale this cos the database storing
>>> the reference data might not be very responsive.
>>>
>>>
>>>
>>> In the second approach, I wish to join the WindowedStream with the
>>> SingleOutputStream and turns out I can't join a windowed stream. So I am
>>> not quite sure how to do that.
>>>
>>>
>>>
>>> I wanted an opinion for what is the right thing to do. Should I go with
>>> the first approach or the second one. If the second one, how can I
>>> implement the join?
>>>
>>>
>>>
>>> --
>>>
>>>
>>> *Regards, Harshvardhan Agrawal*
>>>
>>> --
>> Regards,
>> Harshvardhan
>>
> --
Regards,
Harshvardhan

Reply via email to