Hi Fabian,

> On Apr 24, 2018, at 3:01 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi Alex,
> 
> An operator that has to join two input streams obviously requires two inputs. 
> In case of an enrichment join, the operator should first read the meta-data 
> stream and build up a data structure as state against which the other input 
> is joined. If the meta data is (infrequently) updated, these updates should 
> be integrated into the state.
> 
> The problem is that it is currently not possible to implement such an 
> operator with Flink because operators cannot decide from which input to read, 
> i.e., they have to process whatever data is given to them.
> Hence, it is not possible to build up a data structure from the meta data 
> stream before consuming the other stream.

This seems like a common situation, and one where it might be relatively easy 
for Flink to help resolve.

Specifically, for a connected stream feeding a Co(Flat)MapFunction, it seems 
like we could let Flink know how to pick elements from the two network buffers 
- e.g. random, round robin, or by timestamp.
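
To make the idea concrete, here is a minimal plain-Java sketch of what such a selection policy could look like. It is not Flink code and uses no Flink API: InputSelector, Policy, and Element are hypothetical names made up for illustration, and the two queues only stand in for the two inputs' network buffers.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch (not a Flink API): decide which of two buffered inputs to read next.
class InputSelector {
    enum Policy { ROUND_ROBIN, BY_TIMESTAMP }

    // A buffered element: its timestamp and a payload.
    static final class Element {
        final long timestamp;
        final String value;
        Element(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    private final Deque<Element> first = new ArrayDeque<>();   // stand-in for input 1's buffer
    private final Deque<Element> second = new ArrayDeque<>();  // stand-in for input 2's buffer
    private final Policy policy;
    private boolean lastWasFirst = false;

    InputSelector(Policy policy) { this.policy = policy; }

    void offerFirst(long ts, String v) { first.addLast(new Element(ts, v)); }
    void offerSecond(long ts, String v) { second.addLast(new Element(ts, v)); }

    // Returns the next element under the policy, or null if both buffers are empty.
    Element poll() {
        // If only one buffer has data, there is nothing to choose.
        if (first.isEmpty()) return second.pollFirst();
        if (second.isEmpty()) return first.pollFirst();
        boolean takeFirst;
        if (policy == Policy.ROUND_ROBIN) {
            takeFirst = !lastWasFirst;  // alternate while both buffers have data
        } else {
            // BY_TIMESTAMP: take the head with the smaller timestamp (ties go to input 1).
            takeFirst = first.peekFirst().timestamp <= second.peekFirst().timestamp;
        }
        lastWasFirst = takeFirst;
        return takeFirst ? first.pollFirst() : second.pollFirst();
    }

    // Helper for experiments: drain both buffers and return the payloads in read order.
    static List<String> drain(InputSelector s) {
        List<String> order = new ArrayList<>();
        for (Element e = s.poll(); e != null; e = s.poll()) order.add(e.value);
        return order;
    }
}
```

Note that the timestamp policy only yields a globally ordered result if each input is itself ordered by timestamp, which is an assumption of this sketch.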

I don’t know how this works with chained operators, but it does seem a bit odd 
to have operators create buffers of elements when (network) buffers often 
already exist.

If there are no network buffers in play (e.g. there’s a direct chain of operators 
from a source) then it could be something that’s not supported, though with the 
future source-pull architecture that would also be easy to resolve.

Anyway, I could take a whack at this if it seems reasonable.

— Ken
 


> 
> There are a few workarounds that work in special cases.
> 1) The meta data is rather small and never updated. You put the meta data as 
> a file into a (distributed) file system and read it from each function 
> instance when it is initialized, i.e., in open(), and put it into a hash map. 
> Each function instance will hold the complete meta data in memory (on the 
> heap). Since the meta data is broadcast, the other stream does not need to 
> be partitioned to join against the meta data in the hash map. You can 
> implement this function as a RichFlatMapFunction or ProcessFunction.
> 2) The meta data is too large and/or is updated. In this case, you need a 
> function with two inputs. Both inputs are keyed (keyBy()) on a join 
> attribute. Since you cannot hold back the non-meta data stream, you need to 
> buffer it in (keyed) state until you've read the meta data stream up to a 
> point when you can start processing the other stream. If the meta data is 
> updated at some point, you can just add the new data to the state. The 
> benefit of this approach is that the state is distributed across all 
> parallel operator instances and can be updated. However, you might need to 
> initially buffer quite a bit of data in state if the non-meta data stream 
> has a high volume.
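
Workaround 2 above can be sketched in plain Java as follows. This is only a model of the logic, not Flink code: the two maps stand in for keyed state, BufferingEnrichmentJoin and its method names are hypothetical, and the sketch assumes that an event may be released as soon as any meta data record for its key has arrived.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Plain-Java model of a two-input keyed function. In a real job the maps would be
// keyed state, so each parallel instance only holds the keys assigned to it.
class BufferingEnrichmentJoin {
    private final Map<String, String> metaDataByKey = new HashMap<>();
    private final Map<String, List<String>> bufferedByKey = new HashMap<>();

    // Input 1: a meta data record (initial value or later update) for the given key.
    void processMetaData(String key, String metaData, Consumer<String> out) {
        metaDataByKey.put(key, metaData);
        // Release everything that was held back while the meta data was missing.
        List<String> buffered = bufferedByKey.remove(key);
        if (buffered != null) {
            for (String event : buffered) {
                out.accept(event + "|" + metaData);
            }
        }
    }

    // Input 2: an event from the main stream for the given key.
    void processEvent(String key, String event, Consumer<String> out) {
        String metaData = metaDataByKey.get(key);
        if (metaData == null) {
            // Cannot hold back the input itself, so buffer the event instead.
            bufferedByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(event);
        } else {
            out.accept(event + "|" + metaData);
        }
    }

    // Number of events currently buffered for the given key.
    int bufferedCount(String key) {
        List<String> buffered = bufferedByKey.get(key);
        return buffered == null ? 0 : buffered.size();
    }
}
```

In this sketch, events whose key never receives meta data stay buffered forever; a real job would probably need a timeout or cleanup policy for them.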
> 
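
Workaround 1 above (load the complete meta data once, then look it up per record) might look roughly like this. The sketch is plain Java rather than a Flink function: FileBackedEnricher is a hypothetical name, its open() and flatMap() methods merely mirror where the logic would sit in a rich function, and the "key,value" line format of the file is an assumption.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Plain-Java stand-in for one function instance that holds all meta data on the heap.
class FileBackedEnricher {
    private final Path metaDataFile;       // assumed format: one "key,value" pair per line
    private Map<String, String> metaData;  // complete copy, filled in open()

    FileBackedEnricher(Path metaDataFile) {
        this.metaDataFile = metaDataFile;
    }

    // Called once per instance before any record is processed.
    void open() throws IOException {
        metaData = new HashMap<>();
        for (String line : Files.readAllLines(metaDataFile, StandardCharsets.UTF_8)) {
            int comma = line.indexOf(',');
            if (comma > 0) {
                metaData.put(line.substring(0, comma), line.substring(comma + 1));
            }
        }
    }

    // Emits "key:metadata" for records with a match and drops the rest
    // (inner-join semantics, chosen here only for illustration).
    void flatMap(String key, Consumer<String> out) {
        String value = metaData.get(key);
        if (value != null) {
            out.accept(key + ":" + value);
        }
    }
}
```

Because every instance loads the same file, the input stream can be partitioned in any way; the price is that the whole meta data set must fit into each instance's heap.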
> Hope that one of these approaches works for your use case.
> 
> Best, Fabian
> 
> 2018-04-23 13:29 GMT+02:00 Alexander Smirnov <alexander.smirn...@gmail.com>:
> Hi Fabian,
> 
> Please share the workarounds; they should be helpful for my case as well.
> 
> Thank you,
> Alex
> 
> On Mon, Apr 23, 2018 at 2:14 PM Fabian Hueske <fhue...@gmail.com> wrote:
> Hi Miki,
> 
> Sorry for the late response.
> There are basically two ways to implement an enrichment join as in your use 
> case.
> 
> 1) Keep the meta data in the database and implement a job that reads the 
> stream from Kafka and queries the database in an Async I/O operator for every 
> stream record. This should be the easier implementation but it will send one 
> query to the DB for each streamed record.
> 2) Replicate the meta data into Flink state and join the streamed records 
> with the state. This solution is more complex because you need to propagate 
> updates of the meta data (if there are any) into the Flink state. At the 
> moment, Flink lacks a few features to have a good implementation of this 
> approach, but there are some workarounds that help in certain cases.
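
The first approach (one asynchronous database query per streamed record) can be illustrated without Flink using CompletableFuture. This is a sketch of the pattern only: AsyncEnricher is a hypothetical class, the blockingLookup function stands in for a real database client, and none of this is the Flink Async I/O API itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical sketch: issue one lookup per record without blocking the caller.
class AsyncEnricher {
    private final Function<String, String> blockingLookup;  // stand-in for a DB query
    private final ExecutorService pool;

    AsyncEnricher(Function<String, String> blockingLookup, int maxConcurrentQueries) {
        this.blockingLookup = blockingLookup;
        // The pool size bounds how many queries are in flight at the same time.
        this.pool = Executors.newFixedThreadPool(maxConcurrentQueries);
    }

    // Starts the lookup for one record and returns immediately.
    CompletableFuture<String> enrich(String key) {
        return CompletableFuture
                .supplyAsync(() -> blockingLookup.apply(key), pool)
                .thenApply(metaData -> key + "|" + metaData);
    }

    // Starts all lookups first, then collects the results in input order.
    List<String> enrichAll(List<String> keys) {
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (String key : keys) {
            pending.add(enrich(key));
        }
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> future : pending) {
            results.add(future.join());
        }
        return results;
    }

    // Releases the worker threads.
    void close() {
        pool.shutdown();
    }
}
```

The point of the pattern is that several queries overlap in time, so the per-record latency of the database does not translate directly into lost throughput. The database still receives one query per record.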
> 
> Note that Flink's SQL support does not add advantages for either of the two 
> approaches. You should use the DataStream API (and possibly ProcessFunctions).
> 
> I'd go for the first approach if one query per record is feasible.
> Let me know if you need to tackle the second approach and I can give some 
> details on the workarounds I mentioned.
> 
> Best, Fabian
> 
> 2018-04-16 20:38 GMT+02:00 Ken Krugler <kkrugler_li...@transpac.com>:
> Hi Miki,
> 
> I haven’t tried mixing AsyncFunctions with SQL queries.
> 
> Normally I’d create a regular DataStream workflow that first reads from 
> Kafka, then has an AsyncFunction to read from the SQL database.
> 
> If there are often duplicate keys in the Kafka-based stream, you could 
> keyBy(key) before the AsyncFunction, and then cache the result of the SQL 
> query.
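
The caching suggestion above could be sketched like this. After a keyBy(key), all records with the same key reach the same parallel instance, so a small local cache in that instance can absorb repeated lookups. CachingLookup is a hypothetical plain-Java class, the query function stands in for the SQL query, and the LRU bound is an added assumption to keep memory in check.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: a bounded LRU cache in front of an expensive lookup.
class CachingLookup {
    private final Function<String, String> query;  // stand-in for the SQL query
    private final Map<String, String> cache;
    private int queriesIssued = 0;

    CachingLookup(Function<String, String> query, final int maxEntries) {
        this.query = query;
        // accessOrder = true makes the map iterate from least to most recently used.
        this.cache = new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > maxEntries;  // evict the least recently used entry
            }
        };
    }

    // Returns the cached value if present, otherwise runs the query and caches the result.
    // A null query result is not cached in this sketch.
    String lookup(String key) {
        String value = cache.get(key);
        if (value == null) {
            value = query.apply(key);
            queriesIssued++;
            if (value != null) {
                cache.put(key, value);
            }
        }
        return value;
    }

    int queriesIssued() {
        return queriesIssued;
    }
}
```

A cache like this trades freshness for fewer queries: if the meta data changes in the database, cached entries stay stale until they are evicted, so some expiry policy may be needed.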
> 
> — Ken
> 
>> On Apr 16, 2018, at 11:19 AM, miki haiat <miko5...@gmail.com> wrote:
>> 
>> Hi, thanks for the reply. I will try to break your reply down into the flow 
>> execution order.
>> 
>> The first data stream will use AsyncIO and select the table.
>> The second stream will be Kafka, and then I can join the streams and map the result?
>> 
>> If that is the case, then I will select the table only once on load?
>> How can I make sure that my stream table is "fresh"?
>> 
>> I'm wondering: is there a way to use the Flink state backend (RocksDB) and 
>> create a read/write-through mechanism?
>> 
>> Thanks 
>> 
>> miki
>> 
>> 
>> 
>> On Mon, Apr 16, 2018 at 2:45 AM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>> If the SQL data is all (or mostly all) needed to join against the data from 
>> Kafka, then I might try a regular join.
>> 
>> Otherwise it sounds like you want to use an AsyncFunction to do ad hoc 
>> queries (in parallel) against your SQL DB.
>> 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/asyncio.html
>> 
>> — Ken
>> 
>> 
>>> On Apr 15, 2018, at 12:15 PM, miki haiat <miko5...@gmail.com> wrote:
>>> 
>>> Hi,
>>> 
>>> I have a case of meta data enrichment and I'm wondering if my approach is 
>>> the correct way.
>>> Input stream from Kafka.
>>> Meta data (MD) in MS SQL.
>>> Map to a new POJO.
>>> I need to extract a key from the Kafka stream and use it to select some 
>>> values from the SQL table.
>>> 
>>> So I thought to use the Table/SQL API to select the MD table, 
>>> then convert the Kafka stream to a table and join the data by the stream 
>>> key.
>>> 
>>> At the end I need to map the joined data to a new POJO and send it to 
>>> Elasticsearch.
>>> 
>>> Any suggestions or different ways to solve this use case?
>>> 
>>> thanks,
>>> Miki  
>>> 
>>> 
>>> 
>> 
>> --------------------------
>> Ken Krugler
>> http://www.scaleunlimited.com
>> custom big data solutions & training
>> Hadoop, Cascading, Cassandra & Solr
>> 
>> 
> 
> --------------------------------------------
> http://about.me/kkrugler
> +1 530-210-6378
> 
> 

--------------------------------------------
http://about.me/kkrugler
+1 530-210-6378
