Hi guys,

This is how I tried to solve my enrichment case:
https://gist.github.com/miko-code/d615aa05b65579f4366ba9fe8a8275fd

Currently we need to use *keyBy()* before the process function. My concern
is: if I have N in-flight messages with the same key, will the process
function execute once or N times?

Thanks,
Miki
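For reference, a minimal sketch of a keyed enrichment function (this is not the gist's code; Event, Metadata, and EnrichedEvent are hypothetical POJOs with a `key` field, and lookupMetadata() stands in for whatever lookup is actually done). processElement() is invoked once per record, so N in-flight messages with the same key invoke it N times; keyed state can at least ensure the expensive lookup itself happens only once per key:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class EnrichOncePerKey extends KeyedProcessFunction<String, Event, EnrichedEvent> {

    private transient ValueState<Metadata> cachedMetadata;

    @Override
    public void open(Configuration parameters) {
        cachedMetadata = getRuntimeContext().getState(
                new ValueStateDescriptor<>("metadata", Metadata.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<EnrichedEvent> out)
            throws Exception {
        Metadata md = cachedMetadata.value();
        if (md == null) {
            // first record seen for this key: do the lookup once and cache it
            md = lookupMetadata(event.key);
            cachedMetadata.update(md);
        }
        // every record still produces one output: N records, N invocations
        out.collect(new EnrichedEvent(event, md));
    }

    // hypothetical synchronous lookup (DB, file, service, ...)
    private Metadata lookupMetadata(String key) {
        return new Metadata(key);
    }
}
```

Wired up as something like `events.keyBy(e -> e.key).process(new EnrichOncePerKey())`.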
On Thu, Apr 26, 2018 at 1:24 PM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi all,
>
> @Ken, the approach of telling the operator which input to read from would
> cause problems with the current checkpointing mechanism, because
> checkpoint barriers are not allowed to overtake regular records. Chaining
> wouldn't be an issue, because operators with two inputs are not chained
> to their predecessors.
>
> The side inputs are exactly the effort to address these use cases. I'm
> not 100% into the details, but AFAIK there are some improvements to the
> checkpointing mechanism that need to be solved before side inputs can be
> implemented.
> Side inputs will also support reading the side input first (blocking all
> other streams) and starting the other streams once the initialization is
> completed. Afterwards, the side inputs will still be able to provide
> updates.
>
> Buffering records in a function does not necessarily lead to an OOME. If
> the stream is keyed, you can put the state into a RocksDBStateBackend and
> write it to disk.
>
> Best, Fabian
>
> 2018-04-25 23:36 GMT+02:00 Ken Krugler <kkrugler_li...@transpac.com>:
>
>> Hi Michael,
>>
>> Windowing works when you're joining timestamped metadata and
>> non-metadata.
>>
>> The common case I'm referring to is where there's some function state
>> (e.g. rules to process data, machine learning models, or in my case
>> clusters), where you want to process the non-metadata with the "current
>> state".
>>
>> In that case, blindly applying whatever metadata has been collected to
>> incoming non-metadata often doesn't work well. That's why Fabian was
>> suggesting various approaches (below) to work around the problem. The
>> general solution (his option #2, with buffering) will work, but can lead
>> to OOME, and feels like it breaks the basic Flink back-pressure
>> mechanism, due to in-operator buffering.
>>
>> If it were possible to essentially allow Flink to block (or not pull,
>> for sources) the non-metadata stream when appropriate, then no buffering
>> would be needed. Then it would be straightforward to do things like…
>>
>> - drain all metadata from a Kafka topic before applying it to the other
>> stream.
>> - defer processing data from the other stream if there is newer
>> metadata.
>>
>> As an aside, what I'm seeing with Flink 1.5 and a connected keyed &
>> broadcast stream is that the CoFlatMapFunction seems to give priority to
>> data going to the flatMap1() method, though this could be an odd side
>> effect of how iterations impact the two streams.
>>
>> — Ken
>>
>> On Apr 25, 2018, at 1:09 PM, TechnoMage <mla...@technomage.com> wrote:
>>
>> I agree that in the general case you need to operate on the stream data
>> based on the metadata you have. The side input feature coming some day
>> may help you, in that it would give you a means to receive inputs out of
>> band. But, given changing metadata and changing stream data, I am not
>> sure this is any different from dual stream data inputs. Either you use
>> windowing to do small batches of data to allow coordination of stream
>> and metadata, or you use the metadata you have collected to date on
>> receipt of the stream data. Given that Flink does record-by-record
>> processing, you have the option of controlling the timing as needed for
>> your use case.
>>
>> Michael
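For what it's worth, a minimal sketch of the RocksDB setup Fabian mentions above (the checkpoint URI and job wiring are placeholders, and the flink-statebackend-rocksdb dependency is assumed):

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendSetup {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // With RocksDB, keyed state (e.g. buffered records waiting for
        // metadata) lives on local disk instead of the JVM heap, so a large
        // buffer doesn't cause an OOME. The checkpoint URI is a placeholder.
        env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink-checkpoints"));
        env.enableCheckpointing(60_000); // checkpoint every 60s

        // ... build the keyed enrichment job on env here ...
        env.execute("enrichment-job"); // hypothetical job name
    }
}
```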
>> On Apr 25, 2018, at 1:57 PM, Ken Krugler <kkrugler_li...@transpac.com>
>> wrote:
>>
>> Hi Michael,
>>
>> I agree there are cases where it's possible to implement a solution via
>> buffering.
>>
>> But this case of using broadcast state to update a function operating on
>> streaming data seems common enough that it would be useful for Flink to
>> provide some help.
>>
>> Additionally, even with buffering there are currently challenges...
>>
>> 1. For the case I'm dealing with (iterative KMeans clustering), you
>> don't have a time when the "metadata is aggregated", as it's constantly
>> evolving.
>>
>> 2. It's sometimes not possible to know when you've received all of the
>> metadata (e.g. if you're reading from a Kafka topic).
>>
>> 3. Buffering the non-metadata can create an unbounded memory issue.
>>
>> Regards,
>>
>> — Ken
>>
>> On Apr 25, 2018, at 12:39 PM, Michael Latta <lat...@me.com> wrote:
>>
>> Using a flat map function, you can always buffer the non-metadata stream
>> in the operator state until the metadata is aggregated, and then process
>> any collected data. It would require a RichFlatMap to hold the data.
>>
>> Michael
>>
>> On Apr 25, 2018, at 1:20 PM, Ken Krugler <kkrugler_li...@transpac.com>
>> wrote:
>>
>> Hi Fabian,
>>
>> On Apr 24, 2018, at 3:01 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>> Hi Alex,
>>
>> An operator that has to join two input streams obviously requires two
>> inputs. In case of an enrichment join, the operator should first read
>> the metadata stream and build up a data structure as state, against
>> which the other input is joined. If the metadata is (infrequently)
>> updated, these updates should be integrated into the state.
>>
>> The problem is that it is currently not possible to implement such an
>> operator with Flink, because operators cannot decide from which input to
>> read, i.e., they have to process whatever data is given to them.
>> Hence, it is not possible to build up a data structure from the metadata
>> stream before consuming the other stream.
>>
>> This seems like a common situation, and one where it might be relatively
>> easy for Flink to help resolve.
>>
>> Specifically, for a connected stream feeding a Co(Flat)MapFunction, it
>> seems like we could let Flink know how to pick elements from the two
>> network buffers - e.g. random, round robin, or by timestamp.
>>
>> I don't know how this works with chained operators, but it does seem a
>> bit odd to have operators create buffers of elements when (network)
>> buffers often already exist.
>>
>> If there are no network buffers in play (e.g. there's a direct chain of
>> operators from a source), then it could be something that's not
>> supported, though with the future source-pull architecture that would
>> also be easy to resolve.
>>
>> Anyway, I could take a whack at this if it seems reasonable.
>>
>> — Ken
>>
>> There are a few workarounds that work in special cases.
>> 1) The metadata is rather small and never updated. You put the metadata
>> as a file into a (distributed) file system and read it from each
>> function instance when it is initialized, i.e., in open(), and put it
>> into a hash map. Each function instance will hold the complete metadata
>> in memory (on the heap). Since the metadata is broadcasted, the other
>> stream does not need to be partitioned to join against the metadata in
>> the hash map. You can implement this function as a FlatMapFunction or
>> ProcessFunction.
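A minimal sketch of this workaround #1 (Event, Metadata, and EnrichedEvent are hypothetical POJOs, and the metadata file is assumed to be plain "key,value" lines); each parallel instance loads the complete metadata into a heap hash map in open():

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Collector;

public class StaticMetadataEnricher extends RichFlatMapFunction<Event, EnrichedEvent> {

    private final String metadataUri;
    private transient Map<String, Metadata> metadata;

    public StaticMetadataEnricher(String metadataUri) {
        this.metadataUri = metadataUri;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // read the metadata file once per function instance, into the heap
        metadata = new HashMap<>();
        Path path = new Path(metadataUri);
        FileSystem fs = path.getFileSystem();
        try (BufferedReader reader =
                 new BufferedReader(new InputStreamReader(fs.open(path)))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] parts = line.split(",", 2);
                metadata.put(parts[0], new Metadata(parts[1]));
            }
        }
    }

    @Override
    public void flatMap(Event event, Collector<EnrichedEvent> out) {
        // no partitioning needed: every instance holds the full map
        Metadata md = metadata.get(event.key);
        if (md != null) {
            out.collect(new EnrichedEvent(event, md));
        }
    }
}
```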
>> 2) The metadata is too large and/or is updated. In this case, you need
>> a function with two inputs. Both inputs are keyed (keyBy()) on a join
>> attribute. Since you cannot hold back the non-metadata stream, you need
>> to buffer it in (keyed) state until you've read the metadata stream up
>> to a point where you can start processing the other stream. If the
>> metadata is updated at some point, you can just add the new data to the
>> state. The benefit of this approach is that the state is shared across
>> all operators and can be updated. However, you might need to initially
>> buffer quite a bit of data in state if the non-metadata stream has a
>> high volume. (See the sketch after this message.)
>>
>> Hope that one of these approaches works for your use case.
>>
>> Best, Fabian
>>
>> 2018-04-23 13:29 GMT+02:00 Alexander Smirnov
>> <alexander.smirn...@gmail.com>:
>>
>>> Hi Fabian,
>>>
>>> please share the workarounds; they should be helpful for my case as
>>> well.
>>>
>>> Thank you,
>>> Alex
>>>
>>> On Mon, Apr 23, 2018 at 2:14 PM Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>
>>>> Hi Miki,
>>>>
>>>> Sorry for the late response.
>>>> There are basically two ways to implement an enrichment join as in
>>>> your use case.
>>>>
>>>> 1) Keep the metadata in the database and implement a job that reads
>>>> the stream from Kafka and queries the database in an AsyncIO operator
>>>> for every stream record. This should be the easier implementation,
>>>> but it will send one query to the DB for each streamed record.
>>>> 2) Replicate the metadata into Flink state and join the streamed
>>>> records with the state. This solution is more complex because you
>>>> need to propagate updates of the metadata (if there are any) into the
>>>> Flink state. At the moment, Flink lacks a few features for a good
>>>> implementation of this approach, but there are some workarounds that
>>>> help in certain cases.
>>>>
>>>> Note that Flink's SQL support does not add advantages for either of
>>>> the two approaches. You should use the DataStream API (and possibly
>>>> ProcessFunctions).
>>>>
>>>> I'd go for the first approach if one query per record is feasible.
>>>> Let me know if you need to tackle the second approach and I can give
>>>> some details on the workarounds I mentioned.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2018-04-16 20:38 GMT+02:00 Ken Krugler <kkrugler_li...@transpac.com>:
>>>>
>>>>> Hi Miki,
>>>>>
>>>>> I haven't tried mixing AsyncFunctions with SQL queries.
>>>>>
>>>>> Normally I'd create a regular DataStream workflow that first reads
>>>>> from Kafka, then has an AsyncFunction to read from the SQL database.
>>>>>
>>>>> If there are often duplicate keys in the Kafka-based stream, you
>>>>> could keyBy(key) before the AsyncFunction, and then cache the result
>>>>> of the SQL query.
>>>>>
>>>>> — Ken
>>>>>
>>>>> On Apr 16, 2018, at 11:19 AM, miki haiat <miko5...@gmail.com> wrote:
>>>>>
>>>>> Hi, thanks for the reply. I will try to break your reply down into
>>>>> the flow execution order.
>>>>>
>>>>> The first data stream will use AsyncIO and select from the table;
>>>>> the second stream will be Kafka, and then I can join the streams and
>>>>> map them?
>>>>>
>>>>> If that is the case, then I will select the table only once, on
>>>>> load? How can I make sure that my stream table is "fresh"?
>>>>>
>>>>> I'm thinking to myself: is there a way to use the Flink state
>>>>> backend (RocksDB) and create a read/write-through mechanism?
>>>>>
>>>>> Thanks,
>>>>> miki
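Going back to Fabian's workaround #2 above (both streams keyed on the join attribute, non-metadata buffered in keyed state until the metadata arrives), a minimal sketch of one way it could look with a CoProcessFunction; Event, Metadata, and EnrichedEvent are again hypothetical POJOs with a `key` field:

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class BufferingEnrichJoin extends CoProcessFunction<Event, Metadata, EnrichedEvent> {

    private transient ValueState<Metadata> metadataState;
    private transient ListState<Event> bufferedEvents;

    @Override
    public void open(Configuration parameters) {
        metadataState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("metadata", Metadata.class));
        bufferedEvents = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffer", Event.class));
    }

    @Override
    public void processElement1(Event event, Context ctx, Collector<EnrichedEvent> out)
            throws Exception {
        Metadata md = metadataState.value();
        if (md == null) {
            // no metadata for this key yet: buffer the record in keyed state
            bufferedEvents.add(event);
        } else {
            out.collect(new EnrichedEvent(event, md));
        }
    }

    @Override
    public void processElement2(Metadata md, Context ctx, Collector<EnrichedEvent> out)
            throws Exception {
        // new or updated metadata replaces the old entry for this key
        metadataState.update(md);
        // flush anything buffered while waiting for the metadata
        for (Event event : bufferedEvents.get()) {
            out.collect(new EnrichedEvent(event, md));
        }
        bufferedEvents.clear();
    }
}

// wiring (sketch):
// events.keyBy(e -> e.key)
//       .connect(metadataStream.keyBy(m -> m.key))
//       .process(new BufferingEnrichJoin());
```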
>>>>> On Mon, Apr 16, 2018 at 2:45 AM, Ken Krugler
>>>>> <kkrugler_li...@transpac.com> wrote:
>>>>>
>>>>>> If the SQL data is all (or mostly all) needed to join against the
>>>>>> data from Kafka, then I might try a regular join.
>>>>>>
>>>>>> Otherwise it sounds like you want to use an AsyncFunction to do ad
>>>>>> hoc queries (in parallel) against your SQL DB.
>>>>>>
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/asyncio.html
>>>>>>
>>>>>> — Ken
>>>>>>
>>>>>> On Apr 15, 2018, at 12:15 PM, miki haiat <miko5...@gmail.com> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have a case of metadata enrichment, and I'm wondering if my
>>>>>> approach is the correct way:
>>>>>>
>>>>>> 1. input stream from Kafka.
>>>>>> 2. MD in MS SQL.
>>>>>> 3. map to new POJO.
>>>>>>
>>>>>> I need to extract a key from the Kafka stream and use it to select
>>>>>> some values from the SQL table.
>>>>>>
>>>>>> So I thought to use the Table/SQL API in order to select the
>>>>>> metadata table, then convert the Kafka stream to a table and join
>>>>>> the data by the stream key.
>>>>>>
>>>>>> At the end I need to map the joined data to a new POJO and send it
>>>>>> to Elasticsearch.
>>>>>>
>>>>>> Any suggestions or different ways to solve this use case?
>>>>>>
>>>>>> Thanks,
>>>>>> Miki
>>>>>>
>>>>>> --------------------------
>>>>>> Ken Krugler
>>>>>> http://www.scaleunlimited.com
>>>>>> custom big data solutions & training
>>>>>> Hadoop, Cascading, Cassandra & Solr
>>>>>
>>>>> --------------------------------------------
>>>>> http://about.me/kkrugler
>>>>> +1 530-210-6378
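Finally, a minimal sketch of the AsyncFunction approach Ken points to above, using the Flink 1.5-style async I/O API; the async DB client interface, its method, and the Event/Metadata/EnrichedEvent POJOs are all hypothetical:

```java
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class MetadataAsyncLookup extends RichAsyncFunction<Event, EnrichedEvent> {

    // hypothetical non-blocking DB client; stands in for whatever async
    // driver (or thread-pool-wrapped JDBC client) is actually used
    interface AsyncDbClient {
        CompletableFuture<Metadata> queryByKey(String key);
    }

    private transient AsyncDbClient client;

    @Override
    public void open(Configuration parameters) {
        // client = ... connect to the MS SQL database here ...
    }

    @Override
    public void asyncInvoke(Event event, ResultFuture<EnrichedEvent> resultFuture) {
        // one query per record, completed asynchronously
        client.queryByKey(event.key).thenAccept(md ->
                resultFuture.complete(
                        Collections.singleton(new EnrichedEvent(event, md))));
    }
}

// wiring (sketch): at most 100 concurrent lookups, 1s timeout each
// DataStream<EnrichedEvent> enriched = AsyncDataStream.unorderedWait(
//         events, new MetadataAsyncLookup(), 1000, TimeUnit.MILLISECONDS, 100);
```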