Ah, I see. I'm afraid StatefulFunction is an internal implementation
detail and cannot be used like that.

This is a small example that shows how you could do a stateful Co-FlatMap
function:

import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.api.common.typeutils.base.StringSerializer
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object StateExample {

  trait Base { def id: Int }

  case class EventA(id: Int, info: String) extends Base
  case class EventB(id: Int, info: String) extends Base

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val sourceA = env.fromElements(EventA(1, "hello"), EventA(1, "ciao"))
    val sourceB = env.fromElements(EventB(1, "a"), EventB(1, "b"))

    // both inputs are keyed by id, so they share the same keyed state
    sourceA.keyBy(_.id).connect(sourceB.keyBy(_.id)).flatMap(
      new RichCoFlatMapFunction[EventA, EventB, String] {

        val stateDescriptor =
          new ListStateDescriptor[String]("seen", StringSerializer.INSTANCE)

        def flatMap1(in: EventA, out: Collector[String]): Unit = {
          val state = getRuntimeContext.getListState(stateDescriptor)
          // add to state for the key of in (the key is used implicitly)
          state.add(in.info)
        }

        def flatMap2(in: EventB, out: Collector[String]): Unit = {
          val state = getRuntimeContext.getListState(stateDescriptor)
          // read the state for the key of in; nothing is emitted here
          println(s"GOT $in have seen so far: ${state.get()}")
        }
      })

    env.execute()

  }
}
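
To make the implicit keying more concrete, here is a rough plain-Scala
model (no Flink involved) of how the keyed ListState behaves in the example
above. It is only a sketch under stated assumptions: KeyedStateModel and
its mutable Map are hypothetical stand-ins for Flink's keyed state, and its
flatMap2 returns the joined strings instead of printing them.

```scala
import scala.collection.mutable

case class EventA(id: Int, info: String)
case class EventB(id: Int, info: String)

// Hypothetical model of keyed state shared by both inputs of a connected
// stream. This is not a Flink API, just an illustration of the semantics.
class KeyedStateModel {

  // Stands in for the keyed ListState: one list of strings per key.
  private val seen = mutable.Map.empty[Int, Vector[String]]

  // Counterpart of flatMap1: append the info under the event's own key.
  def flatMap1(in: EventA): Unit =
    seen(in.id) = seen.getOrElse(in.id, Vector.empty) :+ in.info

  // Counterpart of flatMap2: read only the state stored under the same key
  // and return one joined string per remembered EventA.
  def flatMap2(in: EventB): Seq[String] =
    seen.getOrElse(in.id, Vector.empty).map(a => s"${in.info} joined with $a")
}

object KeyedStateModelDemo {
  def main(args: Array[String]): Unit = {
    val model = new KeyedStateModel
    model.flatMap1(EventA(1, "hello"))
    model.flatMap1(EventA(1, "ciao"))
    println(model.flatMap2(EventB(1, "a"))) // key 1 has two entries
    println(model.flatMap2(EventB(2, "b"))) // key 2 has no state yet
  }
}
```

In the real job the lookup by key happens inside Flink, which is why both
inputs have to be keyed by the same field before they are connected.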

Let me know if you need more details.

Cheers,
Aljoscha

On Tue, 30 Aug 2016 at 16:21 aris kol <gizera...@hotmail.com> wrote:

> Hi Aljoscha,
>
>
> I removed business objects, logic, etc. I am happy to post here 😊
> I am sure this is a common issue when you start to seriously mess
> with state.
>
>
> Assuming a type for the Output
> And assuming that there is a function (EventA :=> String) in the
> mapWithState operator of typeAStream (implying the State is just a
> Seq[String] per key)
>
> def coFun = new CoFlatMapFunction[EventA, EventB, Option[Output]] {
>
> override def flatMap1(in: EventA, out: Collector[Option[Output]]) =
> out.collect(None)
>
> override def flatMap2(in: EventB, out: Collector[Option[Output]]) = {
>
>  new RichFlatMapFunction[EventB, Option[Output]] with
> StatefulFunction[EventB, Option[Output], Seq[String]] {
>
>    lazy val stateTypeInfo: TypeInformation[Seq[String]] =
> implicitly[TypeInformation[Seq[String]]]
>    lazy val serializer: TypeSerializer[Seq[String]] =
> stateTypeInfo.createSerializer(getRuntimeContext.getExecutionConfig)
>    override lazy val stateSerializer: TypeSerializer[Seq[String]] =
> serializer
>
>    override def flatMap(in: EventB, out: Collector[Option[Output]]): Unit
> = {
>      out.collect(
>        applyWithState(
>          in,
>          (in, state) =>
>            (state match {
>              case None => None
>              case Some(s) => Some(Output(...))
>            }, state)
>        )
>      )
>    }
>
>    flatMap(in, out)
>
>  }
> }
> }
>
>
> applyWithState throws the exception and my intuition says I am doing
> something seriously wrong in the instantiation. I tried to make something
> work using the mapWithState implementation as a guide and I ended up here.
>
> Thanks,
> Aris
>
> ------------------------------
> *From:* Aljoscha Krettek <aljos...@apache.org>
> *Sent:* Tuesday, August 30, 2016 10:06 AM
>
> *To:* user@flink.apache.org
> *Subject:* Re: Accessing state in connected streams
> Hi Aris,
> I think you're on the right track with using a CoFlatMap for this. Could
> you maybe post the code of your CoFlatMapFunction (or you could send it to
> me privately if you have concerns with publicly posting it) then I could
> have a look.
>
> Cheers,
> Aljoscha
>
> On Mon, 29 Aug 2016 at 15:48 aris kol <gizera...@hotmail.com> wrote:
>
>> Any other opinion on this?
>>
>>
>> Thanks :)
>>
>> Aris
>> *From:* aris kol <gizera...@hotmail.com>
>> *Sent:* Sunday, August 28, 2016 12:04 AM
>>
>> *To:* user@flink.apache.org
>> *Subject:* Re: Accessing state in connected streams
>>
>> In the implementation I am passing just one CoFlatMapFunction, where
>> flatMap1, which operates on EventA, just emits a None (doesn't do anything
>> practically) and flatMap2 tries to access the state and throws the NPE.
>>
>> It wouldn't make sense to use a mapper in this context; I would still
>> want to flatten afterwards before pushing downstream.
>>
>>
>> Aris
>>
>>
>> ------------------------------
>> *From:* Sameer W <sam...@axiomine.com>
>> *Sent:* Saturday, August 27, 2016 11:40 PM
>> *To:* user@flink.apache.org
>> *Subject:* Re: Accessing state in connected streams
>>
>> Ok, sorry about that :-). I misunderstood as I am not familiar with Scala
>> code. Just curious, though: how are you passing two MapFunctions to the
>> flatMap function on the connected stream? The interface of ConnectedStreams
>> requires just one CoMap function:
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.html
>>
>> Sameer
>>
>> On Sat, Aug 27, 2016 at 6:13 PM, aris kol <gizera...@hotmail.com> wrote:
>>
>>> Let's say I have two types sharing the same trait
>>>
>>> trait Event {
>>> def id: Id
>>> }
>>>
>>> case class EventA(id: Id, info: InfoA) extends Event
>>> case class EventB(id: Id, info: InfoB) extends Event
>>>
>>> Each of these events gets pushed to a Kafka topic and gets consumed by a
>>> stream in Flink.
>>>
>>> Let's say I have two streams
>>>
>>> Events of type A create state:
>>>
>>> val typeAStream = env.addSource(...)
>>> .flatMap(someUnmarshallerForA)
>>> .keyBy(_.id)
>>> .mapWithState(...)
>>>
>>> val typeBStream = env.addSource(...)
>>> .flatMap(someUnmarshallerForB)
>>> .keyBy(_.id)
>>>
>>> I want now to process the events in typeBStream using the information
>>> stored in the State of typeAStream.
>>>
>>> One approach would be to use the same stream for the two topics and then
>>> pattern match, but Event subclasses may grow in numbers and
>>> may have different loads, thus I may want to keep things separate.
>>>
>>> Would something along the lines of:
>>>
>>> typeAStream.connect(typeBStream).
>>> flatMap(
>>> new IdentityFlatMapFunction(),
>>> new SomeRichFlatMapFunctionForEventB[EventB, O] with
>>> StatefulFunction[EventB, O, G[EventA]] { ... }
>>> )
>>>
>>> work?
>>>
>>> I tried this approach and I ended up with an NPE because the state object
>>> was not initialized (meaning it was not there).
>>>
>>>
>>> Thanks,
>>> Aris
>>>
>>>
>>
