Using a flat map function, you can always buffer the non-meta-data stream in operator state until the meta data is aggregated, and then process the collected data. It would require a RichCoFlatMapFunction to hold the data, since the operator needs both inputs.

Michael
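A minimal sketch of that buffering approach, assuming both inputs are connected and keyBy()'d on the join attribute (field f0 of the Tuple2 records, which stand in for real event and meta-data types):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

// Input 1: events (f0 = join key, f1 = payload).
// Input 2: meta data (f0 = join key, f1 = meta value).
// Events are held in keyed state until the meta data for their key arrives.
public class BufferingEnrichJoin extends
        RichCoFlatMapFunction<Tuple2<String, String>, Tuple2<String, String>, String> {

    private transient ValueState<String> meta;
    private transient ListState<Tuple2<String, String>> buffered;

    @Override
    public void open(Configuration parameters) {
        meta = getRuntimeContext().getState(
                new ValueStateDescriptor<>("meta", String.class));
        buffered = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered-events",
                        TypeInformation.of(new TypeHint<Tuple2<String, String>>() {})));
    }

    @Override
    public void flatMap1(Tuple2<String, String> event, Collector<String> out)
            throws Exception {
        String m = meta.value();
        if (m != null) {
            out.collect(event.f1 + "|" + m);
        } else {
            buffered.add(event);  // meta data not seen yet: hold the event back
        }
    }

    @Override
    public void flatMap2(Tuple2<String, String> metaRecord, Collector<String> out)
            throws Exception {
        meta.update(metaRecord.f1);
        Iterable<Tuple2<String, String>> pending = buffered.get();
        if (pending != null) {
            for (Tuple2<String, String> event : pending) {  // flush the backlog
                out.collect(event.f1 + "|" + metaRecord.f1);
            }
        }
        buffered.clear();
    }
}

It would be wired up along the lines of events.connect(metaStream).keyBy(keySelector1, keySelector2).flatMap(new BufferingEnrichJoin()).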
> On Apr 25, 2018, at 1:20 PM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>
> Hi Fabian,
>
>> On Apr 24, 2018, at 3:01 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>> Hi Alex,
>>
>> An operator that has to join two input streams obviously requires two
>> inputs. In the case of an enrichment join, the operator should first read the
>> meta-data stream and build up a data structure as state against which the
>> other input is joined. If the meta data is (infrequently) updated, these
>> updates should be integrated into the state.
>>
>> The problem is that it is currently not possible to implement such an
>> operator with Flink, because operators cannot decide from which input to
>> read, i.e., they have to process whatever data is given to them.
>> Hence, it is not possible to build up a data structure from the meta-data
>> stream before consuming the other stream.
>
> This seems like a common situation, and one where it might be relatively easy
> for Flink to help resolve.
>
> Specifically, for a connected stream feeding a Co(Flat)MapFunction, it seems
> like we could let Flink know how to pick elements from the two network
> buffers - e.g. random, round robin, or by timestamp.
>
> I don't know how this works with chained operators, but it does seem a bit
> odd to have operators create buffers of elements when (network) buffers often
> already exist.
>
> If there are no network buffers in play (e.g. there's a direct chain of
> operators from a source), then it could be something that's not supported,
> though with the future source-pull architecture that would also be easy to
> resolve.
>
> Anyway, I could take a whack at this if it seems reasonable.
>
> — Ken
>
>> There are a few workarounds that work in special cases.
>>
>> 1) The meta data is rather small and never updated. You put the meta data as
>> a file into a (distributed) file system and read it from each function
>> instance when it is initialized, i.e., in open(), and put it into a hash map.
>> Each function instance will hold the complete meta data in memory (on the
>> heap). Since the meta data is broadcast, the other stream does not need to
>> be partitioned to join against the meta data in the hash map. You can
>> implement this function as a FlatMapFunction or ProcessFunction.
>>
>> 2) The meta data is too large and/or is updated. In this case, you need a
>> function with two inputs. Both inputs are keyed (keyBy()) on a join
>> attribute. Since you cannot hold back the non-meta-data stream, you need to
>> buffer it in (keyed) state until you've read the meta-data stream up to a
>> point when you can start processing the other stream. If the meta data is
>> updated at some point, you can just add the new data to the state. The
>> benefit of this approach is that the state is distributed across the parallel
>> instances and can be updated. However, you might need to initially buffer
>> quite a bit of data in state if the non-meta-data stream has a high volume.
>>
>> Hope that one of these approaches works for your use case.
>>
>> Best,
>> Fabian
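For reference, a minimal sketch of workaround 1; the file path, the "key,value" line format, and the Tuple2 record type are illustrative assumptions:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Collector;

// Every parallel instance loads the complete meta data into a heap hash
// map in open(). The main stream needs no partitioning because each
// instance holds the full copy.
public class StaticMetaDataEnricher
        extends RichFlatMapFunction<Tuple2<String, String>, String> {

    private final String metaDataUri;  // e.g. "hdfs:///meta/dimensions.csv"
    private transient Map<String, String> metaData;

    public StaticMetaDataEnricher(String metaDataUri) {
        this.metaDataUri = metaDataUri;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        metaData = new HashMap<>();
        Path path = new Path(metaDataUri);
        FileSystem fs = path.getFileSystem();
        try (FSDataInputStream in = fs.open(path);
                BufferedReader reader = new BufferedReader(
                        new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] parts = line.split(",", 2);  // "key,value" lines
                metaData.put(parts[0], parts[1]);
            }
        }
    }

    @Override
    public void flatMap(Tuple2<String, String> event, Collector<String> out) {
        String meta = metaData.get(event.f0);
        if (meta != null) {  // silently drop events without meta data
            out.collect(event.f1 + "|" + meta);
        }
    }
}

Workaround 2 corresponds to the keyed two-input buffering function sketched at the top of this thread.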
>>
>> 2018-04-23 13:29 GMT+02:00 Alexander Smirnov <alexander.smirn...@gmail.com>:
>>
>> Hi Fabian,
>>
>> please share the workarounds, that must be helpful for my case as well
>>
>> Thank you,
>> Alex
>>
>> On Mon, Apr 23, 2018 at 2:14 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>
>> Hi Miki,
>>
>> Sorry for the late response.
>> There are basically two ways to implement an enrichment join as in your use
>> case.
>>
>> 1) Keep the meta data in the database and implement a job that reads the
>> stream from Kafka and queries the database in an AsyncIO operator for every
>> stream record. This should be the easier implementation, but it will send one
>> query to the DB for each streamed record.
>>
>> 2) Replicate the meta data into Flink state and join the streamed records
>> with the state. This solution is more complex because you need to propagate
>> updates of the meta data (if there are any) into the Flink state. At the
>> moment, Flink lacks a few features for a good implementation of this
>> approach, but there are some workarounds that help in certain cases.
>>
>> Note that Flink's SQL support does not add advantages for either of the two
>> approaches. You should use the DataStream API (and possibly
>> ProcessFunctions).
>>
>> I'd go for the first approach if one query per record is feasible.
>> Let me know if you need to tackle the second approach and I can give some
>> details on the workarounds I mentioned.
>>
>> Best,
>> Fabian
>>
>> 2018-04-16 20:38 GMT+02:00 Ken Krugler <kkrugler_li...@transpac.com>:
>>
>> Hi Miki,
>>
>> I haven't tried mixing AsyncFunctions with SQL queries.
>>
>> Normally I'd create a regular DataStream workflow that first reads from
>> Kafka, then has an AsyncFunction to read from the SQL database.
>>
>> If there are often duplicate keys in the Kafka-based stream, you could
>> keyBy(key) before the AsyncFunction, and then cache the result of the SQL
>> query.
>>
>> — Ken
>>
>>> On Apr 16, 2018, at 11:19 AM, miki haiat <miko5...@gmail.com> wrote:
>>>
>>> Hi, thanks for the reply. I will try to break your reply into the flow
>>> execution order.
>>>
>>> The first data stream will use AsyncIO and select the table; the second
>>> stream will be Kafka, and then I can join the streams and map them?
>>>
>>> If that is the case, then I will select the table only once, on load?
>>> How can I make sure that my stream table is "fresh"?
>>>
>>> I'm thinking to myself: is there a way to use Flink's state backend
>>> (RocksDB) and create a read/write-through mechanism?
>>>
>>> Thanks,
>>>
>>> miki
>>>
>>> On Mon, Apr 16, 2018 at 2:45 AM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>>>
>>> If the SQL data is all (or mostly all) needed to join against the data from
>>> Kafka, then I might try a regular join.
>>>
>>> Otherwise it sounds like you want to use an AsyncFunction to do ad hoc
>>> queries (in parallel) against your SQL DB.
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/asyncio.html
>>>
>>> — Ken
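A minimal sketch of this per-record async lookup; the JDBC URL, table, and column names are made up, and it uses the ResultFuture-based async API (Flink 1.5+; earlier releases used AsyncCollector instead):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// One DB lookup per streamed record. JDBC is blocking, so the queries run
// on a private thread pool; one connection per pool thread, since JDBC
// connections are not thread-safe.
public class MetaDataLookup extends RichAsyncFunction<String, String> {

    private final String jdbcUrl;  // e.g. "jdbc:sqlserver://host;databaseName=md"
    private transient ExecutorService executor;
    private transient ThreadLocal<Connection> connection;

    public MetaDataLookup(String jdbcUrl) {
        this.jdbcUrl = jdbcUrl;
    }

    @Override
    public void open(Configuration parameters) {
        executor = Executors.newFixedThreadPool(10);
        connection = ThreadLocal.withInitial(() -> {
            try {
                return DriverManager.getConnection(jdbcUrl);
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }
        });
    }

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        CompletableFuture.supplyAsync(() -> {
            try (PreparedStatement stmt = connection.get().prepareStatement(
                    "SELECT meta FROM meta_table WHERE id = ?")) {
                stmt.setString(1, key);
                try (ResultSet rs = stmt.executeQuery()) {
                    return rs.next() ? rs.getString("meta") : null;
                }
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }
        }, executor).whenComplete((meta, error) -> {
            if (error != null) {
                resultFuture.completeExceptionally(error);
            } else {
                resultFuture.complete(Collections.singletonList(key + "|" + meta));
            }
        });
    }

    @Override
    public void close() {
        executor.shutdown();  // a real job would also close the pooled connections
    }
}

It would be attached with AsyncDataStream.unorderedWait(stream, new MetaDataLookup(url), 30, TimeUnit.SECONDS, 100). Keying the stream first and caching query results in keyed state, as suggested above, avoids repeated lookups for duplicate keys.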
>>>> On Apr 15, 2018, at 12:15 PM, miki haiat <miko5...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I have a case of meta-data enrichment, and I'm wondering if my approach
>>>> is the correct way:
>>>>
>>>> input stream from Kafka
>>>> meta data (MD) in MS SQL
>>>> map to a new POJO
>>>>
>>>> I need to extract a key from the Kafka stream and use it to select some
>>>> values from the SQL table.
>>>>
>>>> So I thought to use the Table/SQL API in order to select the meta-data
>>>> table, then convert the Kafka stream to a table and join the data by the
>>>> stream key.
>>>>
>>>> At the end I need to map the joined data to a new POJO and send it to
>>>> Elasticsearch.
>>>>
>>>> Any suggestions or different ways to solve this use case?
>>>>
>>>> thanks,
>>>> Miki
>>>
>>> --------------------------
>>> Ken Krugler
>>> http://www.scaleunlimited.com
>>> custom big data solutions & training
>>> Hadoop, Cascading, Cassandra & Solr
>>
>> --------------------------------------------
>> http://about.me/kkrugler
>> +1 530-210-6378
>
> --------------------------------------------
> http://about.me/kkrugler
> +1 530-210-6378
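For completeness, a rough end-to-end sketch of the pipeline Miki describes, reusing the MetaDataLookup sketch from above. The topic, host, index, and type names are placeholders, and the connector classes assume the Flink 1.4-era Kafka 0.11 and Elasticsearch 5 artifacts:

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.elasticsearch.client.Requests;

public class EnrichmentPipeline {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProps.setProperty("group.id", "enrichment");

        // 1) Read the keys to enrich from Kafka.
        DataStream<String> keys = env.addSource(new FlinkKafkaConsumer011<>(
                "events", new SimpleStringSchema(), kafkaProps));

        // 2) Enrich each record with an async DB lookup (sketch above).
        DataStream<String> enriched = AsyncDataStream.unorderedWait(
                keys, new MetaDataLookup("jdbc:sqlserver://..."),
                30, TimeUnit.SECONDS, 100);

        // 3) Index the enriched records; a real job would map to the
        //    output POJO here instead of shipping raw strings.
        Map<String, String> esConfig = new HashMap<>();
        esConfig.put("cluster.name", "my-cluster");
        List<InetSocketAddress> esHosts = new ArrayList<>();
        esHosts.add(new InetSocketAddress("localhost", 9300));

        enriched.addSink(new ElasticsearchSink<>(esConfig, esHosts,
                new ElasticsearchSinkFunction<String>() {
                    @Override
                    public void process(String element, RuntimeContext ctx,
                            RequestIndexer indexer) {
                        Map<String, String> json = new HashMap<>();
                        json.put("data", element);
                        indexer.add(Requests.indexRequest()
                                .index("enriched-events").type("event").source(json));
                    }
                }));

        env.execute("meta data enrichment");
    }
}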