Maxim, many thanks. We'll try buffering.

Best regards,
Vasily Melnik
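
For reference, the buffering approach (option 1 in the quoted message below) can be sketched roughly as follows. This is a plain-Python model of the logic only, not Flink code: the class `BufferingJoiner` and its method names are hypothetical, and it assumes some external signal tells the operator that the dimension stream has been completely read.

```python
from collections import deque


class BufferingJoiner:
    """Hypothetical model: hold fact records back until the dimension is loaded."""

    def __init__(self):
        self.dimension = {}     # dimension key -> dimension value
        self.pending = deque()  # facts that arrived before the dimension was complete
        self.ready = False      # flips to True once the dimension is fully read

    def on_dimension(self, key, value):
        # Dimension records only update the lookup table.
        self.dimension[key] = value

    def on_dimension_complete(self):
        # Assumed external signal; flush every buffered fact in arrival order.
        self.ready = True
        flushed = [self._join(fact) for fact in self.pending]
        self.pending.clear()
        return flushed

    def on_fact(self, fact):
        # Before the dimension is complete, buffer instead of emitting.
        if not self.ready:
            self.pending.append(fact)
            return []
        return [self._join(fact)]

    def _join(self, fact):
        key, payload = fact
        # None marks a key that is missing from the dimension.
        return (key, payload, self.dimension.get(key))
```

In a real Flink job the same logic would live in a broadcast process function, with the buffer kept in Flink state rather than in a Python deque, so that buffered facts survive a restart.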
On Thu, 14 Nov 2019 at 19:36, Maxim Parkachov <lazy.gop...@gmail.com> wrote:

> Hi Vasily,
>
> unfortunately, this is a known issue with Flink; you can read the discussion
> under
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
> .
>
> At the moment I have seen 3 solutions for this issue:
>
> 1. You buffer the fact stream in local state until the broadcast stream is
> completely read.
> 2. You create a custom source for the fact stream and, in its open method,
> wait until the broadcast stream is completely read.
> 3. With the latest Flink version, you can pre-populate state with the
> dimension data and start the Flink job with the existing state. You need to
> take care of setting the correct Kafka offsets for the dimension stream
> though, otherwise you will get a gap between the pre-populated state and
> the moment when the job is started.
>
> The first 2 solutions need to know when the broadcast stream is "completely
> read". I created a workaround for this with a custom source for dimension
> events. It creates a "stop file" on a shared file system, reads the Kafka
> end offsets for the dimension topic through the admin interface, starts
> processing all messages from the beginning, and clears the "stop file" once
> the message offsets have reached the end offsets for all partitions.
> Instead of a "stop file" you could use a shared lock in ZooKeeper.
>
> Hope this helps,
> Maxim.
>
> On Thu, Nov 14, 2019 at 7:42 AM vino yang <yanghua1...@gmail.com> wrote:
>
>> Hi Vasily,
>>
>> Currently, Flink does not coordinate between a general stream and a
>> broadcast stream; they are both just streams. Your use of broadcast state
>> is a special case. In the more general case, the state to be broadcast
>> comes from an unbounded stream, and state events may be broadcast
>> downstream at any time, so Flink cannot wait for the broadcast to finish
>> before processing the regular stream events.
>>
>> For your case:
>>
>> - you can change the storage for your dimension table, e.g. to Redis or
>> MySQL, and do the stream and dimension table join against it;
>> - you can inject a control event into your broadcast stream to mark the
>> end of the stream and let the fact stream wait until it receives the
>> control event. Or you can introduce a third-party coordinator, e.g.
>> ZooKeeper, to coordinate them; however, it would make your solution more
>> complex.
>>
>> Best,
>> Vino
>>
>> Vasily Melnik <vasily.mel...@glowbyteconsulting.com> wrote on Thu,
>> 14 Nov 2019 at 1:28 PM:
>>
>>> Hi all.
>>>
>>> In our task we have two Kafka topics:
>>> - one with a fact stream (web traffic)
>>> - one with a dimension
>>>
>>> We would like to put the dimension data into broadcast state and look it
>>> up with facts. But we see that not all dimension records are put into
>>> state before the first fact record is processed, so the lookup gives no
>>> data.
>>>
>>> The question is: how could we read the fact topic with some "delay" to
>>> give the dimension enough time to initialize state?
>>>
>>> Best regards,
>>> Vasily Melnik
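
The "completely read" check from Maxim's workaround (compare consumed offsets with the end offsets captured at startup) might look roughly like the sketch below. The function name `broadcast_fully_read` and the dictionary layout are assumptions for illustration; fetching the end offsets from Kafka is left out. It relies on a Kafka end offset being the offset of the next message to be written, i.e. one past the last existing message.

```python
def broadcast_fully_read(end_offsets, last_read):
    """Return True once every partition has been consumed up to its end offset.

    end_offsets: partition -> end offset captured at startup
                 (assumed to be "last existing offset + 1", 0 if empty).
    last_read:   partition -> offset of the last message consumed so far.
    """
    for partition, end in end_offsets.items():
        if end == 0:
            # Empty partition: nothing to wait for.
            continue
        # -1 means nothing has been consumed from this partition yet.
        if last_read.get(partition, -1) < end - 1:
            return False
    return True
```

Once the function returns True, the dimension source can clear the "stop file" (or release the shared lock) so that the fact stream starts flowing. Records written to the dimension topic after startup are not waited for, since the end offsets are captured only once.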