Hi Fabian,

> On Apr 24, 2018, at 3:01 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Alex,
>
> An operator that has to join two input streams obviously requires two inputs. In case of an enrichment join, the operator should first read the meta-data stream and build up a data structure as state against which the other input is joined. If the meta data is (infrequently) updated, these updates should be integrated into the state.
>
> The problem is that it is currently not possible to implement such an operator with Flink because operators cannot decide from which input to read, i.e., they have to process whatever data is given to them. Hence, it is not possible to build up a data structure from the meta data stream before consuming the other stream.
This seems like a common situation, and one where it might be relatively easy for Flink to help resolve. Specifically, for a connected stream feeding a Co(Flat)MapFunction, it seems like we could let Flink know how to pick elements from the two network buffers - e.g. random, round robin, or by timestamp.

I don’t know how this works with chained operators, but it does seem a bit odd to have operators create buffers of elements when (network) buffers often already exist. If there are no network buffers in play (e.g. there’s a direct chain of operators from a source), then it could be something that’s not supported, though with the future source-pull architecture that would also be easy to resolve.

Anyway, I could take a whack at this if it seems reasonable.

— Ken

> There are a few workarounds that work in special cases.
>
> 1) The meta data is rather small and never updated. You put the meta data as a file into a (distributed) file system and read it from each function instance when it is initialized, i.e., in open(), and put it into a hash map. Each function instance will hold the complete meta data in memory (on the heap). Since the meta data is broadcasted, the other stream does not need to be partitioned to join against the meta data in the hash map. You can implement this function as a FlatMapFunction or ProcessFunction.
>
> 2) The meta data is too large and/or is updated. In this case, you need a function with two inputs. Both inputs are keyed (keyBy()) on a join attribute. Since you cannot hold back the non-meta data stream, you need to buffer it in (keyed) state until you've read the meta data stream up to a point when you can start processing the other stream. If the meta data is updated at some point, you can just add the new data to the state. The benefit of this approach is that the state is shared across all operators and can be updated. However, you might need to initially buffer quite a bit of data in state if the non-meta data stream has a high volume.
>
> Hope that one of these approaches works for your use case.
>
> Best,
> Fabian
>
> 2018-04-23 13:29 GMT+02:00 Alexander Smirnov <alexander.smirn...@gmail.com>:
>
> Hi Fabian,
>
> please share the workarounds; they would be helpful for my case as well.
>
> Thank you,
> Alex
>
> On Mon, Apr 23, 2018 at 2:14 PM Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Miki,
>
> Sorry for the late response.
> There are basically two ways to implement an enrichment join as in your use case.
>
> 1) Keep the meta data in the database and implement a job that reads the stream from Kafka and queries the database in an AsyncIO operator for every stream record. This should be the easier implementation, but it will send one query to the DB for each streamed record.
>
> 2) Replicate the meta data into Flink state and join the streamed records with the state. This solution is more complex because you need to propagate updates of the meta data (if there are any) into the Flink state. At the moment, Flink lacks a few features to have a good implementation of this approach, but there are some workarounds that help in certain cases.
>
> Note that Flink's SQL support does not add advantages for either of these approaches. You should use the DataStream API (and possibly ProcessFunctions).
>
> I'd go for the first approach if one query per record is feasible.
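For what it's worth, here is roughly how I'd expect the state-based approach (Fabian's workaround 2 above) to look as a CoProcessFunction. This is an untested sketch; the Event, MetaData, and Enriched POJOs, their join key, and the Enriched.of() helper are placeholders I made up:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Untested sketch: enrich a keyed event stream with keyed meta data held in Flink state.
// Event, MetaData, Enriched, and Enriched.of() are hypothetical placeholders.
public class EnrichmentJoin extends CoProcessFunction<Event, MetaData, Enriched> {

    private transient ValueState<MetaData> metaState;   // latest meta data for the current key
    private transient ListState<Event> bufferedEvents;  // events that arrived before the meta data

    @Override
    public void open(Configuration parameters) {
        metaState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("meta", MetaData.class));
        bufferedEvents = getRuntimeContext().getListState(
            new ListStateDescriptor<>("buffered-events", Event.class));
    }

    @Override
    public void processElement1(Event event, Context ctx, Collector<Enriched> out) throws Exception {
        MetaData meta = metaState.value();
        if (meta != null) {
            out.collect(Enriched.of(event, meta));
        } else {
            // No meta data for this key yet, so we have to park the event in keyed state.
            bufferedEvents.add(event);
        }
    }

    @Override
    public void processElement2(MetaData meta, Context ctx, Collector<Enriched> out) throws Exception {
        // New or updated meta data simply replaces the previous value for this key.
        metaState.update(meta);

        // Flush anything that was buffered while we were waiting for the meta data.
        Iterable<Event> pending = bufferedEvents.get();
        if (pending != null) {
            for (Event event : pending) {
                out.collect(Enriched.of(event, meta));
            }
            bufferedEvents.clear();
        }
    }
}

It would get wired up along the lines of events.connect(metaStream).keyBy(eventKeySelector, metaKeySelector).process(new EnrichmentJoin()), and it has exactly the downside Fabian describes: until meta data shows up for a key, every event for that key sits in state.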
> Let me know if you need to tackle the second approach and I can give some details on the workarounds I mentioned.
>
> Best,
> Fabian
>
> 2018-04-16 20:38 GMT+02:00 Ken Krugler <kkrugler_li...@transpac.com>:
>
> Hi Miki,
>
> I haven’t tried mixing AsyncFunctions with SQL queries.
>
> Normally I’d create a regular DataStream workflow that first reads from Kafka, then has an AsyncFunction to read from the SQL database.
>
> If there are often duplicate keys in the Kafka-based stream, you could keyBy(key) before the AsyncFunction, and then cache the result of the SQL query.
>
> — Ken
>
>> On Apr 16, 2018, at 11:19 AM, miki haiat <miko5...@gmail.com> wrote:
>>
>> Hi, thanks for the reply. I will try to break your reply into the flow execution order.
>>
>> The first data stream will use AsyncIO and select the table, the second stream will be Kafka, and then I can join the streams and map them?
>>
>> If that is the case, then I will select the table only once, on load? How can I make sure that my stream table is "fresh"?
>>
>> I'm thinking to myself, is there a way to use the Flink state backend (RocksDB) and create a read/write-through mechanism?
>>
>> Thanks,
>>
>> Miki
>>
>> On Mon, Apr 16, 2018 at 2:45 AM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>>
>> If the SQL data is all (or mostly all) needed to join against the data from Kafka, then I might try a regular join.
>>
>> Otherwise it sounds like you want to use an AsyncFunction to do ad hoc queries (in parallel) against your SQL DB.
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/asyncio.html
>>
>> — Ken
>>
>>> On Apr 15, 2018, at 12:15 PM, miki haiat <miko5...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I have a case of meta data enrichment and I'm wondering if my approach is the correct one:
>>> - input stream from Kafka
>>> - meta data (MD) in MS SQL
>>> - map to a new POJO
>>>
>>> I need to extract a key from the Kafka stream and use it to select some values from the SQL table.
>>>
>>> So I thought to use the Table/SQL API in order to select the MD table, then convert the Kafka stream to a table and join the data by the stream key.
>>>
>>> At the end I need to map the joined data to a new POJO and send it to Elasticsearch.
>>>
>>> Any suggestions or different ways to solve this use case?
>>>
>>> thanks,
>>> Miki
>>
>> --------------------------
>> Ken Krugler
>> http://www.scaleunlimited.com
>> custom big data solutions & training
>> Hadoop, Cascading, Cassandra & Solr
>
> --------------------------------------------
> http://about.me/kkrugler
> +1 530-210-6378

--------------------------------------------
http://about.me/kkrugler
+1 530-210-6378
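P.S. In case a concrete starting point helps, below is a rough, untested sketch of the per-record lookup approach (Fabian's option 1) using a RichAsyncFunction. The Event, MetaData, and Enriched classes, the JDBC URL, and the query are placeholders, and it uses the ResultFuture callback from newer Flink releases (the 1.4 interface still calls it AsyncCollector), so adjust it to your version:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Untested sketch: enrich each event with a database lookup done off the main task thread.
// Event, MetaData, Enriched, the JDBC URL, and the query are hypothetical placeholders.
public class DbEnrichment extends RichAsyncFunction<Event, Enriched> {

    private static final String JDBC_URL = "jdbc:sqlserver://dbhost;databaseName=metadata";

    private transient ExecutorService executor;

    @Override
    public void open(Configuration parameters) {
        // JDBC is blocking, so hand the actual query off to a small thread pool.
        executor = Executors.newFixedThreadPool(10);
    }

    @Override
    public void asyncInvoke(Event event, ResultFuture<Enriched> resultFuture) {
        CompletableFuture
            .supplyAsync(() -> lookupMetaData(event.getKey()), executor)
            .thenAccept(meta -> resultFuture.complete(
                Collections.singleton(Enriched.of(event, meta))))
            .exceptionally(t -> { resultFuture.completeExceptionally(t); return null; });
    }

    private MetaData lookupMetaData(String key) {
        // Opens a connection per query to keep the sketch simple and thread-safe;
        // a real job would use a connection pool or a truly asynchronous client.
        try (Connection conn = DriverManager.getConnection(JDBC_URL);
             PreparedStatement stmt = conn.prepareStatement(
                 "SELECT name, category FROM meta_table WHERE id = ?")) {
            stmt.setString(1, key);
            try (ResultSet rs = stmt.executeQuery()) {
                return rs.next() ? new MetaData(rs.getString("name"), rs.getString("category")) : null;
            }
        } catch (SQLException e) {
            throw new RuntimeException("Meta data lookup failed for key " + key, e);
        }
    }

    @Override
    public void close() {
        executor.shutdown();
    }
}

You'd then hook it in with something like AsyncDataStream.unorderedWait(kafkaStream, new DbEnrichment(), 30, TimeUnit.SECONDS, 100), and (as mentioned above) a keyBy() plus a small cache in front of the lookup can cut down the number of queries when keys repeat.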