My main question is why serialisation kicks in when I try to execute within
a `Future` but not otherwise.
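For context, the two invocations differ roughly as follows (a minimal sketch, not the actual code — `StreamJob`, `buildPipeline`, and the job name are hypothetical names used only for illustration):

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

class StreamJob {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  // Direct call: the graph is built and execute() runs on the calling thread.
  def runDirect(): Unit = {
    buildPipeline()          // the .connect(..).flatMap(..) logic above
    env.execute("taxi-job")
  }

  // Wrapped call: the Future body compiles to a closure. If that body
  // touches any member of the enclosing instance (env, config, etc.),
  // the closure captures `this`, which may change what Flink's
  // ClosureCleaner finds reachable when cleaning the functions built
  // inside the body.
  def runInFuture(): Future[Unit] = Future {
    buildPipeline()
    env.execute("taxi-job")
  }

  def buildPipeline(): Unit = ???
}
```

One possible line of investigation: in Scala, a `Future { ... }` body that references members of its enclosing class captures the enclosing instance, so fields reachable from it (e.g. a cached `org.apache.avro.Schema`) can end up in the serializability check.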

regards.

On Mon, 16 Sep 2019 at 4:46 PM, Debasish Ghosh <ghosh.debas...@gmail.com>
wrote:

> Yes, they are generated from the Avro schema and implement Serializable ..
>
> On Mon, Sep 16, 2019 at 4:40 PM Deepak Sharma <deepakmc...@gmail.com>
> wrote:
>
>> Does TaxiRide or TaxiRideFare implement Serializable?
>>
>> On Mon, Sep 16, 2019 at 3:47 PM Debasish Ghosh <ghosh.debas...@gmail.com>
>> wrote:
>>
>>> Hello -
>>>
>>> The following piece of code is an example of connected data streams ..
>>>
>>> val rides: DataStream[TaxiRide] =
>>>   readStream(inTaxiRide)
>>>     .filter { ride ⇒ ride.getIsStart().booleanValue }
>>>     .keyBy("rideId")
>>>
>>> val fares: DataStream[TaxiFare] =
>>>   readStream(inTaxiFare)
>>>     .keyBy("rideId")
>>>
>>> val processed: DataStream[TaxiRideFare] =
>>>   rides
>>>     .connect(fares)
>>>     .flatMap(new EnrichmentFunction)
>>>
>>> When I execute the above logic using
>>> StreamExecutionEnvironment.execute(..) it runs fine.
>>> But if I try to execute the above from within a scala.concurrent.Future,
>>> I get the following exception ..
>>>
>>> org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG
>>> pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt
>>> type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5,
>>> startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT
>>> pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not
>>> serializable. The object probably contains or references non serializable
>>> fields.
>>>             at
>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>>             at
>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>             at
>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>             at
>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>             at
>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>             at
>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>             at
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
>>>             at
>>> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>>>             at
>>> org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
>>>             at
>>> org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
>>>   ...
>>>
>>> Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
>>>             at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>
>>> Any thoughts on why this may happen?
>>>
>>> regards.
>>>
>>> --
>>> Debasish Ghosh
>>> http://manning.com/ghosh2
>>> http://manning.com/ghosh
>>>
>>> Twttr: @debasishg
>>> Blog: http://debasishg.blogspot.com
>>> Code: http://github.com/debasishg
>>>
>>
>>
>> --
>> Thanks
>> Deepak
>> www.bigdatabig.com
>> www.keosha.net
>>
>
>
>
-- 
Sent from my iPhone
