Yes, they are generated from an Avro schema and implement Serializable.

On Mon, Sep 16, 2019 at 4:40 PM Deepak Sharma <deepakmc...@gmail.com> wrote:
> Does TaxiRide or TaxiRideFare implement Serializable?
>
> On Mon, Sep 16, 2019 at 3:47 PM Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
>> Hello -
>>
>> The following piece of code is an example of connected data streams:
>>
>> val rides: DataStream[TaxiRide] =
>>   readStream(inTaxiRide)
>>     .filter { ride ⇒ ride.getIsStart().booleanValue }
>>     .keyBy("rideId")
>>
>> val fares: DataStream[TaxiFare] =
>>   readStream(inTaxiFare)
>>     .keyBy("rideId")
>>
>> val processed: DataStream[TaxiRideFare] =
>>   rides
>>     .connect(fares)
>>     .flatMap(new EnrichmentFunction)
>>
>> When I execute the above logic using
>> StreamExecutionEnvironment.execute(..) it runs fine.
>> But if I try to execute the above from within a scala.concurrent.Future,
>> I get the following exception:
>>
>> org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG
>> pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt
>> type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5,
>> startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT
>> pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not
>> serializable. The object probably contains or references non serializable
>> fields.
>>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
>>   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>>   at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
>>   at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
>>   ...
>>
>> Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>
>> Any thoughts on why this may happen?
>>
>> regards.
>>
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg
>>
>
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>

--
Debasish Ghosh
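
The "Caused by" line in the trace says that Java serialization reached an org.apache.avro.Schema$Field instance while walking the object graph of the function passed to flatMap. The following is only a minimal sketch of that general failure mode, independent of Flink and Avro. FieldInfo, Enricher, SafeEnricher and SerializationCheck are hypothetical names invented for illustration; they are not classes from this thread, and the sketch does not establish why the error appears only when the job is built inside a Future.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a type that does not implement java.io.Serializable
// (it plays the role that org.apache.avro.Schema$Field plays in the trace).
class FieldInfo(val name: String)

// Hypothetical function object: Serializable itself, but it holds a
// reference to a non-serializable value, so writing it out fails.
class Enricher(val field: FieldInfo) extends Serializable

// Same shape, but the reference is marked @transient, so Java
// serialization skips it when writing the object graph.
class SafeEnricher(@transient val field: FieldInfo) extends Serializable

object SerializationCheck {
  // Tries to Java-serialize obj into an in-memory buffer.
  // Returns None on success, or Some(message) carrying the message of the
  // NotSerializableException (normally the offending class name).
  def firstNonSerializable(obj: AnyRef): Option[String] = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      None
    } catch {
      case e: NotSerializableException => Some(e.getMessage)
    } finally {
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Fails: the graph reachable from Enricher contains a FieldInfo.
    println(firstNonSerializable(new Enricher(new FieldInfo("rideId"))))
    // Succeeds: the FieldInfo reference is transient and is not written.
    println(firstNonSerializable(new SafeEnricher(new FieldInfo("rideId"))))
  }
}
```

Running a check like firstNonSerializable against the function object (here, an instance of EnrichmentFunction) outside of Flink may help narrow down which field drags the Avro schema into the closure. Marking a reference @transient is one common way to keep it out of the serialized graph, but whether that is appropriate for the code in this thread is an assumption, not something the messages confirm.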