Thanks Stephan. "lazy val" does the trick.

On Thu, Aug 4, 2016 at 2:33 AM, Stephan Ewen <se...@apache.org> wrote:
> If the class has non-serializable members, you need to initialize them
> "lazily" when the objects are already in the distributed execution (after
> serializing / distributing them).
>
> Making a Scala 'val' a 'lazy val' often does the trick (at minimal
> performance cost).
>
> On Thu, Aug 4, 2016 at 3:56 AM, Jack Huang <jackhu...@mz.com> wrote:
>
>> Hi all,
>>
>> I want to read a source of JSON strings as Scala case classes. I don't want
>> to have to write a serde for every case class I have. The idea is:
>>
>> val events = env.addSource(new FlinkKafkaConsumer09[Event]("events", new JsonSerde(classOf[Event]), kafkaProp))
>>
>> I was implementing my own JsonSerde with Jackson/Gson, but in both cases I
>> get the error
>>
>> Task not serializable
>> org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172)
>> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:568)
>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:498)
>> com.fractionalmedia.stream.PricerEvent$.main(PricerEvent.scala:100)
>>
>> It seems that both Jackson and Gson have classes that are not serializable.
>>
>> I couldn't find any other solution to perform this JSON-to-Case-Class
>> parsing, yet it seems a very basic need. What am I missing?
>>
>> Thanks,
>> Jack
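
For the archive, here is a minimal sketch of the lazy-initialization pattern discussed in this thread. It is not the JsonSerde from the original job. FakeParser is a hypothetical stand-in for a non-serializable helper such as a Jackson ObjectMapper or a Gson instance, and EagerSerde, LazySerde and LazyValDemo are illustrative names. The @transient annotation is an addition that was not mentioned in the thread: it is assumed here to be the safer variant, because it keeps the field out of the serialized form even if the lazy val was already initialized before the object is shipped. Plain Java serialization is used only to mimic what happens when the object is distributed.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.io.{NotSerializableException, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable helper (e.g. a JSON mapper).
// It deliberately does NOT extend Serializable.
class FakeParser {
  def parse(s: String): String = s.trim.toUpperCase
}

// Eager member: the parser is created in the constructor and becomes part
// of the object's serialized state, so serialization fails.
class EagerSerde extends Serializable {
  val parser = new FakeParser
  def deserialize(s: String): String = parser.parse(s)
}

// Lazy member: the parser is created on first use, i.e. after the object
// has been deserialized on the worker. @transient (an assumption, see the
// note above) excludes the field from the serialized form.
class LazySerde extends Serializable {
  @transient lazy val parser = new FakeParser
  def deserialize(s: String): String = parser.parse(s)
}

object LazyValDemo {
  // Serialize to bytes and read back, roughly what shipping an object
  // to a remote task involves.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val eagerFails =
      try { roundTrip(new EagerSerde); false }
      catch { case _: NotSerializableException => true }
    println(s"eager val fails to serialize: $eagerFails")

    val shipped = roundTrip(new LazySerde)
    println(shipped.deserialize(" hello "))
  }
}
```

In a real job the same shape would presumably apply to the JsonSerde passed to FlinkKafkaConsumer09: keep the Jackson or Gson object in a lazy val inside the serde, so that only serializable state (such as the target Class) travels with it.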