Hi Andrey,

State is scoped by operator instance, so you cannot share state between two
different operators even if they use the same name for the state ("StateX"
in your example).
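
To make the scoping rule concrete, here is a toy, pure-Scala model. It is NOT Flink's implementation or API. The hypothetical class ToyKeyedStateBackend addresses every value by (operator, state name, key). Because of that, two operators that both register "StateX" for the same key never see each other's values. The operator names are borrowed from your question.

```scala
import scala.collection.mutable

// Toy model only (not Flink code): each value is addressed by
// (operator, state name, key). The operator is part of the address,
// so reusing the name "StateX" in another operator gives a separate slot.
final class ToyKeyedStateBackend {
  private val store = mutable.Map.empty[(String, String, String), Long]

  def update(operator: String, stateName: String, key: String, value: Long): Unit =
    store((operator, stateName, key)) = value

  def value(operator: String, stateName: String, key: String): Option[Long] =
    store.get((operator, stateName, key))
}

object ToyScopingDemo {
  def run(): (Option[Long], Option[Long]) = {
    val backend = new ToyKeyedStateBackend
    // The first operator writes StateX for key "a".
    backend.update("mapWithStateXUpdate", "StateX", "a", 42L)
    // Same state name, same key, but a different operator: nothing is found.
    val seenByWriter = backend.value("mapWithStateXUpdate", "StateX", "a")
    val seenByReader = backend.value("mapUsingStateXValue", "StateX", "a")
    (seenByWriter, seenByReader)
  }

  def main(args: Array[String]): Unit = println(run()) // prints (Some(42),None)
}
```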

You are also correct that the recommended way to do what you want is to
use the connect() operation to connect the two streams and then write a
CoMapFunction or CoFlatMapFunction. That way both streams are processed by
a single operator instance, which can hold the state they share.

If you call keyBy() on each stream and then connect() you will already be
in a keyed context in your CoFlatMap operator.  There is no reason to call
keyBy() again.

For example:

val keyedStream1 = stream1
  .keyBy("foo")

val keyedStream2 = stream2
  .keyBy("foo")

val connectedStream = keyedStream1.connect(keyedStream2)

// YourFunction is your own CoFlatMapFunction implementation
connectedStream.flatMap(new YourFunction)

In YourFunction you'll be able to access per-key state.
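
Here is a minimal sketch of what YourFunction could look like. It assumes a recent Flink 1.x release with the Scala DataStream API (that API was deprecated in 1.17 and removed in 2.0). The event types Update and Query, their fields, and the output format are hypothetical. Both inputs are keyed on "foo", so the single ValueState named "StateX" is shared by flatMap1 and flatMap2 for whatever key is current.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.util.Collector

// Hypothetical input types; both carry the key field "foo".
case class Update(foo: String, value: Long)
case class Query(foo: String)

// Both inputs are handled by the same operator instance after connect(),
// so they read and write the same per-key state.
class YourFunction extends RichCoFlatMapFunction[Update, Query, String] {
  @transient private var stateX: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    // Registered once per operator; each value is scoped to the current key.
    stateX = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("StateX", classOf[java.lang.Long]))
  }

  // Elements from keyedStream1 update the state for their key.
  override def flatMap1(update: Update, out: Collector[String]): Unit =
    stateX.update(update.value)

  // Elements from keyedStream2 read what the first stream stored for the same key.
  override def flatMap2(query: Query, out: Collector[String]): Unit = {
    val current = stateX.value() // null until the first Update for this key arrives
    out.collect(s"${query.foo} -> ${Option(current).map(_.toString).getOrElse("none")}")
  }
}
```

It would be wired up as keyedStream1.connect(keyedStream2).flatMap(new YourFunction), with org.apache.flink.streaming.api.scala._ imported so the implicit TypeInformation for the String output is available.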

Does that help?

On Fri, Jun 3, 2016 at 9:34 AM, Andrey Utkin <cind...@gmail.com> wrote:

> Hi,
>
> I am a newbie in Flink and have some questions about stream state. I can't
> find the answers in the documentation, but if I just missed them, please
> point me to the docs.
>
> 1. Are ValueState (and the other state classes) 'stream' scoped? That is,
> is it impossible to share the same state between two (or more) different
> pipelines in the same job:
> - sourceA -> keyBy -> mapWithStateXUpdate -> print
> - sourceB -> keyBy -> mapUsingStateXValue -> print
>
> Will the operators 'mapWithStateXUpdate' and 'mapUsingStateXValue' use
> separate copies of StateX even though they use the same name for it and
> the same key (ValueStateDescriptor("StateX", ...))? Is that right?
>
> 2. The Streaming Guide says about the connect() operation:
> ==
> Connect allowing for shared state between the two streams.
> ==
>
> But how do I access state from operators after connect()? connect() returns
> ConnectedStreams, not a KeyedStream, so state defined earlier is not
> accessible. Or does it mean that calling keyBy() after connect() allows
> defining new state based on values from both streams?
>
> 3. Is state 'operator' scoped?
>
> - source -> keyBy -> mapWithStateX_1 -> keyBy -> mapWithStateX_2
>
> Assume both maps try to use state with the same name
> (ValueStateDescriptor("StateX", ...)). Despite that, they will have
> different copies. Is that right?
>
>
> Thanks!
>
> --
> Andrey Utkin <cind...@gmail.com>


-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com
