Hello -

The following piece of code is an example of a connected data streams ..

val rides: DataStream[TaxiRide] =
  readStream(inTaxiRide)
    .filter { ride ⇒ ride.getIsStart().booleanValue }
    .keyBy("rideId")

val fares: DataStream[TaxiFare] =
  readStream(inTaxiFare)
    .keyBy("rideId")

val processed: DataStream[TaxiRideFare] =
  rides
    .connect(fares)
    .flatMap(new EnrichmentFunction)

When I execute the above logic using StreamExecutionEnvironment.execute(..)
it runs fine.
But if I try to execute the above from within a scala.concurrent.Future, I
get the following exception ..

org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG
pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt
type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5,
startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT
pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not
serializable. The object probably contains or references non serializable
fields.
            at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
            at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
            at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
            at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
            at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
            at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
            at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
            at
org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
            at
org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
            at
org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
  ...

Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
            at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)

Any thoughts why this may happen ?

regards.

-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg

Reply via email to