Hello - The following piece of code is an example of a connected data streams ..
val rides: DataStream[TaxiRide] = readStream(inTaxiRide) .filter { ride ⇒ ride.getIsStart().booleanValue } .keyBy("rideId") val fares: DataStream[TaxiFare] = readStream(inTaxiFare) .keyBy("rideId") val processed: DataStream[TaxiRideFare] = rides .connect(fares) .flatMap(new EnrichmentFunction) When I execute the above logic using StreamExecutionEnvironment.execute(..) it runs fine. But if I try to execute the above from within a scala.concurrent.Future, I get the following exception .. org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5, startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not serializable. The object probably contains or references non serializable fields. at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151) at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126) at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126) at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126) at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126) at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574) at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185) at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274) at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179) ... Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) Any thoughts why this may happen ? regards. -- Debasish Ghosh http://manning.com/ghosh2 http://manning.com/ghosh Twttr: @debasishg Blog: http://debasishg.blogspot.com Code: http://github.com/debasishg