Maxim, many thanks.
We'll try buffering.
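
A minimal sketch of the buffering idea in plain Java, not the Flink API. The class `BufferingJoiner`, its method names, and the explicit "dimension ready" signal are all hypothetical, chosen only for illustration. In a Flink job the same logic would presumably sit in a `KeyedBroadcastProcessFunction`, with the pending facts kept in keyed state and the dimension map in broadcast state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of "buffer facts until the dimension is loaded". */
final class BufferingJoiner {
    // Stands in for the broadcast state holding dimension records.
    private final Map<String, String> dimension = new HashMap<>();
    // Stands in for the local state that buffers early fact records.
    private final List<String> pendingFactKeys = new ArrayList<>();
    private final List<String> output = new ArrayList<>();
    private boolean dimensionReady = false;

    /** Called for each dimension record (the broadcast side). */
    void onDimension(String key, String value) {
        dimension.put(key, value);
    }

    /** Called once the dimension topic is considered completely read. */
    void onDimensionReady() {
        dimensionReady = true;
        for (String factKey : pendingFactKeys) {
            emit(factKey);
        }
        pendingFactKeys.clear();
    }

    /** Called for each fact record: join now, or buffer until ready. */
    void onFact(String factKey) {
        if (dimensionReady) {
            emit(factKey);
        } else {
            pendingFactKeys.add(factKey);
        }
    }

    private void emit(String factKey) {
        output.add(factKey + "=" + dimension.getOrDefault(factKey, "<unknown>"));
    }

    List<String> output() {
        return output;
    }

    int buffered() {
        return pendingFactKeys.size();
    }
}

final class BufferingJoinerDemo {
    public static void main(String[] args) {
        BufferingJoiner joiner = new BufferingJoiner();
        joiner.onFact("page-1");               // arrives before its dimension record
        joiner.onDimension("page-1", "Home");
        joiner.onDimensionReady();             // flushes the buffered fact
        joiner.onFact("page-2");               // no dimension record for this key
        System.out.println(joiner.output());   // prints [page-1=Home, page-2=<unknown>]
    }
}
```

The sketch still leaves open who calls `onDimensionReady()`; that is the "completely read" question discussed in the quoted messages.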

Best regards,
Василий Мельник


On Thu, 14 Nov 2019 at 19:36, Maxim Parkachov <lazy.gop...@gmail.com> wrote:

> Hi Vasily,
>
> Unfortunately, this is a known issue with Flink; see the discussion under
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
>
> So far I have seen three solutions to this issue:
>
> 1. Buffer the fact stream in local state until the broadcast stream has
> been completely read.
> 2. Create a custom source for the fact stream and, in its open method, wait
> until the broadcast stream has been completely read.
> 3. With the latest Flink version, you can pre-populate state with the
> dimension data and start the Flink job from that existing state. You need
> to take care to set the correct Kafka offsets for the dimension stream,
> though; otherwise you will get a gap between the pre-populated state and
> the moment the job starts.
>
> The first two solutions need to know when the broadcast stream is
> "completely read". I created a workaround for this with a custom source for
> dimension events. It creates a "stop file" on a shared file system, reads
> the Kafka end offsets for the dimension topic through the admin interface,
> starts processing all messages from the beginning, and clears the "stop
> file" once the message offsets have reached the end offsets for all
> partitions. Instead of a "stop file" you could use a shared lock in
> ZooKeeper.
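
The "reached end offsets for all partitions" check described above could be sketched roughly as follows, in plain Java with no Kafka dependency. `EndOffsetTracker` and its methods are hypothetical names, not part of Maxim's source or of any library. The sketch assumes the two offset maps were snapshotted once at startup (in a real job they might come from `KafkaConsumer.beginningOffsets` and `KafkaConsumer.endOffsets`), and it relies on Kafka's convention that a partition's end offset is the offset of the next record to be written, so the last existing record sits at end offset minus one.

```java
import java.util.HashMap;
import java.util.Map;

/** Tracks whether every partition has been read up to its snapshotted end offset. */
final class EndOffsetTracker {
    // partition -> offset of the last record that existed at snapshot time
    private final Map<Integer, Long> lastOffsetToRead = new HashMap<>();

    EndOffsetTracker(Map<Integer, Long> beginningOffsets, Map<Integer, Long> endOffsets) {
        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
            long begin = beginningOffsets.getOrDefault(entry.getKey(), 0L);
            long end = entry.getValue();
            // Empty partitions (end == begin) have nothing to wait for.
            if (end > begin) {
                lastOffsetToRead.put(entry.getKey(), end - 1);
            }
        }
    }

    /** Call for every consumed record; drops the partition once it is caught up. */
    void onRecord(int partition, long offset) {
        Long last = lastOffsetToRead.get(partition);
        if (last != null && offset >= last) {
            lastOffsetToRead.remove(partition);
        }
    }

    /** True once all partitions have reached their snapshotted end offsets. */
    boolean caughtUp() {
        return lastOffsetToRead.isEmpty();
    }
}

final class EndOffsetTrackerDemo {
    public static void main(String[] args) {
        Map<Integer, Long> begin = new HashMap<>();
        Map<Integer, Long> end = new HashMap<>();
        begin.put(0, 0L); end.put(0, 3L);   // partition 0 holds offsets 0..2
        begin.put(1, 5L); end.put(1, 5L);   // partition 1 is empty

        EndOffsetTracker tracker = new EndOffsetTracker(begin, end);
        tracker.onRecord(0, 0L);
        tracker.onRecord(0, 1L);
        System.out.println(tracker.caughtUp()); // prints false
        tracker.onRecord(0, 2L);
        System.out.println(tracker.caughtUp()); // prints true
    }
}
```

Once `caughtUp()` returns true, the source would clear the "stop file" (or release the lock). Using `>=` rather than `==` is deliberate: on a compacted topic some offsets may be missing, although a partition whose final record is no longer readable would still need separate handling.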
>
> Hope this helps,
> Maxim.
>
> On Thu, Nov 14, 2019 at 7:42 AM vino yang <yanghua1...@gmail.com> wrote:
>
>> Hi Vasily,
>>
>> Currently, Flink does not coordinate between a regular stream and a
>> broadcast stream; both are just streams. Your use of broadcast state is a
>> special case. In the more general case, the state to be broadcast is an
>> unbounded stream, and state events may be broadcast downstream at any
>> time, so Flink cannot wait for that stream to finish before processing the
>> regular stream's events.
>>
>> For your case:
>>
>>
>>    - you can move the dimension table to external storage, e.g. Redis or
>>    MySQL, and do the stream-to-dimension-table join against it;
>>    - you can inject a control event into your broadcast stream to mark the
>>    end of the stream, and let the fact stream wait until the control event
>>    is received. Alternatively, you can introduce a third-party
>>    coordinator, e.g. ZooKeeper, to coordinate them; however, that would
>>    make your solution more complex.
>>
>> Best,
>> Vino
>>
>>
>> Vasily Melnik <vasily.mel...@glowbyteconsulting.com> wrote on Thu,
>> 14 Nov 2019 at 1:28 PM:
>>
>>> Hi all.
>>>
>>> In our task we have two Kafka topics:
>>> - one with a fact stream (web traffic)
>>> - one with dimension data
>>>
>>> We would like to put the dimension data into broadcast state and look it
>>> up from the facts. However, we see that not all dimension records are in
>>> state before the first fact record is processed, so the lookup returns no
>>> data.
>>>
>>> The question is: how can we read the fact topic with some "delay" to give
>>> the dimension enough time to initialize the state?
>>>
>>>
>>> Best regards,
>>> Василий Мельник
>>>
>>
