Yes, they are generated from an Avro schema and implement Serializable.
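
That said, implementing Serializable may not be enough on its own: Java serialization also fails if any non-transient field refers to an object that is not serializable, which is what the "Caused by" line in the trace below suggests. Here is a minimal sketch of that effect in plain Scala, without Flink or Avro. FieldInfo, Record, SafeRecord and SerializationCheck are hypothetical names made up for illustration, with FieldInfo standing in for a type like org.apache.avro.Schema$Field.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable type such as
// org.apache.avro.Schema$Field (an assumption for illustration only).
class FieldInfo(val name: String)

// Declares Serializable, but holds a reference to a non-serializable object.
class Record(val rideId: Long, val field: FieldInfo) extends Serializable

// Same shape, but the non-serializable reference is marked @transient,
// so Java serialization skips it.
class SafeRecord(val rideId: Long, @transient val field: FieldInfo) extends Serializable

object SerializationCheck {
  // Returns true if Java serialization of `obj` succeeds,
  // false if it hits a NotSerializableException.
  def isSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    println(isSerializable(new Record(1L, new FieldInfo("rideId"))))
    println(isSerializable(new SafeRecord(1L, new FieldInfo("rideId"))))
  }
}
```

This only illustrates the general Java serialization rule; it does not explain why the failure shows up only when the job is built inside a Future.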

On Mon, Sep 16, 2019 at 4:40 PM Deepak Sharma <deepakmc...@gmail.com> wrote:

> Does TaxiRide or TaxiRideFare implement Serializable?
>
> On Mon, Sep 16, 2019 at 3:47 PM Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
>> Hello -
>>
>> The following piece of code is an example of connected data streams:
>>
>> val rides: DataStream[TaxiRide] =
>>   readStream(inTaxiRide)
>>     .filter { ride ⇒ ride.getIsStart().booleanValue }
>>     .keyBy("rideId")
>>
>> val fares: DataStream[TaxiFare] =
>>   readStream(inTaxiFare)
>>     .keyBy("rideId")
>>
>> val processed: DataStream[TaxiRideFare] =
>>   rides
>>     .connect(fares)
>>     .flatMap(new EnrichmentFunction)
>>
>> When I execute the above logic using
>> StreamExecutionEnvironment.execute(..) it runs fine.
>> But if I try to execute the above from within a scala.concurrent.Future,
>> I get the following exception:
>>
>> org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG
>> pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt
>> type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5,
>> startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT
>> pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not
>> serializable. The object probably contains or references non serializable
>> fields.
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
>>     at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>>     at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
>>     at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
>>   ...
>>
>> Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>
>> Any thoughts on why this may happen?
>>
>> regards.
>>
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg
>>
>
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>


-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
