Hi Andrey,

State is scoped by operator instance, so you cannot share state between two different operators, even if they use the same name for the state ("StateX" in your example).
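To make that concrete, here is a minimal sketch of your two pipelines, assuming the Flink 1.x Scala DataStream API. The Event type, the class names UpdateStateX and ReadStateX, and the sample data are made up for illustration. API details such as the ValueStateDescriptor constructor and the open() signature differ between Flink versions, so treat this as a sketch under those assumptions rather than a definitive implementation.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

// Hypothetical event type, used only in this sketch.
case class Event(foo: String, value: Long)

// Writes the latest value per key into this operator's own "StateX".
class UpdateStateX extends RichMapFunction[Event, String] {
  @transient private var stateX: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    // Three-argument constructor (name, type, default value); assumed
    // available in the Flink version in use.
    stateX = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("StateX", classOf[java.lang.Long], null))
  }

  override def map(e: Event): String = {
    stateX.update(e.value)
    s"update ${e.foo} -> ${e.value}"
  }
}

// Declares a state with the same name, but in a different operator.
class ReadStateX extends RichMapFunction[Event, String] {
  @transient private var stateX: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    stateX = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("StateX", classOf[java.lang.Long], null))
  }

  // Reads this operator's own copy, which nothing ever updates.
  override def map(e: Event): String = s"read ${e.foo} -> ${stateX.value()}"
}

object StateScopeSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val sourceA = env.fromElements(Event("a", 1L), Event("b", 2L))
    val sourceB = env.fromElements(Event("a", 0L), Event("b", 0L))

    // Two independent pipelines keyed on the same field.
    sourceA.keyBy(_.foo).map(new UpdateStateX).print()
    sourceB.keyBy(_.foo).map(new ReadStateX).print()

    env.execute("state scope sketch")
  }
}
```

Even for the same key, ReadStateX only ever sees its own, never-updated copy of StateX (the null default here), because each operator gets a separate copy of the state.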
Also, you are correct that the recommended way of doing what you want is to use the connect() operation to connect the two streams and then write a CoMap or CoFlatMap function. This way you process both streams with a single operator instance and can achieve what you want.

If you call keyBy() on each stream and then connect(), you will already be in a keyed context in your CoFlatMap operator. There is no reason to call keyBy() again. For example:

    val keyedStream1 = stream1
      .keyBy("foo")

    val keyedStream2 = stream2
      .keyBy("foo")

    val connectedStream = keyedStream1.connect(keyedStream2)

    connectedStream.flatMap(YourFunction)

In YourFunction you'll be able to access per-key state.

Does that help?

On Fri, Jun 3, 2016 at 9:34 AM, Andrey Utkin <cind...@gmail.com> wrote:

> Hi,
>
> I am newbie in Flink and have questions about stream states. I can’t find
> answers in documentation, but if I just miss one, please link to doc)
>
> 1. Is ValueState (and other state classes) are ‘stream' scoped? So that,
> it is not possible to share same state with two (or more) different
> pipelines in a same job:
> - sourceA -> keyBy -> mapWithStateXUpdate -> print
> - sourceB -> keyBy -> mapUsingStateXValue -> print
>
> Operators ‘mapWithStateXUpdate’ and ‘mapUsingStateXValue’ will use
> separate copy of StateX even they use same name for it and same key
> (ValueStateDescriptor(“StateX”…)). Is it right?
>
> 2. In Streaming Guide about connect() operation:
> ==
> Connect allowing for shared state between the two streams.
> ==
>
> But how do access state from operators after connect - connect() returns
> ConnectedStream but not Keyed, so states defined early is not accessible.
> Or it means that doing keyBy after connect() will allow to define new state
> based on values from both streams?
>
> 3. Is state are ‘operator’ scoped?
>
> - source -> KeyBy -> mapWithStateX_1 -> keyBy -> mapWithStateX_2
>
> Assume both map try to use state with same name
> (ValueStateDescriptor(“StateX”…)). But despite of that, they will have
> different copies. is it right?
>
>
> Thanks!
>
> --
> Andrey Utkin <cind...@gmail.com>

--

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com