Ah I see, I'm afraid StatefulFunction is more of an internal implementation detail that cannot be used like that.
This is a small example that shows how you could do a stateful Co-FlatMap function:

import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.api.common.typeutils.base.StringSerializer
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object StateExample {
  trait Base { def id: Int }
  case class EventA(id: Int, info: String)
  case class EventB(id: Int, info: String)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val sourceA = env.fromElements(EventA(1, "hello"), EventA(1, "ciao"))
    val sourceB = env.fromElements(EventB(1, "a"), EventB(1, "b"))

    // both inputs are keyed by id, so they share the same keyed state
    sourceA.keyBy(_.id).connect(sourceB.keyBy(_.id)).flatMap(
      new RichCoFlatMapFunction[EventA, EventB, String] {
        val stateDescriptor =
          new ListStateDescriptor[String]("seen", StringSerializer.INSTANCE)

        def flatMap1(in: EventA, out: Collector[String]) = {
          val state = getRuntimeContext.getListState(stateDescriptor)
          // add to state for the key of in (the key is used implicitly)
          state.add(in.info)
        }

        def flatMap2(in: EventB, out: Collector[String]) = {
          val state = getRuntimeContext.getListState(stateDescriptor)
          println(s"GOT $in have seen so far: ${state.get()}")
        }
      })

    env.execute()
  }
}

Let me know if you need more details.

Cheers,
Aljoscha

On Tue, 30 Aug 2016 at 16:21 aris kol <gizera...@hotmail.com> wrote:

> Hi Aljoscha,
>
>
> I removed business objects and logic etc. I am happy to post here 😊
> I am sure this is a common issue when you start to seriously mess
> with state.
>
>
> Assuming a type for the Output
> And assuming that there is a function (EventA :=> String) in the
> mapWithState operator of typeAStream (implying the State is just a
> Seq[String] per key)
>
> def coFun = new CoFlatMapFunction[EventA, EventB, Option[Output]] {
>
>   override def flatMap1(in: EventA, out: Collector[Option[Output]]) =
>     out.collect(None)
>
>   override def flatMap2(in: EventB, out: Collector[Option[Output]]) = {
>
>     new RichFlatMapFunction[EventB, Option[Output]] with
>       StatefulFunction[EventB, Option[Output], Seq[String]] {
>
>       lazy val stateTypeInfo: TypeInformation[Seq[String]] =
>         implicitly[TypeInformation[Seq[String]]]
>       lazy val serializer: TypeSerializer[Seq[String]] =
>         stateTypeInfo.createSerializer(getRuntimeContext.getExecutionConfig)
>       override lazy val stateSerializer: TypeSerializer[Seq[String]] =
>         serializer
>
>       override def flatMap(in: EventB, out: Collector[Option[Output]]): Unit = {
>         out.collect(
>           applyWithState(
>             in,
>             (in, state) =>
>               (state match {
>                 case None => None
>                 case Some(s) => Some(Output(...))
>               }, state)
>           )
>         )
>       }
>
>       flatMap(in, out)
>
>     }
>   }
> }
>
>
> applyWithState throws the exception and my intuition says I am doing
> something seriously wrong in the instantiation. I tried to make something
> work using the mapWithState implementation as a guide and I ended up here.
>
> Thanks,
> Aris
>
> ------------------------------
> *From:* Aljoscha Krettek <aljos...@apache.org>
> *Sent:* Tuesday, August 30, 2016 10:06 AM
> *To:* user@flink.apache.org
> *Subject:* Re: Accessing state in connected streams
>
> Hi Aris,
> I think you're on the right track with using a CoFlatMap for this. Could
> you maybe post the code of your CoFlatMapFunction (or you could send it to
> me privately if you have concerns with publicly posting it) then I could
> have a look.
>
> Cheers,
> Aljoscha
>
> On Mon, 29 Aug 2016 at 15:48 aris kol <gizera...@hotmail.com> wrote:
>
>> Any other opinion on this?
>>
>>
>> Thanks :)
>>
>> Aris
>> *From:* aris kol <gizera...@hotmail.com>
>> *Sent:* Sunday, August 28, 2016 12:04 AM
>> *To:* user@flink.apache.org
>> *Subject:* Re: Accessing state in connected streams
>>
>> In the implementation I am passing just one CoFlatMapFunction, where
>> flatMap1, which operates on EventA, just emits a None (doesn't do anything
>> practically) and flatMap2 tries to access the state and throws the NPE.
>>
>> It wouldn't make sense to use a mapper in this context, I would still
>> want to flatten afterwards before pushing downstream.
>>
>>
>> Aris
>>
>>
>> ------------------------------
>> *From:* Sameer W <sam...@axiomine.com>
>> *Sent:* Saturday, August 27, 2016 11:40 PM
>> *To:* user@flink.apache.org
>> *Subject:* Re: Accessing state in connected streams
>>
>> Ok sorry about that :-). I misunderstood as I am not familiar with Scala
>> code. Just curious though, how are you passing two MapFunctions to the
>> flatMap function on the connected stream? The interface of ConnectedStream
>> requires just one CoMap function:
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.html
>>
>> Sameer
>>
>> On Sat, Aug 27, 2016 at 6:13 PM, aris kol <gizera...@hotmail.com> wrote:
>>
>>> Let's say I have two types sharing the same trait
>>>
>>> trait Event {
>>>   def id: Id
>>> }
>>>
>>> case class EventA(id: Id, info: InfoA) extends Event
>>> case class EventB(id: Id, info: InfoB) extends Event
>>>
>>> Each of these events gets pushed to a Kafka topic and gets consumed by a
>>> stream in Flink.
>>>
>>> Let's say I have two streams
>>>
>>> Events of type A create state:
>>>
>>> val typeAStream = env.addSource(...)
>>>   .flatMap(someUnmarshallerForA)
>>>   .keyBy(_.id)
>>>   .mapWithState(...)
>>>
>>> val typeBStream = env.addSource(...)
>>>   .flatMap(someUnmarshallerForB)
>>>   .keyBy(_.id)
>>>
>>> I now want to process the events in typeBStream using the information
>>> stored in the State of typeAStream.
>>>
>>> One approach would be to use the same stream for the two topics and then
>>> pattern match, but Event subclasses may grow in numbers and
>>> may have different loads, thus I may want to keep things separate.
>>>
>>> Would something along the lines of:
>>>
>>> typeAStream.connect(typeBStream).
>>>   flatMap(
>>>     new IdentityFlatMapFunction(),
>>>     new SomeRichFlatMapFunctionForEventB[EventB, O] with
>>>       StatefulFunction[EventB, O, G[EventA]] { ... }
>>>   )
>>>
>>> work?
>>>
>>> I tried this approach and I ended up with an NPE because the state object
>>> was not initialized (meaning it was not there).
>>>
>>>
>>> Thanks,
>>> Aris
>>>
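
As a follow-up to the example at the top of this thread, here is a minimal sketch of a variant in which flatMap2 emits a result through the Collector instead of printing it, which is closer to what the original question asks for. It only uses the Flink calls that already appear in that example (RichCoFlatMapFunction, ListStateDescriptor, getRuntimeContext.getListState). The Output case class, the object name EnrichExample and the "emit only when something was seen" rule are assumptions made for illustration, not part of the thread.

```scala
import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.api.common.typeutils.base.StringSerializer
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

object EnrichExample {
  case class EventA(id: Int, info: String)
  case class EventB(id: Int, info: String)
  // Hypothetical output type: an EventB joined with the EventA infos seen for its key.
  case class Output(id: Int, info: String, seen: List[String])

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val sourceA = env.fromElements(EventA(1, "hello"), EventA(1, "ciao"))
    val sourceB = env.fromElements(EventB(1, "a"), EventB(1, "b"))

    sourceA.keyBy(_.id).connect(sourceB.keyBy(_.id)).flatMap(
      new RichCoFlatMapFunction[EventA, EventB, Output] {
        val stateDescriptor =
          new ListStateDescriptor[String]("seen", StringSerializer.INSTANCE)

        // Input 1 only updates the keyed state and emits nothing.
        override def flatMap1(in: EventA, out: Collector[Output]): Unit =
          getRuntimeContext.getListState(stateDescriptor).add(in.info)

        // Input 2 reads the state for the same key and emits zero or one Output.
        override def flatMap2(in: EventB, out: Collector[Output]): Unit = {
          val state = getRuntimeContext.getListState(stateDescriptor)
          // Guard against a null result for empty state (assumed possible here).
          val seen = Option(state.get()).map(_.asScala.toList).getOrElse(Nil)
          if (seen.nonEmpty) out.collect(Output(in.id, in.info, seen))
        }
      }).print()

    env.execute()
  }
}
```

Because this is a flatMap, emitting nothing for an element replaces the Option[Output] plus later flattening from the quoted code. The two sources are consumed independently, so which EventA values an EventB sees is not deterministic in this toy job.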