Re: Issue with sharing state in CoFlatMapFunction

2015-11-17 Thread Anwar Rizal
Broadcast is indeed what we do for the same type of problem as your initial one. In another thread, Stephan mentioned the possibility of using OperatorState in a ConnectedStream. I think the approach using OperatorState does the job as well. In my understanding, the approach using broadcast will requi

Re: Issue with sharing state in CoFlatMapFunction

2015-11-17 Thread Stephan Ewen
A global state that all parallel instances can access read-only is doable via broadcast(). A global state that is available to all for read and update is currently not available. Consistent operations on such a state would be quite costly and would require some form of distributed communication/consensus. Instead, I would encourage
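A minimal plain-Java sketch of the read-only broadcast idea described above, with no Flink dependency. The class `BroadcastSketch`, its `Instance` type, and the `run` helper are hypothetical names used only for illustration. The sketch simulates each parallel instance receiving its own copy of the model record, while each event goes to exactly one instance; round-robin distribution of events is an assumption here, not a statement about how Flink routes records.

```java
import java.util.ArrayList;
import java.util.List;

// Simulation only: illustrates broadcast semantics, not Flink's runtime.
class BroadcastSketch {

    // One simulated parallel instance with its own local copy of the model.
    static final class Instance {
        String model;                                  // local copy of the broadcast record
        final List<String> output = new ArrayList<>();

        void onModel(String m) { model = m; }          // control/model input
        void onEvent(String e) { output.add(e + " scored with " + model); } // data input
    }

    // Hypothetical helper: broadcasts one model record, then distributes events.
    static List<Instance> run(int parallelism, String model, String[] events) {
        List<Instance> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new Instance());
        }
        // Broadcast: every instance receives a copy of the model record.
        for (Instance inst : instances) {
            inst.onModel(model);
        }
        // Assumed round-robin: each event is delivered to exactly one instance.
        for (int i = 0; i < events.length; i++) {
            instances.get(i % parallelism).onEvent(events[i]);
        }
        return instances;
    }

    public static void main(String[] args) {
        List<Instance> result = run(3, "model-v1", new String[] {"e0", "e1", "e2", "e3"});
        for (int i = 0; i < result.size(); i++) {
            System.out.println("instance " + i + ": " + result.get(i).output);
        }
    }
}
```

Because every instance only reads its own copy, no coordination between instances is needed. Updates made locally by one instance would not be visible to the others, which is why this pattern covers the read-only case only.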

Re: Issue with sharing state in CoFlatMapFunction

2015-11-17 Thread Vladimir Stoyak
I know I can use broadcast, but was wondering if there is a better way:

    DataStream control_stream = env.addSource(new FlinkKafkaConsumer082(control_topic, new AvroDeserializationSchema(Model.class), properties)).broadcast();

On Tuesday, November 17, 2015 2:45 PM, Vladimir Stoyak wrote:

Re: Issue with sharing state in CoFlatMapFunction

2015-11-17 Thread Vladimir Stoyak
Not that I necessarily need that for this particular example, but is there a global state available? I.e., how can I make a state available across all parallel instances of an operator?

On Tuesday, November 17, 2015 1:49 PM, Vladimir Stoyak wrote: Perfect! It does explain my proble

Re: Issue with sharing state in CoFlatMapFunction

2015-11-17 Thread Vladimir Stoyak
Perfect! It does explain my problem. Thanks a lot.

On Tuesday, November 17, 2015 1:43 PM, Stephan Ewen wrote: Is the CoFlatMapFunction intended to be executed in parallel? If yes, you need some way to deterministically assign which record goes to which parallel instance. In some way

Re: Issue with sharing state in CoFlatMapFunction

2015-11-17 Thread Stephan Ewen
Is the CoFlatMapFunction intended to be executed in parallel? If yes, you need some way to deterministically assign which record goes to which parallel instance. In some way the CoFlatMapFunction does a parallel (partitioned) join between the model and the result of the session windows, so you need
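A minimal plain-Java sketch of what "deterministically assign which record goes to which parallel instance" can look like, assuming both inputs carry a common key. The class `KeyedRoutingSketch` and the helper `instanceFor` are hypothetical, and the simple hash-modulo scheme is an illustration only, not Flink's actual partitioner.

```java
// Simulation only: shows why routing both inputs by the same key makes
// matching records meet in the same parallel instance.
class KeyedRoutingSketch {

    // Hypothetical helper: maps a key to one of `parallelism` instances.
    // floorMod keeps the result non-negative even for negative hash codes.
    static int instanceFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        String[] modelKeys = {"user-1", "user-2", "user-3"};
        String[] sessionKeys = {"user-3", "user-1", "user-2"};

        // Records from either input with the same key map to the same instance,
        // regardless of arrival order or which input they came from.
        for (String key : modelKeys) {
            System.out.println("model   " + key + " -> instance " + instanceFor(key, parallelism));
        }
        for (String key : sessionKeys) {
            System.out.println("session " + key + " -> instance " + instanceFor(key, parallelism));
        }
    }
}
```

If only one input is routed by key and the other is distributed arbitrarily, a model record and the session result it belongs to may land in different instances, so the instance processing the session never sees the matching model.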

Re: Issue with sharing state in CoFlatMapFunction

2015-11-17 Thread Vladimir Stoyak
My model DataStream is not keyed and does not have any windows; only the main stream has windows and an apply function.

I have two Kafka streams, one for events and one for the model:

DataStream model_stream = env.addSource(new FlinkKafkaConsumer082(model_topic, new AvroDeserializationSchema(Model.class)

Re: Issue with sharing state in CoFlatMapFunction

2015-11-17 Thread Stephan Ewen
Hi! Can you give us a bit more context? For example, can you share the structure of the program (which streams get windowed and connected, and in what way)? I would guess that the following is the problem: When you connect one stream to another, then partition n of the first stream connects with partition n of