OK, the above problem was due to some serialization issues, which we fixed
by marking some of the fields transient, along the lines of this sketch
(the field name is illustrative, not our actual code):
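
// org.apache.avro.Schema and Schema$Field are not java.io.Serializable,
// so a field holding them has to be skipped by Java serialization and
// re-created after deserialization.
@transient var schema: org.apache.avro.Schema = _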

That fixes the serialization errors. But now, when I try to execute within
a Future, I hit this:


java.util.concurrent.ExecutionException: Boxed Error
        at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
        at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
        at scala.concurrent.Promise.complete(Promise.scala:53)
        at scala.concurrent.Promise.complete$(Promise.scala:52)
        at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187)
        at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
        at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
        at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
        at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
        at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:280)
        at pipelines.flink.FlinkStreamlet.$anonfun$run$2(FlinkStreamlet.scala:149)
        at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:658)
        at scala.util.Success.$anonfun$map$1(Try.scala:255)
        at scala.util.Success.map(Try.scala:213)
        at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
        at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
        ... 7 more

I found this issue in JIRA,
https://issues.apache.org/jira/browse/FLINK-10381, which
is still open and describes a related problem. But we are not submitting
multiple jobs - we are submitting a single job, just asynchronously in a
Future. I am not clear why that should cause the problem I see.
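
One thing I notice from the trace: the Promise resolver only produces
"Boxed Error" for fatal throwables, so ProgramAbortException must extend
java.lang.Error. Scala Futures treat Errors as fatal and wrap them instead
of failing the Future with the original throwable. A minimal, Flink-free
sketch of that behavior (Scala 2.12):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// An Error thrown inside a Future does not fail the Future as-is; the
// Promise resolver boxes it, matching the "Boxed Error" in the trace above.
val f = Future[Unit] { throw new Error("simulating ProgramAbortException") }
Await.result(f, 1.second)
// => java.util.concurrent.ExecutionException: Boxed Error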

Can anyone please help with an explanation?

regards.

On Wed, Sep 18, 2019 at 12:22 AM Debasish Ghosh <ghosh.debas...@gmail.com>
wrote:

> I think the issue may not be linked to the Future. What happens is that
> when this piece of code is executed ..
>
> val rides: DataStream[TaxiRide] =
>   readStream(inTaxiRide)
>     .filter { ride ⇒ ride.getIsStart().booleanValue }
>     .keyBy("rideId")
>
> val fares: DataStream[TaxiFare] =
>   readStream(inTaxiFare)
>     .keyBy("rideId")
>
> val processed: DataStream[TaxiRideFare] =
>   rides
>     .connect(fares)
>     .flatMap(new EnrichmentFunction)
>
> somehow the ClosureCleaner gets executed, as is evident from the following
> stack trace, and it tries to serialize Avro data. Is there any way to pass
> in the custom Avro serializer that I am using?
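>
> One thing I may try (a sketch only - I have not verified that it addresses
> this particular failure) is forcing Flink's built-in Avro serializer for
> the generated types instead of letting them fall back to Kryo:
>
> // Use Flink's AvroSerializer for POJO types, including the
> // Avro-generated SpecificRecord classes.
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.getConfig.enableForceAvro()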
>
> org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG
> pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt
> type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5,
> startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT
> pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not
> serializable. The object probably contains or references non serializable
> fields.
>         at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>         at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>         at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>         at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>         at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>         at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
>         at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>         at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
>         at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
>         at pipelines.examples.processor.TaxiRideProcessor$$anon$1.buildExecutionGraph(TaxiRideProcessor.scala:47)
>         at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:278)
>         at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:149)
>         at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
>         at scala.util.Try$.apply(Try.scala:213)
>         at pipelines.runner.Runner$.run(Runner.scala:43)
>         at pipelines.runner.Runner$.main(Runner.scala:30)
>         at pipelines.runner.Runner.main(Runner.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>         at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>         at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
>         at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
>         at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
>         at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>         at java.util.ArrayList.writeObject(ArrayList.java:766)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>         at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
>         at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
>
>
> I also tried the following ..
>
> class EnrichmentFunction
>     extends RichCoFlatMapFunction[TaxiRide, TaxiFare, TaxiRideFare] {
>
>   @transient var rideState: ValueState[TaxiRide] = null
>   @transient var fareState: ValueState[TaxiFare] = null
>
>   override def open(params: Configuration): Unit = {
>     super.open(params)
>     rideState = getRuntimeContext.getState(
>       new ValueStateDescriptor[TaxiRide]("saved ride", classOf[TaxiRide]))
>     fareState = getRuntimeContext.getState(
>       new ValueStateDescriptor[TaxiFare]("saved fare", classOf[TaxiFare]))
>   }
>
> and moved the state initialization into the open function. But I still get
> the same result.
>
> Help ?
>
> regards.
>
>
>
> On Tue, Sep 17, 2019 at 12:28 PM Biao Liu <mmyy1...@gmail.com> wrote:
>
>> Hi Debasish,
>>
>> I guess the reason is that something is unexpectedly pulled into
>> serialization through a reference from an inner class (an anonymous class
>> or a lambda expression). When Flink serializes such an inner-class
>> instance, it also serializes all of the objects it references - for
>> example, the instance of the enclosing class. If that enclosing class is
>> not serializable, this error occurs.
>>
>> You could try moving that piece of code to a named, non-inner class.
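>>
>> A minimal sketch of the difference, assuming the rides/fares streams and
>> the Scala API implicits from your snippet are in scope (the empty flatMap
>> bodies are just placeholders):
>>
>> import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
>> import org.apache.flink.util.Collector
>>
>> // Problematic: an anonymous class is an inner class, so it keeps a
>> // reference to the enclosing instance, and Flink tries to serialize that
>> // (possibly non-serializable) outer object along with the function.
>> val processed = rides
>>   .connect(fares)
>>   .flatMap(new RichCoFlatMapFunction[TaxiRide, TaxiFare, TaxiRideFare] {
>>     override def flatMap1(ride: TaxiRide, out: Collector[TaxiRideFare]): Unit = ()
>>     override def flatMap2(fare: TaxiFare, out: Collector[TaxiRideFare]): Unit = ()
>>   })
>>
>> // Safer: a named top-level class references nothing from any outer scope,
>> // so only the function instance itself has to be serializable.
>> class EnrichmentFunction
>>     extends RichCoFlatMapFunction[TaxiRide, TaxiFare, TaxiRideFare] {
>>   override def flatMap1(ride: TaxiRide, out: Collector[TaxiRideFare]): Unit = ()
>>   override def flatMap2(fare: TaxiFare, out: Collector[TaxiRideFare]): Unit = ()
>> }
>> val processed2 = rides.connect(fares).flatMap(new EnrichmentFunction)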
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>>
>>
>> On Tue, 17 Sep 2019 at 02:06, Debasish Ghosh <ghosh.debas...@gmail.com>
>> wrote:
>>
>>> My main question is why serialisation kicks in when I try to execute
>>> within a `Future` and not otherwise.
>>>
>>> regards.
>>>
>>> On Mon, 16 Sep 2019 at 4:46 PM, Debasish Ghosh <ghosh.debas...@gmail.com>
>>> wrote:
>>>
>>>> Yes, they are generated from an Avro schema and implement Serializable ..
>>>>
>>>> On Mon, Sep 16, 2019 at 4:40 PM Deepak Sharma <deepakmc...@gmail.com>
>>>> wrote:
>>>>
>>>>> Does TaxiRide or TaxiRideFare implement Serializable?
>>>>>
>>>>> On Mon, Sep 16, 2019 at 3:47 PM Debasish Ghosh <
>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>
>>>>>> Hello -
>>>>>>
>>>>>> The following piece of code is an example of connected data streams ..
>>>>>>
>>>>>> val rides: DataStream[TaxiRide] =
>>>>>>   readStream(inTaxiRide)
>>>>>>     .filter { ride ⇒ ride.getIsStart().booleanValue }
>>>>>>     .keyBy("rideId")
>>>>>>
>>>>>> val fares: DataStream[TaxiFare] =
>>>>>>   readStream(inTaxiFare)
>>>>>>     .keyBy("rideId")
>>>>>>
>>>>>> val processed: DataStream[TaxiRideFare] =
>>>>>>   rides
>>>>>>     .connect(fares)
>>>>>>     .flatMap(new EnrichmentFunction)
>>>>>>
>>>>>> When I execute the above logic using
>>>>>> StreamExecutionEnvironment.execute(..) directly, it runs fine.
>>>>>> But if I try to execute the above from within a
>>>>>> scala.concurrent.Future - roughly as in the sketch below - I get the
>>>>>> exception that follows ..
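>>>>>>
>>>>>> import scala.concurrent.Future
>>>>>> import scala.concurrent.ExecutionContext.Implicits.global
>>>>>>
>>>>>> // Sketch only, not the exact code: `env` stands for the
>>>>>> // StreamExecutionEnvironment used above, and the job name is made up.
>>>>>> val result = Future {
>>>>>>   env.execute("connected-streams-job")
>>>>>> }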
>>>>>>
>>>>>> org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG
>>>>>> pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt
>>>>>> type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5,
>>>>>> startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT
>>>>>> pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not
>>>>>> serializable. The object probably contains or references non
>>>>>> serializable fields.
>>>>>>         at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>>>>>         at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>         at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>         at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>         at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>         at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>>>>         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
>>>>>>         at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>>>>>>         at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
>>>>>>         at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
>>>>>>         ...
>>>>>>
>>>>>> Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
>>>>>>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>>>
>>>>>> Any thoughts on why this may happen?
>>>>>>
>>>>>> regards.
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Thanks
>>>>> Deepak
>>>>> www.bigdatabig.com
>>>>> www.keosha.net
>>>>>
>>>>
>>>>
>>> --
>>> Sent from my iPhone
>>>
>>
>


-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
