Yes, using Kafka which you initialize with the initial values and then feed
changes to the Kafka topic from which you consume could be a solution.

On Tue, Jul 24, 2018 at 3:58 PM Harshvardhan Agrawal <
harshvardhan.ag...@gmail.com> wrote:

> Hi Till,
>
> How would we do the initial hydration of the Product and Account data
> since it’s currently in a relational DB? Do we have to copy over data to
> Kafka and then use them?
>
> Regards,
> Harsh
>
> On Tue, Jul 24, 2018 at 09:22 Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hi Harshvardhan,
>>
>> I agree with Ankit that this problem could actually be solved quite
>> elegantly with Flink's state. If you can ingest the product/account
>> information changes as a stream, you can keep the latest version of it in
>> Flink state by using a co-map function [1, 2]. One input of the co-map
>> function would be the product/account update stream which updates the
>> respective entries in Flink's state and the other input stream is the one
>> to be enriched. When receiving input from this stream one would lookup the
>> latest information contained in the operator's state and join it with the
>> incoming event.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.html
>>
>> Cheers,
>> Till
>>
>> On Tue, Jul 24, 2018 at 2:15 PM Harshvardhan Agrawal <
>> harshvardhan.ag...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Thanks for your responses.
>>>
>>> There is no fixed interval for the data being updated. It’s more like
>>> whenever you onboard a new product or there are any mandates that change
>>> will trigger the reference data to change.
>>>
>>> It’s not just the enrichment we are doing here. Once we have enriched
>>> the data we will be performing a bunch of aggregations using the enriched
>>> data.
>>>
>>> Which approach would you recommend?
>>>
>>> Regards,
>>> Harshvardhan
>>>
>>> On Tue, Jul 24, 2018 at 04:04 Jain, Ankit <ankit.j...@here.com> wrote:
>>>
>>>> How often is the product db updated? Based on that you can store
>>>> product metadata as state in Flink, maybe setup the state on cluster
>>>> startup and then update daily etc.
>>>>
>>>>
>>>>
>>>> Also, just based on this feature, flink doesn’t seem to add a lot of
>>>> value on top of Kafka. As Jorn said below, you can very well store all the
>>>> events in an external store and then periodically run a cron to enrich
>>>> later since your processing doesn’t seem to require absolute real time.
>>>>
>>>>
>>>>
>>>> Thanks
>>>>
>>>> Ankit
>>>>
>>>>
>>>>
>>>> *From: *Jörn Franke <jornfra...@gmail.com>
>>>> *Date: *Monday, July 23, 2018 at 10:10 PM
>>>> *To: *Harshvardhan Agrawal <harshvardhan.ag...@gmail.com>
>>>> *Cc: *<user@flink.apache.org>
>>>> *Subject: *Re: Implement Joins with Lookup Data
>>>>
>>>>
>>>>
>>>> For the first one (lookup of single entries) you could use a NoSQL db
>>>> (eg key value store) - a relational database will not scale.
>>>>
>>>>
>>>>
>>>> Depending on when you need to do the enrichment you could also first
>>>> store the data and enrich it later as part of a batch process.
>>>>
>>>>
>>>> On 24. Jul 2018, at 05:25, Harshvardhan Agrawal <
>>>> harshvardhan.ag...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>>
>>>>
>>>> We are using Flink for financial data enrichment and aggregations. We
>>>> have Positions data that we are currently receiving from Kafka. We want to
>>>> enrich that data with reference data like Product and Account information
>>>> that is present in a relational database. From my understanding of Flink so
>>>> far I think there are two ways to achieve this. Here are two ways to do it:
>>>>
>>>>
>>>>
>>>> 1) First Approach:
>>>>
>>>> a) Get positions from Kafka and key by product key.
>>>>
>>>> b) Perform lookup from the database for each key and then obtain
>>>> Tuple2<Position, Product>
>>>>
>>>>
>>>>
>>>> 2) Second Approach:
>>>>
>>>> a) Get positions from Kafka and key by product key.
>>>>
>>>> b) Window the keyed stream into say 15 seconds each.
>>>>
>>>> c) For each window get the unique product keys and perform a single
>>>> lookup.
>>>>
>>>> d) Somehow join Positions and Products
>>>>
>>>>
>>>>
>>>> In the first approach we will be making a lot of calls to the DB and
>>>> the solution is very chatty. Its hard to scale this cos the database
>>>> storing the reference data might not be very responsive.
>>>>
>>>>
>>>>
>>>> In the second approach, I wish to join the WindowedStream with the
>>>> SingleOutputStream and turns out I can't join a windowed stream. So I am
>>>> not quite sure how to do that.
>>>>
>>>>
>>>>
>>>> I wanted an opinion for what is the right thing to do. Should I go with
>>>> the first approach or the second one. If the second one, how can I
>>>> implement the join?
>>>>
>>>>
>>>>
>>>> --
>>>>
>>>>
>>>> *Regards, Harshvardhan Agrawal*
>>>>
>>>> --
>>> Regards,
>>> Harshvardhan
>>>
>> --
> Regards,
> Harshvardhan
>

Reply via email to