Hi,

How can you determine whether the required decoration data for an event from 
the main stream has arrived? If this matching works via event time, you could 
think about buffering main-input events in the operator until the 
corresponding decoration record arrives, as in the sketch below.
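
Roughly, that buffering could look like the following sketch (just an 
illustration, not a ready-made implementation; the Event, Decoration and 
EnrichedEvent types are placeholders for your own classes, and the two 
streams are assumed to be connected keyed by the same id):

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
    import org.apache.flink.util.Collector;

    // Buffers main-stream events in keyed state until the matching
    // decoration record has been seen for that key.
    public class BufferingEnricher
            extends CoProcessFunction<Event, Decoration, EnrichedEvent> {

        private transient ValueState<Decoration> decoration;
        private transient ListState<Event> buffer;

        @Override
        public void open(Configuration parameters) {
            decoration = getRuntimeContext().getState(
                new ValueStateDescriptor<>("decoration", Decoration.class));
            buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered-events", Event.class));
        }

        @Override
        public void processElement1(Event event, Context ctx,
                Collector<EnrichedEvent> out) throws Exception {
            Decoration d = decoration.value();
            if (d == null) {
                buffer.add(event); // decoration not there yet, hold the event back
            } else {
                out.collect(new EnrichedEvent(event, d));
            }
        }

        @Override
        public void processElement2(Decoration d, Context ctx,
                Collector<EnrichedEvent> out) throws Exception {
            decoration.update(d);
            for (Event buffered : buffer.get()) { // flush buffered events
                out.collect(new EnrichedEvent(buffered, d));
            }
            buffer.clear();
        }
    }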

On a side note, we're currently working on broadcast inputs and broadcast 
state, which is a first step towards proper side inputs. This work should land 
in Flink 1.5.

Best,
Aljoscha

> On 20. Jan 2018, at 09:25, Maxim Parkachov <lazy.gop...@gmail.com> wrote:
> 
> Hi Ron,
> 
> I’m joining two streams - one is a “decoration” stream that we have in a 
> compacted Kafka topic, produced using a view on a MySQL table and Kafka 
> Connect; the other is the “event data” we want to decorate, coming in over 
> time via Kafka. These streams are keyed the same way - via an “id” field - 
> and we join them using a CoFlatMap that attaches the data from the 
> decoration stream to the event data and publishes downstream.
> 
> What I’d like to do in my CoFlatMap is wait to process the event data until 
> we’ve received the corresponding decoration data. How do I make that happen?
> 
> I have exactly the same scenario and researched this. Basically, what we 
> need is 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API, 
> but unfortunately it seems like nobody is actively working on it. You could 
> still check the ideas there, though.
> 
> Side inputs seem to be implemented in the Beam API and should work with the 
> Flink runner, but I didn’t try it, for a number of reasons.
> 
> I ended up with the following solution:
> 
> - In the script starting the job, I create a stop file on a shared 
> filesystem (HDFS).
> - I created two SourceFunctions extending the Kafka source.
> - In the source function for the “decoration” stream, the run method 
> consumes all records from the compacted topic. The tricky part is knowing 
> when everything has been consumed; I resolved it by reading the topic’s end 
> offsets directly with the Kafka admin API and checking whether I had 
> reached them. After waiting a bit, to make sure the events have propagated 
> to the next operator, I delete the stop file on the shared filesystem.
> - In the source function for the event data, I implemented the “open” 
> method to wait until the stop file is deleted. This keeps it from consuming 
> event data until all decoration data has been loaded (see the sketch after 
> this list).
> - In the pipeline, I broadcast the “decoration” stream and use a 
> CoProcessFunction to store it in state and enrich the main event stream.
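> 
> Roughly, the two tricky pieces look like this (just a sketch, not my actual 
> code: the stop-file path is made up, and I show the consumer’s endOffsets() 
> here as one way to read the end offsets):
> 
>     import java.util.Collection;
>     import java.util.Map;
>     import org.apache.flink.configuration.Configuration;
>     import org.apache.flink.core.fs.FileSystem;
>     import org.apache.flink.core.fs.Path;
>     import org.apache.kafka.clients.consumer.KafkaConsumer;
>     import org.apache.kafka.common.TopicPartition;
> 
>     // Decoration source: check whether the compacted topic is fully
>     // consumed by comparing our positions (next offset to read) against
>     // the broker’s current end offsets.
>     static boolean fullyConsumed(
>             KafkaConsumer<?, ?> consumer,
>             Collection<TopicPartition> partitions,
>             Map<TopicPartition, Long> positions) {
>         Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
>         return endOffsets.entrySet().stream().allMatch(e ->
>                 positions.getOrDefault(e.getKey(), 0L) >= e.getValue());
>     }
> 
>     // Event source: block in open() until the decoration source has
>     // deleted the stop file, so no event data is consumed before that.
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>         Path stopFile = new Path("hdfs:///flink/enricher/stop"); // made-up path
>         FileSystem fs = stopFile.getFileSystem();
>         while (fs.exists(stopFile)) {
>             Thread.sleep(1000L);
>         }
>     }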
> 
> The application is not in production yet as I need to do more testing, but it 
> seems to work. 
> 
> Additionally, I tried to cache the decoration data in the source function’s 
> state to recover from checkpoints more easily, but I’m still not sure 
> whether it’s better to read it from the compacted topic every time, to keep 
> an additional cache in the source function, or whether the state in the 
> CoProcessFunction is enough.
> 
> Hope this helps and would be interested to hear your experience.
> 
> Regards,
> Maxim.
