Hi Aris,

I think you're on the right track with using a CoFlatMap for this. Could you post the code of your CoFlatMapFunction (or send it to me privately if you'd rather not post it publicly)? Then I can have a look.
Cheers,
Aljoscha

On Mon, 29 Aug 2016 at 15:48 aris kol <gizera...@hotmail.com> wrote:

> Any other opinion on this?
>
> Thanks :)
>
> Aris
> *From:* aris kol <gizera...@hotmail.com>
> *Sent:* Sunday, August 28, 2016 12:04 AM
> *To:* user@flink.apache.org
> *Subject:* Re: Accessing state in connected streams
>
> In the implementation I am passing just one CoFlatMapFunction, where
> flatMap1, which operates on EventA, just emits a None (in practice it
> doesn't do anything) and flatMap2 tries to access the state and throws the NPE.
>
> It wouldn't make sense to use a mapper in this context; I would still want
> to flatten afterwards before pushing downstream.
>
> Aris
>
> ------------------------------
> *From:* Sameer W <sam...@axiomine.com>
> *Sent:* Saturday, August 27, 2016 11:40 PM
> *To:* user@flink.apache.org
> *Subject:* Re: Accessing state in connected streams
>
> OK, sorry about that :-). I misunderstood, as I am not familiar with Scala
> code. Just curious, though: how are you passing two MapFunctions to the
> flatMap function on the connected stream? The interface of ConnectedStreams
> requires just one CoFlatMapFunction:
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.html
>
> Sameer
>
> On Sat, Aug 27, 2016 at 6:13 PM, aris kol <gizera...@hotmail.com> wrote:
>
>> Let's say I have two types sharing the same trait:
>>
>> trait Event {
>>   def id: Id
>> }
>>
>> case class EventA(id: Id, info: InfoA) extends Event
>> case class EventB(id: Id, info: InfoB) extends Event
>>
>> Each of these events gets pushed to a Kafka topic and gets consumed by a
>> stream in Flink.
>>
>> Let's say I have two streams.
>>
>> Events of type A create state:
>>
>> val typeAStream = env.addSource(...)
>>   .flatMap(someUnmarshallerForA)
>>   .keyBy(_.id)
>>   .mapWithState(...)
>>
>> val typeBStream = env.addSource(...)
>>   .flatMap(someUnmarshallerForB)
>>   .keyBy(_.id)
>>
>> Now I want to process the events in typeBStream using the information
>> stored in the state of typeAStream.
>>
>> One approach would be to use the same stream for the two topics and then
>> pattern match, but Event subclasses may grow in number and
>> may have different loads, so I may want to keep things separate.
>>
>> Would something along the lines of:
>>
>> typeAStream.connect(typeBStream)
>>   .flatMap(
>>     new IdentityFlatMapFunction(),
>>     new SomeRichFlatMapFunctionForEventB[EventB, O] with
>>       StatefulFunction[EventB, O, G[EventA]] { ... }
>>   )
>>
>> work?
>>
>> I tried this approach and ended up with an NPE because the state object
>> was not initialized (meaning it was not there).
>>
>> Thanks,
>> Aris
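For reference, here is a minimal sketch of the CoFlatMap approach. It assumes a Flink 1.x release that still ships the Scala DataStream API, and the API has changed since this 2016 thread. Keyed state is scoped to the operator that registers it, so the state kept by mapWithState on typeAStream is not visible to a separate function applied after connect. In this sketch, the EventA state therefore lives in the co-flatmap itself and replaces the mapWithState step. Both inputs are connected and keyed by the same id, flatMap1 stores the latest InfoA for that id, and flatMap2 reads it back for the matching EventB. The names Id = String, InfoA, InfoB, Enriched, EnrichBWithA and the fromElements inputs that stand in for the Kafka sources are hypothetical placeholders, not part of the original code.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object ConnectedStateSketch {
  // Hypothetical stand-ins for the types in the thread.
  type Id = String
  case class InfoA(value: String)
  case class InfoB(value: String)

  sealed trait Event { def id: Id }
  case class EventA(id: Id, info: InfoA) extends Event
  case class EventB(id: Id, info: InfoB) extends Event

  // Hypothetical output: an EventB enriched with the latest InfoA for its id.
  case class Enriched(id: Id, a: InfoA, b: InfoB)

  // One function owns the keyed state. Because the connected inputs are keyed
  // on the same id, flatMap1 and flatMap2 see the same per-key ValueState.
  class EnrichBWithA extends RichCoFlatMapFunction[EventA, EventB, Enriched] {
    // Assigned in open(), once the runtime context is available.
    @transient private var latestA: ValueState[InfoA] = _

    override def open(parameters: Configuration): Unit = {
      latestA = getRuntimeContext.getState(
        new ValueStateDescriptor[InfoA]("latest-info-a", classOf[InfoA]))
    }

    // Input 1 (EventA): remember the most recent InfoA for this id, emit nothing.
    override def flatMap1(a: EventA, out: Collector[Enriched]): Unit =
      latestA.update(a.info)

    // Input 2 (EventB): look up the InfoA stored for the same id.
    override def flatMap2(b: EventB, out: Collector[Enriched]): Unit =
      Option(latestA.value()) match {
        case Some(infoA) => out.collect(Enriched(b.id, infoA, b.info))
        case None        => () // no EventA seen yet for this id, so b is dropped
      }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // fromElements stands in for the two Kafka sources.
    val typeAStream = env.fromElements(EventA("1", InfoA("a1")))
    val typeBStream = env.fromElements(EventB("1", InfoB("b1")))

    typeAStream
      .connect(typeBStream)
      .keyBy(_.id, _.id) // same key on both inputs, so they share keyed state
      .flatMap(new EnrichBWithA)
      .print()

    env.execute("connected-streams-state-sketch")
  }
}
```

Flink does not guarantee the order in which the two inputs are processed, so an EventB can arrive before any EventA with the same id. In that case, value() returns null, which is why the lookup is wrapped in Option instead of being dereferenced directly. If such events must not be dropped, one option is to buffer them in a second keyed state, for example a ListState, until the matching EventA arrives.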