Hello,

I'm working with Flink 1.4.2 (Scala API) and I'm having some trouble with my 
custom RichMapFunction: I want each element of my DataStream to also supply a 
constructor parameter of this custom class. My RichMapFunction is a simple 
counter based on a MapState.

Let's say I have these classes:

- case class Feature(transaction: Transaction) { override def getKey: (String, String) = ... }

- class CustomMapFunction(featureKey: (String, String)) extends RichMapFunction[Transaction, Transaction]

I implemented my custom map function with the required methods, but I ran 
into a different issue with each solution I tried. In the following snippets, 
stream is a DataStream[Transaction] and I expect a DataStream[Transaction] as 
the output type too.


* stream.keyBy(transaction => Feature(transaction).getKey).map(transaction => new CustomMapFunction(Feature(transaction).getKey))

o   This leads to a compilation error ("Expression of type CustomMapFunction 
doesn't conform to expected type R_"), which, as far as I understand, comes 
from the fact that I'm already using transaction for the Feature(transaction) 
part.

* stream.keyBy(transaction => Feature(transaction).getKey).map(transaction => new CustomMapFunction(Feature(transaction).getKey).map(transaction))

o   This compiles but fails with a NullPointerException at runtime because the 
MapState is not initialized. When running with the debugger, I can see that the 
open function is never called, so the MapState stays null (I don't have this 
problem with a simpler version of my CustomMapFunction that does not need a 
parameter derived from the transaction).
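
To illustrate what I believe happens in the second attempt, here is a plain-Scala imitation of the lifecycle, with no Flink dependency. MiniRichMap, Counter, runOperator and runLambda are hypothetical stand-ins I made up for this sketch, not Flink classes and not my real code. The assumption is that the runtime calls open only on the function object it was handed, so an instance created inside a lambda never gets initialized.

```scala
// Plain-Scala imitation of a rich-function lifecycle (no Flink dependency).
// MiniRichMap, Counter, runOperator and runLambda are hypothetical stand-ins.
import scala.collection.mutable
import scala.util.Try

abstract class MiniRichMap[I, O] {
  def open(): Unit = ()          // lifecycle hook, meant to be called by the "runtime"
  def map(value: I): O
}

// Counter whose state only exists after open() has run, like a MapState field.
class Counter(featureKey: (String, String)) extends MiniRichMap[String, Long] {
  private var counts: mutable.Map[(String, String), Long] = null

  override def open(): Unit = {
    counts = mutable.Map.empty[(String, String), Long]
  }

  override def map(value: String): Long = {
    // Throws a NullPointerException if open() was skipped.
    val updated = counts.getOrElse(featureKey, 0L) + 1L
    counts(featureKey) = updated
    updated
  }
}

object LifecycleDemo {
  // The "runtime" calls open() only on the function object it was given.
  def runOperator[I, O](fn: MiniRichMap[I, O], input: Seq[I]): Seq[O] = {
    fn.open()
    input.map(v => fn.map(v))
  }

  // A lambda gets wrapped in a function object with a no-op open().
  def runLambda[I, O](f: I => O, input: Seq[I]): Seq[O] =
    runOperator(new MiniRichMap[I, O] { def map(value: I): O = f(value) }, input)

  def main(args: Array[String]): Unit = {
    val key = ("a", "b")
    // Instance handed to the runtime: open() runs first, state is initialized.
    println(runOperator(new Counter(key), Seq("t1", "t2")))
    // Instance built inside the lambda: open() is never called on it.
    val attempt = Try(runLambda((t: String) => new Counter(key).map(t), Seq("t1")))
    println(attempt.isFailure)
  }
}
```

In this imitation, the first call counts normally, while the second fails because the inner Counter is created per element and only the wrapping lambda is visible to the runtime.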

Do you have any idea how I could solve this issue?

Thanks in advance for any help. I hope I was clear enough (this is my first 
question on the mailing list, so don't hesitate to tell me if I left out any 
steps or details :))

Best regards,

Isabelle
