Using a flat map function, you can always buffer the non-meta data stream in 
the operator state until the metadata has been aggregated, and then process the 
collected data.  It would require a Rich(Co)FlatMapFunction to hold the 
buffered data in state.
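
A rough, untested sketch of that pattern, assuming the two streams are 
connected and keyed by the join attribute (Event, MetaData, EnrichedEvent and 
getJoinKey() are placeholder names):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class BufferingEnrichFunction
        extends RichCoFlatMapFunction<Event, MetaData, EnrichedEvent> {

    private transient ValueState<MetaData> metaState;     // metadata for the current key
    private transient ListState<Event> bufferedEvents;    // events seen before their metadata

    @Override
    public void open(Configuration parameters) {
        metaState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("meta", MetaData.class));
        bufferedEvents = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffer", Event.class));
    }

    @Override
    public void flatMap1(Event event, Collector<EnrichedEvent> out) throws Exception {
        MetaData meta = metaState.value();
        if (meta != null) {
            out.collect(new EnrichedEvent(event, meta));
        } else {
            bufferedEvents.add(event);   // metadata not there yet, hold the event back
        }
    }

    @Override
    public void flatMap2(MetaData meta, Collector<EnrichedEvent> out) throws Exception {
        metaState.update(meta);
        Iterable<Event> pending = bufferedEvents.get();
        if (pending != null) {
            for (Event buffered : pending) {               // flush everything collected so far
                out.collect(new EnrichedEvent(buffered, meta));
            }
        }
        bufferedEvents.clear();
    }
}

In a real job you would probably also want to cap or expire the buffer for keys 
whose metadata never shows up.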

Michael

> On Apr 25, 2018, at 1:20 PM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
> 
> Hi Fabian,
> 
>> On Apr 24, 2018, at 3:01 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>> 
>> Hi Alex,
>> 
>> An operator that has to join two input streams obviously requires two 
>> inputs. In case of an enrichment join, the operator should first read the 
>> meta-data stream and build up a data structure as state against which the 
>> other input is joined. If the meta data is (infrequently) updated, these 
>> updates should be integrated into the state.
>> 
>> The problem is that it is currently not possible to implement such an 
>> operator with Flink because operators cannot decide from which input to 
>> read, i.e., they have to process whatever data is given to them.
>> Hence, it is not possible to build up a data structure from the meta data 
>> stream before consuming the other stream.
> 
> This seems like a common situation, and one where it might be relatively easy 
> for Flink to help resolve.
> 
> Specifically, for a connected stream feeding a Co(Flat)MapFunction, it seems 
> like we could let Flink know how to pick elements from the two network 
> buffers - e.g. random, round robin, or by timestamp.
> 
> I don’t know how this works with chained operators, but it does seem a bit 
> odd to have operators create buffers of elements when (network) buffers often 
> already exist.
> 
> If there are no network buffers in play (e.g. there’s a direct chain of 
> operators from a source) then it could be something that’s not supported, 
> though with the future source-pull architecture that would also be easy to 
> resolve.
> 
> Anyway, I could take a whack at this if it seems reasonable.
> 
> — Ken
>  
> 
> 
>> 
>> There are a few workarounds that work in special cases.
>> 1) The meta data is rather small and never updated. You put the meta data as 
>> a file into a (distributed) file system and read it from each function 
>> instance when it is initialized, i.e., in open(), and put it into a hash map. 
>> Each function instance will hold the complete meta data in memory (on the 
>> heap). Since the meta data is replicated to every instance, the other stream 
>> does not need to be partitioned to join against the meta data in the hash 
>> map. You can implement this function as a FlatMapFunction or ProcessFunction 
>> (see the first sketch below).
>> 2) The meta data is too large and/or is updated. In this case, you need a 
>> function with two inputs. Both inputs are keyed (keyBy()) on a join 
>> attribute. Since you cannot hold back the non-meta data stream, you need to 
>> buffer it in (keyed) state until you've read the meta data stream up to a 
>> point when you can start processing the other stream. If the meta data is 
>> updated at some point, you can just add the new data to the state. The 
>> benefit of this approach is that the state is partitioned across all parallel 
>> instances and can be updated. However, you might need to initially buffer 
>> quite a bit of data in state if the non-meta data stream has a high volume 
>> (see the second sketch below).
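
For what it's worth, a rough, untested sketch of workaround 1, assuming the 
meta data sits in a small CSV file that every parallel instance can read in 
open() (Event, MetaData, EnrichedEvent, getJoinKey() and the file layout are 
made-up placeholders):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Collector;

public class StaticMetaDataEnricher extends RichFlatMapFunction<Event, EnrichedEvent> {

    private final String metaDataUri;   // e.g. "hdfs:///meta/metadata.csv" (placeholder)
    private transient Map<String, MetaData> metaByKey;

    public StaticMetaDataEnricher(String metaDataUri) {
        this.metaDataUri = metaDataUri;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // every parallel instance loads the complete meta data into a heap hash map
        metaByKey = new HashMap<>();
        Path path = new Path(metaDataUri);
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(path.getFileSystem().open(path)))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] fields = line.split(",");          // assumed layout: key,value
                metaByKey.put(fields[0], new MetaData(fields[1]));
            }
        }
    }

    @Override
    public void flatMap(Event event, Collector<EnrichedEvent> out) {
        MetaData meta = metaByKey.get(event.getJoinKey());
        if (meta != null) {
            out.collect(new EnrichedEvent(event, meta));
        }
        // no metadata for the key: drop the record (or emit it un-enriched, as needed)
    }
}

No keyBy() is needed on the event stream, since every instance holds the full map.

And the wiring for workaround 2 could look roughly like this, reusing a 
buffering co-flatmap such as the BufferingEnrichFunction sketched at the top of 
this thread, with both streams keyed on the join attribute:

import org.apache.flink.streaming.api.datastream.DataStream;

public static DataStream<EnrichedEvent> enrich(DataStream<Event> events,
                                               DataStream<MetaData> metaData) {
    // key both streams on the join attribute so the co-flatmap sees matching keys together
    return events
            .keyBy(Event::getJoinKey)
            .connect(metaData.keyBy(MetaData::getJoinKey))
            .flatMap(new BufferingEnrichFunction());
}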
>> 
>> Hope that one of these approaches works for your use case.
>> 
>> Best, Fabian
>> 
>> 2018-04-23 13:29 GMT+02:00 Alexander Smirnov <alexander.smirn...@gmail.com>:
>> Hi Fabian,
>> 
>> please share the workarounds, that must be helpful for my case as well
>> 
>> Thank you,
>> Alex
>> 
>> On Mon, Apr 23, 2018 at 2:14 PM Fabian Hueske <fhue...@gmail.com> wrote:
>> Hi Miki,
>> 
>> Sorry for the late response.
>> There are basically two ways to implement an enrichment join as in your use 
>> case.
>> 
>> 1) Keep the meta data in the database and implement a job that reads the 
>> stream from Kafka and queries the database in an AsyncIO operator for every 
>> stream record. This should be the easier implementation, but it will send one 
>> query to the DB for each streamed record (see the wiring sketch below).
>> 2) Replicate the meta data into Flink state and join the streamed records 
>> with the state. This solution is more complex because you need to propagate 
>> updates of the meta data (if there are any) into the Flink state. At the 
>> moment, Flink lacks a few features to have a good implementation of this 
>> approach, but there are some workarounds that help in certain cases.
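
To make approach 1 concrete, a rough (untested) wiring sketch could look like 
this; MetaDataLookupFunction is a hypothetical RichAsyncFunction that does the 
per-record DB lookup (one possible version, with caching, is sketched further 
down the thread), and the timeout/capacity values are just examples:

import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;

public static DataStream<EnrichedEvent> enrichWithLookups(DataStream<Event> events) {
    // events is the stream read from Kafka
    return AsyncDataStream.unorderedWait(
            events,
            new MetaDataLookupFunction(),   // hypothetical per-record DB lookup, sketched below
            30, TimeUnit.SECONDS,           // timeout for each lookup
            100);                           // max concurrent in-flight lookups
}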
>> 
>> Note that Flink's SQL support does not add advantages for either of the two 
>> approaches. You should use the DataStream API (and possibly 
>> ProcessFunctions).
>> 
>> I'd go for the first approach if one query per record is feasible.
>> Let me know if you need to tackle the second approach and I can give some 
>> details on the workarounds I mentioned.
>> 
>> Best, Fabian
>> 
>> 2018-04-16 20:38 GMT+02:00 Ken Krugler <kkrugler_li...@transpac.com>:
>> Hi Miki,
>> 
>> I haven’t tried mixing AsyncFunctions with SQL queries.
>> 
>> Normally I’d create a regular DataStream workflow that first reads from 
>> Kafka, then has an AsyncFunction to read from the SQL database.
>> 
>> If there are often duplicate keys in the Kafka-based stream, you could 
>> keyBy(key) before the AsyncFunction, and then cache the result of the SQL 
>> query.
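
A rough, untested sketch of such a lookup function with a per-instance cache. 
It uses plain JDBC for brevity (a real job would prefer an async DB client or a 
dedicated thread pool), the POJOs, getJoinKey(), connection URL and query are 
placeholders, and it targets the ResultFuture-based async API (older Flink 
versions used AsyncCollector):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class MetaDataLookupFunction extends RichAsyncFunction<Event, EnrichedEvent> {

    private transient Connection connection;
    private transient Map<String, MetaData> cache;   // per-subtask cache of prior lookups

    @Override
    public void open(Configuration parameters) throws Exception {
        // placeholder connection URL; one shared connection only for the sake of the sketch
        connection = DriverManager.getConnection("jdbc:sqlserver://dbhost;databaseName=meta");
        cache = new ConcurrentHashMap<>();
    }

    @Override
    public void asyncInvoke(Event event, ResultFuture<EnrichedEvent> resultFuture) {
        MetaData cached = cache.get(event.getJoinKey());
        if (cached != null) {
            resultFuture.complete(Collections.singleton(new EnrichedEvent(event, cached)));
            return;
        }
        CompletableFuture.supplyAsync(() -> queryMetaData(event.getJoinKey()))
                .thenAccept(meta -> {
                    if (meta != null) {
                        cache.put(event.getJoinKey(), meta);
                    }
                    resultFuture.complete(Collections.singleton(new EnrichedEvent(event, meta)));
                });
    }

    private MetaData queryMetaData(String key) {
        // placeholder query; runs on the common fork-join pool via supplyAsync
        try (PreparedStatement stmt =
                     connection.prepareStatement("SELECT value FROM metadata WHERE key = ?")) {
            stmt.setString(1, key);
            try (ResultSet rs = stmt.executeQuery()) {
                return rs.next() ? new MetaData(rs.getString(1)) : null;
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
    }
}

With keyBy(key) in front of the async operator, identical keys always land on 
the same subtask, so the cache can serve repeated lookups without hitting the DB.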
>> 
>> — Ken
>> 
>>> On Apr 16, 2018, at 11:19 AM, miki haiat <miko5...@gmail.com> wrote:
>>> 
>>> Hi, thanks for the reply. I will try to break your reply down into the flow 
>>> execution order.
>>> 
>>> The first data stream will use AsyncIO and select from the table,
>>> and the second stream will be Kafka; then I can join the streams and map them?
>>> 
>>> If that is the case, then will I select the table only once, on load?
>>> How can I make sure that my stream table is "fresh"?
>>> 
>>> I'm thinking to myself: is there a way to use the Flink state backend 
>>> (RocksDB) and create a read/write-through 
>>> mechanism?
>>> 
>>> Thanks 
>>> 
>>> miki
>>> 
>>> 
>>> 
>>> On Mon, Apr 16, 2018 at 2:45 AM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>>> If the SQL data is all (or mostly all) needed to join against the data from 
>>> Kafka, then I might try a regular join.
>>> 
>>> Otherwise it sounds like you want to use an AsyncFunction to do ad hoc 
>>> queries (in parallel) against your SQL DB.
>>> 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/asyncio.html
>>> 
>>> — Ken
>>> 
>>> 
>>>> On Apr 15, 2018, at 12:15 PM, miki haiat <miko5...@gmail.com> wrote:
>>>> 
>>>> Hi,
>>>> 
>>>> I have a case of meta data enrichment and I'm wondering if my approach is 
>>>> the correct way:
>>>> input stream from Kafka, 
>>>> metadata (MD) in MS SQL, 
>>>> map to a new POJO. 
>>>> I need to extract a key from the Kafka stream and use it to select some 
>>>> values from the SQL table.
>>>> 
>>>> So I thought to use the Table/SQL API in order to select the table MD, 
>>>> then convert the Kafka stream to a table and join the data by the stream 
>>>> key.
>>>> 
>>>> At the end I need to map the joined data to a new POJO and send it to 
>>>> Elasticsearch.
>>>> 
>>>> Any suggestions or different ways to solve this use case ?
>>>> 
>>>> thanks,
>>>> Miki  
>>>> 
>>>> 
>>>> 
>>> 
>>> --------------------------
>>> Ken Krugler
>>> http://www.scaleunlimited.com
>>> custom big data solutions & training
>>> Hadoop, Cascading, Cassandra & Solr
>>> 
>>> 
>> 
>> --------------------------------------------
>> http://about.me/kkrugler
>> +1 530-210-6378
>> 
>> 
> 
> --------------------------------------------
> http://about.me/kkrugler
> +1 530-210-6378
