Hi Sameer,
Thank you for your quick response. I don't think event ordering is the problem here; the processor doesn't assume any ordering. KeyedStream[EventA] stores a state of type Set[InfoA] on its key, which I would like KeyedStream[EventB] to access. The code operates on an Option[Set[InfoA]], so it never invites trouble by calling .get. applyWithState throws the exception because the private ValueState[S] is never initialised. Since KeyedStream[EventA] successfully updates the state, it could be that:

- there is some wrong config in SomeRichFlatMapFunctionForEventB, which is fine and can be fixed, or
- I am doing something completely wrong that Flink doesn't support.

Thanks,
Aris

________________________________
From: Sameer W <sam...@axiomine.com>
Sent: Saturday, August 27, 2016 10:17 PM
To: user@flink.apache.org
Subject: Re: Accessing state in connected streams

There is no guarantee about the order in which the elements of each stream arrive in connected streams. You have to check whether the elements from stream A have arrived before using their information to process elements from stream B. Otherwise, you have to buffer elements from stream B and, when elements arrive from stream A, check whether there are unprocessed buffered elements from stream B. Depending on how you use them, you might need to do this for elements from both streams. You will get an NPE if you assume that events from A have arrived when they might be lagging behind.

On Sat, Aug 27, 2016 at 6:13 PM, aris kol <gizera...@hotmail.com> wrote:

Let's say I have two types sharing the same trait:

    trait Event { def id: Id }
    case class EventA(id: Id, info: InfoA) extends Event
    case class EventB(id: Id, info: InfoB) extends Event

Each of these events gets pushed to a Kafka topic and gets consumed by a stream in Flink. Let's say I have two streams. Events of type A create state:

    val typeAStream = env.addSource(...)
      .flatMap(someUnmarshallerForA)
      .keyBy(_.id)
      .mapWithState(...)

    val typeBStream = env.addSource(...)
      .flatMap(someUnmarshallerForB)
      .keyBy(_.id)

I now want to process the events in typeBStream using the information stored in the state of typeAStream.

One approach would be to use the same stream for the two topics and then pattern match. However, Event subclasses may grow in number and may have different loads, so I may want to keep things separate.

Would something along the lines of

    typeAStream.connect(typeBStream).flatMap(
      new IdentityFlatMapFunction(),
      new SomeRichFlatMapFunctionForEventB[EventB, O] with StatefulFunction[EventB, O, G[EventA]] { ... }
    )

work? I tried this approach and ended up with an NPE because the state object was not initialized (meaning it was not there).

Thanks,
Aris
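A note on the question of what Flink supports. Keyed state in Flink is local to the operator that registers it and is scoped to the current key. The Set[InfoA] written by mapWithState on typeAStream therefore cannot be read from a different operator.

A common pattern is to connect the two keyed streams and handle both inputs in a single RichCoFlatMapFunction. For example, `aStream.connect(bStream).keyBy(_.id, _.id)`, where `aStream` is a hypothetical name for the keyed EventA stream before mapWithState. The function's flatMap1 and flatMap2 then share the keyed state it registers in open().

The sketch below models that per-key logic in plain Scala, with no Flink dependency, and also applies Sameer's buffering advice. It rests on these assumptions:

- Mutable maps stand in for Flink's keyed ValueState[Set[InfoA]] and ListState[EventB].
- The names ConnectedStateModel and EnrichB, the String Id, and the single-field InfoA/InfoB classes are hypothetical.

```scala
import scala.collection.mutable

// Plain-Scala model of a keyed co-flatMap operator (no Flink dependency).
// All names and the field layouts below are hypothetical stand-ins.
object ConnectedStateModel {
  type Id = String
  final case class InfoA(value: String)
  final case class InfoB(value: String)
  final case class EventA(id: Id, info: InfoA)
  final case class EventB(id: Id, info: InfoB)

  /** One operator instance receiving both inputs keyed by id.
    * `infoA` plays the role of keyed ValueState[Set[InfoA]];
    * `pendingB` plays the role of keyed ListState[EventB] holding
    * B events that arrived before any A event for the same id. */
  final class EnrichB {
    private val infoA    = mutable.Map.empty[Id, Set[InfoA]]
    private val pendingB = mutable.Map.empty[Id, Vector[EventB]]

    /** Input 1 (EventA): update the per-key set, then release buffered B events. */
    def flatMap1(a: EventA): Seq[(EventB, Set[InfoA])] = {
      val updated = infoA.getOrElse(a.id, Set.empty[InfoA]) + a.info
      infoA(a.id) = updated
      val released = pendingB.remove(a.id).getOrElse(Vector.empty[EventB])
      released.map(b => (b, updated))
    }

    /** Input 2 (EventB): enrich if A information exists for this id, otherwise buffer. */
    def flatMap2(b: EventB): Seq[(EventB, Set[InfoA])] =
      infoA.get(b.id) match {
        case Some(set) => Seq((b, set))
        case None =>
          pendingB(b.id) = pendingB.getOrElse(b.id, Vector.empty[EventB]) :+ b
          Seq.empty
      }
  }
}
```

In a real job, flatMap1 and flatMap2 would emit through a Collector rather than return a Seq. The buffered B events would also live in keyed state, so that they are checkpointed together with the Set[InfoA].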