Hi,

How do you determine whether the required decoration data for an event from the main stream has arrived? If your job works with event time, you could buffer main-input events in the operator until the corresponding decoration arrives.
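That buffering pattern can be sketched outside Flink with plain Java collections; in a real job the two maps would be keyed state (e.g. a ValueState for the decoration and a ListState for buffered events inside a KeyedCoProcessFunction), and all class and method names here are hypothetical:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/**
 * Joins an event stream with a decoration stream, buffering events whose
 * decoration has not arrived yet. Hypothetical sketch of the per-key logic
 * a KeyedCoProcessFunction would run; maps stand in for keyed state.
 */
class BufferingEnricher {
    private final Map<String, String> decorations = new HashMap<>();
    private final Map<String, Queue<String>> buffered = new HashMap<>();

    /** Decoration arrives: remember it and flush any buffered events for this id. */
    List<String> onDecoration(String id, String decoration) {
        decorations.put(id, decoration);
        List<String> out = new ArrayList<>();
        Queue<String> waiting = buffered.remove(id);
        if (waiting != null) {
            for (String event : waiting) {
                out.add(event + "+" + decoration); // enrich and emit
            }
        }
        return out;
    }

    /** Event arrives: emit immediately if already decorated, otherwise buffer it. */
    List<String> onEvent(String id, String event) {
        String decoration = decorations.get(id);
        if (decoration != null) {
            return List.of(event + "+" + decoration);
        }
        buffered.computeIfAbsent(id, k -> new ArrayDeque<>()).add(event);
        return List.of();
    }
}
```

Note that unbounded buffering needs a safety valve in practice (a timer that drops or side-outputs events whose decoration never arrives), otherwise state grows without limit.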
On a side note, we're currently working on broadcast inputs and broadcast state, which is a first step towards proper side inputs. This work should land in Flink 1.5.

Best,
Aljoscha

> On 20. Jan 2018, at 09:25, Maxim Parkachov <lazy.gop...@gmail.com> wrote:
>
> Hi Ron,
>
> I’m joining two streams - one is a “decoration” stream that we have in a compacted Kafka topic, produced using a view on a MySQL table and Kafka Connect; the other is the “event data” we want to decorate, coming in over time via Kafka. These streams are keyed the same way - via an “id” field - and we join them using a CoFlatMap that attaches the data from the decoration stream to the event data and publishes it downstream.
>
> What I’d like to do in my CoFlatMap is wait to process the event data until we’ve received the corresponding decoration data. How do I make that happen?
>
> I have exactly the same scenario and have researched this. Basically, what we need is
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API,
> but unfortunately it seems nobody is actively working on it. You could check the ideas there, though.
>
> Side inputs seem to be implemented in the Beam API and should work on the Flink runner, but I didn’t try it, for many reasons.
>
> I ended up with the following solution:
>
> - In the script starting the job, I create a Stop file on a shared filesystem (HDFS).
> - I created 2 SourceFunctions extending the Kafka source.
> - In the source function for the “decoration” stream, in the run method, I consume all records from the compacted topic. The tricky part is identifying when everything has been consumed. I resolved it by reading the Kafka end offset directly with the Kafka admin API and checking whether I have arrived at that offset.
> - After waiting a bit to make sure the events have propagated to the next operator, I delete the Stop file on the shared filesystem.
> - In the source function for event data, I implemented an “open” method that waits until the Stop file is deleted. This keeps it from consuming event data until the decoration data has been fully loaded.
> - In the pipeline, I broadcast the “decoration” events and used a CoProcessFunction to store them in state and enrich the main event stream.
>
> The application is not in production yet as I need to do more testing, but it seems to work.
>
> Additionally, I tried to cache the decoration data in the state of the source function to recover from checkpoints more easily, but I’m still not sure whether it’s better to read it from the compacted topic every time, to keep an additional cache in the source function, or whether the state in the CoProcessFunction is enough.
>
> Hope this helps, and I would be interested to hear about your experience.
>
> Regards,
> Maxim.
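The "have I consumed everything" check in the decoration source boils down to comparing the consumer's position against the end offset of every partition, recorded when the source starts (in a real source these would come from KafkaConsumer#endOffsets and KafkaConsumer#position). The check itself, with partitions and offsets reduced to plain Java types in this hypothetical sketch, is just:

```java
import java.util.Map;

class CompactedTopicDrain {
    /**
     * Returns true once the consumer's position has reached the end offset
     * recorded at startup for every partition of the compacted topic.
     * Partition ids and offsets are plain ints/longs here; in a real Kafka
     * source they would be TopicPartition keys from endOffsets()/position().
     */
    static boolean caughtUp(Map<Integer, Long> endOffsets, Map<Integer, Long> positions) {
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            long pos = positions.getOrDefault(e.getKey(), 0L);
            if (pos < e.getValue()) {
                return false; // at least one partition still has unread records
            }
        }
        return true;
    }
}
```

One caveat with this approach: the end offsets must be snapshotted before consumption starts, since the compacted topic may keep receiving updates while the drain is in progress.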
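The stop-file handshake in the event source's open() method is essentially a poll loop on a shared path (HDFS in the original setup; java.nio on a local path in this hypothetical sketch):

```java
import java.nio.file.Files;
import java.nio.file.Path;

class StopFileGate {
    /**
     * Blocks until the stop file disappears, i.e. until the decoration source
     * signals that the compacted topic has been fully consumed. In the real
     * job this would run in the event source's open() method and poll an
     * HDFS path via the Hadoop FileSystem API instead of a local path.
     */
    static void awaitDeletion(Path stopFile, long pollMillis) {
        while (Files.exists(stopFile)) {
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // give up waiting if interrupted
                return;
            }
        }
    }
}
```

A polling interval of a second or so is plenty here, since the gate only runs once at job startup.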