My main question is why serialisation kicks in when I try to execute within a `Future` and not otherwise.

regards.

On Mon, 16 Sep 2019 at 4:46 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

> Yes, they are generated from Avro Schema and implement Serializable ..
>
> On Mon, Sep 16, 2019 at 4:40 PM Deepak Sharma <deepakmc...@gmail.com> wrote:
>
>> Does TaxiRide or TaxiRideFare implement Serializable?
>>
>> On Mon, Sep 16, 2019 at 3:47 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>
>>> Hello -
>>>
>>> The following piece of code is an example of connected data streams ..
>>>
>>>     val rides: DataStream[TaxiRide] =
>>>       readStream(inTaxiRide)
>>>         .filter { ride ⇒ ride.getIsStart().booleanValue }
>>>         .keyBy("rideId")
>>>
>>>     val fares: DataStream[TaxiFare] =
>>>       readStream(inTaxiFare)
>>>         .keyBy("rideId")
>>>
>>>     val processed: DataStream[TaxiRideFare] =
>>>       rides
>>>         .connect(fares)
>>>         .flatMap(new EnrichmentFunction)
>>>
>>> When I execute the above logic using
>>> StreamExecutionEnvironment.execute(..) it runs fine.
>>> But if I try to execute the above from within a scala.concurrent.Future,
>>> I get the following exception ..
>>>
>>> org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG
>>> pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt
>>> type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5,
>>> startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT
>>> pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not
>>> serializable. The object probably contains or references non serializable
>>> fields.
>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
>>>     at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>>>     at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
>>>     at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
>>>     ...
>>>
>>> Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>
>>> Any thoughts why this may happen ?
>>>
>>> regards.
>>>
>>> --
>>> Debasish Ghosh
>>> http://manning.com/ghosh2
>>> http://manning.com/ghosh
>>>
>>> Twttr: @debasishg
>>> Blog: http://debasishg.blogspot.com
>>> Code: http://github.com/debasishg
>>
>> --
>> Thanks
>> Deepak
>> www.bigdatabig.com
>> www.keosha.net
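
The thread does not establish why only the `Future` path fails, so the following is just a minimal sketch of the failure mode named in the `Caused by` line: plain Java serialization (the `ObjectOutputStream.writeObject0` frame in the trace) rejects any object that is reachable from the function and does not implement `Serializable`. No Flink or Avro classes are used. `FakeSchemaField`, `HoldsField`, `SkipsField` and `SerializationCheck` are hypothetical names invented for illustration, with `FakeSchemaField` standing in for `org.apache.avro.Schema$Field`.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.util.Try

// Hypothetical stand-in for a non-serializable type such as
// org.apache.avro.Schema.Field (note: it does NOT extend Serializable).
final class FakeSchemaField(val name: String)

// Hypothetical user function that keeps a reference to such a field.
// The class itself is Serializable, but one of its fields is not.
class HoldsField(val field: FakeSchemaField) extends Serializable {
  def apply(x: Long): Long = x
}

// Same function, with the field marked @transient so that Java
// serialization skips it instead of failing on it.
class SkipsField(@transient val field: FakeSchemaField) extends Serializable {
  def apply(x: Long): Long = x
}

object SerializationCheck {
  // Attempts plain Java serialization into an in-memory buffer and
  // reports success or failure instead of throwing.
  def trySerialize(obj: AnyRef): Try[Unit] = Try {
    new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
  }

  def main(args: Array[String]): Unit = {
    val field = new FakeSchemaField("rideId")
    // Fails: the object graph reaches a non-serializable FakeSchemaField.
    println(trySerialize(new HoldsField(field)))
    // Succeeds: the transient field is left out of the serialized form.
    println(trySerialize(new SkipsField(field)))
  }
}
```

Under these assumptions, the first call yields a `Failure` wrapping `java.io.NotSerializableException` and the second a `Success`. A `@transient` field comes back as `null` after deserialization, so a real function would need to rebuild it (for example lazily) before use. Whether something like this is what the `Future` code path pulls into the closure here is not confirmed by the thread.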