Does TaxiRide or TaxiRideFare implement Serializable?

On Mon, Sep 16, 2019 at 3:47 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
> Hello -
>
> The following piece of code is an example of a connected data streams ..
>
> val rides: DataStream[TaxiRide] =
>   readStream(inTaxiRide)
>     .filter { ride ⇒ ride.getIsStart().booleanValue }
>     .keyBy("rideId")
>
> val fares: DataStream[TaxiFare] =
>   readStream(inTaxiFare)
>     .keyBy("rideId")
>
> val processed: DataStream[TaxiRideFare] =
>   rides
>     .connect(fares)
>     .flatMap(new EnrichmentFunction)
>
> When I execute the above logic using
> StreamExecutionEnvironment.execute(..) it runs fine.
> But if I try to execute the above from within a scala.concurrent.Future,
> I get the following exception ..
>
> org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG
> pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt
> type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5,
> startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT
> pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not
> serializable. The object probably contains or references non serializable
> fields.
>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
>   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>   at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
>   at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
>   ...
>
> Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>
> Any thoughts why this may happen ?
>
> regards.
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>

--
Thanks
Deepak
www.bigdatabig.com
www.keosha.net