Let's say I have two types that share the same trait:
trait Event {
  def id: Id
}
case class EventA(id: Id, info: InfoA) extends Event
case class EventB(id: Id, info: InfoB) extends Event
Each event type is pushed to its own Kafka topic and consumed by a Flink stream.
Let's say I have two streams. Events of type A create state:
val typeAStream = env.addSource(...)
  .flatMap(someUnmarshallerForA)
  .keyBy(_.id)
  .mapWithState(...)
val typeBStream = env.addSource(...)
  .flatMap(someUnmarshallerForB)
  .keyBy(_.id)
I now want to process the events in typeBStream using the information stored in
the state of typeAStream.
One approach would be to consume both topics into a single stream and then
pattern match on the event type, but the number of Event subclasses may grow and
each subclass may have a different load, so I would prefer to keep the streams
separate.
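For comparison, the single-stream approach amounts to one operator that dispatches on the concrete event type. The following is a minimal, Flink-free sketch: `Id` is assumed to be a `String`, `InfoA`/`InfoB` are hypothetical placeholder case classes (the real types are not shown), and the `Map` merely stands in for per-key state; it is not a Flink API. Sealing `Event` lets the compiler warn when a newly added subclass is not handled.

```scala
// Hypothetical placeholders: the question leaves Id, InfoA and InfoB abstract.
type Id = String
final case class InfoA(value: Int)
final case class InfoB(label: String)

// Sealing the trait lets the compiler warn when a match misses a subclass.
sealed trait Event { def id: Id }
final case class EventA(id: Id, info: InfoA) extends Event
final case class EventB(id: Id, info: InfoB) extends Event

// Models ONE operator consuming a merged stream of both topics.
// The Map stands in for per-key state; it is not a Flink API.
final class MergedStreamHandler {
  private var infoById = Map.empty[Id, InfoA]

  // EventA records state and emits nothing; EventB is enriched if state exists.
  def handle(e: Event): Option[(EventB, InfoA)] = e match {
    case a: EventA =>
      infoById = infoById.updated(a.id, a.info)
      None
    case b: EventB =>
      infoById.get(b.id).map(info => (b, info))
  }
}
```

Because every event type flows through this one handler, adding subclasses or isolating a high-load type means changing and scaling this single operator, which is the concern raised above.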
Would something along the lines of:
typeAStream.connect(typeBStream).flatMap(
  new IdentityFlatMapFunction(),
  new SomeRichFlatMapFunctionForEventB[EventB, O]
    with StatefulFunction[EventB, O, G[EventA]] { ... }
)
work?
I tried this approach and got a NullPointerException because the state object
had never been initialized (it was null).
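One likely factor: in Flink, keyed state is scoped to the operator that declares it, so the state created by `mapWithState` on `typeAStream` is not visible to a different operator further downstream. A common pattern is to connect the unmarshalled A and B streams directly, key both inputs by `id` (for example `aEvents.connect(bEvents).keyBy(_.id, _.id)`, where `aEvents`/`bEvents` are hypothetical names), and keep the state inside a single `RichCoFlatMapFunction`, whose `flatMap1` and `flatMap2` share keyed state such as a `ValueState[InfoA]`. The Flink-free sketch below models only that ownership, under the assumption that the state holds the latest `InfoA` per id: `processA`/`processB` stand in for `flatMap1`/`flatMap2`, and `Id`, `InfoA` and `InfoB` are hypothetical placeholders.

```scala
// Hypothetical placeholders: the question leaves Id, InfoA and InfoB abstract.
type Id = String
final case class InfoA(value: Int)
final case class InfoB(label: String)

trait Event { def id: Id }
final case class EventA(id: Id, info: InfoA) extends Event
final case class EventB(id: Id, info: InfoB) extends Event

// Models ONE two-input operator after connecting and keying both inputs.
// Both input methods read and write the same per-key state, which belongs
// to this operator rather than to a mapWithState on another stream.
final class ConnectedKeyedOperator {
  private var infoById = Map.empty[Id, InfoA]

  // Input 1 (cf. flatMap1): type A events create or update the state for their id.
  def processA(a: EventA): Unit = {
    infoById = infoById.updated(a.id, a.info)
  }

  // Input 2 (cf. flatMap2): type B events are enriched with the stored A info.
  // Emits nothing when no A event has been seen yet for this id.
  def processB(b: EventB): List[(EventB, InfoA)] =
    infoById.get(b.id).map(info => (b, info)).toList
}
```

Note that `processB` has to tolerate missing state, because a B event can arrive before any A event with the same id; with Flink's `ValueState`, `value()` returns null in that situation.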
Thanks,
Aris