Hi Aris,
I think you're on the right track with using a CoFlatMap for this. Could
you maybe post the code of your CoFlatMapFunction (or you could send it to
me privately if you have concerns with publicly posting it) then I could
have a look.

Cheers,
Aljoscha

On Mon, 29 Aug 2016 at 15:48 aris kol <gizera...@hotmail.com> wrote:

> Any other opinion on this?
>
>
> Thanks :)
>
> Aris
> *From:* aris kol <gizera...@hotmail.com>
> *Sent:* Sunday, August 28, 2016 12:04 AM
>
> *To:* user@flink.apache.org
> *Subject:* Re: Accessing state in connected streams
>
> In the implementation I am passing just one CoFlatMapFunction, where
> flatMap1, which operates on EventA, just emits a None (doesn't do anything
> practically) and flatMap2 tries to access the state and throws the NPE.
>
> It wouldn't make sense to use a mapper in this context, I would still want
> to flatten afterwards before pushing dowstream.
>
>
> Aris
>
>
> ------------------------------
> *From:* Sameer W <sam...@axiomine.com>
> *Sent:* Saturday, August 27, 2016 11:40 PM
> *To:* user@flink.apache.org
> *Subject:* Re: Accessing state in connected streams
>
> Ok sorry about that :-). I misunderstood as I am not familiar with Scala
> code. Just curious though how are you passing two MapFunction's to the
> flatMap function on the connected stream. The interface of ConnectedStream
> requires just one CoMap function-
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.html
>
> Sameer
>
> On Sat, Aug 27, 2016 at 6:13 PM, aris kol <gizera...@hotmail.com> wrote:
>
>> Let's say I have two types sharing the same trait
>>
>> trait Event {
>> def id: Id
>> }
>>
>> case class EventA(id: Id, info: InfoA) extends Event
>> case class EventB(id: Id, info: InfoB) extends Event
>>
>> Each of these events gets pushed to a Kafka topic and gets consumed by a
>> stream in Flink.
>>
>> Let's say I have two streams
>>
>> Events of type A create state:
>>
>> val typeAStream = env.addSource(...)
>> .flatMap(someUnmarshallerForA)
>> .keyBy(_.id)
>> .mapWithState(...)
>>
>> val typeBStream = env.addSource(...)
>> .flatMap(someUnmarshallerForB)
>> .keyBy(_.id)
>>
>> I want now to process the events in typeBStream using the information
>> stored in the State of typeAStream.
>>
>> One approach would be to use the same stream for the two topics and then
>> pattern match, but Event subclasses may grow in numbers and
>> may have different loads, thus I may want to keep things separate.
>>
>> Would something along the lines of:
>>
>> typeAStream.connect(typeBStream).
>> flatMap(
>> new IdentityFlatMapFunction(),
>> new SomeRichFlatMapFunctionForEventB[EventB, O] with
>> StateFulFuntion[EventB, O, G[EventA]] { ... }
>> )
>>
>> work?
>>
>> I tried this approach and I ended up in a NPE because the state object
>> was not initialized (meaning it was not there).
>>
>>
>> Thanks,
>> Aris
>>
>>
>

Reply via email to