Hi all, I want to read a source of JSON String as Scala Case Class. I don't want to have to write a serde for every case class I have. The idea is:
val events = env.addSource(new FlinkKafkaConsumer09[Event]("events", new JsonSerde(classOf[Event]), kafkaProp)) I was implementing my own JsonSerde with Jackson/Gson, but in both case I get the error Task not serializable org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172) org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164) org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:568) org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:498) com.fractionalmedia.stream.PricerEvent$.main(PricerEvent.scala:100) It seems that both Jackson and Gson have classes that is not serializable. I couldn't find any other solution to perform this JSON-to-Case-Class parsing, yet it seems a very basic need. What am I missing? Thanks, Jack