OK, the problem above was caused by serialization issues, which we fixed by marking some of the members transient. That resolves the serialization errors, but now when I try to execute within a Future I hit this:
*java.util.concurrent.ExecutionException: Boxed Error*
    at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
    at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
    at scala.concurrent.Promise.complete(Promise.scala:53)
    at scala.concurrent.Promise.complete$(Promise.scala:52)
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187)
    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
    at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
*Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException*
    at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
    at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:280)
    at pipelines.flink.FlinkStreamlet.$anonfun$run$2(FlinkStreamlet.scala:149)
    at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:658)
    at scala.util.Success.$anonfun$map$1(Try.scala:255)
    at scala.util.Success.map(Try.scala:213)
    at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
    at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
    ... 7 more

I found this issue in JIRA https://issues.apache.org/jira/browse/FLINK-10381 which is still open and talks about a related issue. But we are not submitting multiple jobs - we are just submitting 1 job but async in a Future. I am not clear why this should create the problem that I see.

Can anyone please help with an explanation?

regards.

On Wed, Sep 18, 2019 at 12:22 AM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

> I think the issue may not be linked with Future. What happens is when this
> piece of code is executed ..
>
> val rides: DataStream[TaxiRide] =
>   readStream(inTaxiRide)
>     .filter { ride ⇒ ride.getIsStart().booleanValue }
>     .keyBy("rideId")
>
> val fares: DataStream[TaxiFare] =
>   readStream(inTaxiFare)
>     .keyBy("rideId")
>
> val processed: DataStream[TaxiRideFare] =
>   rides
>     .connect(fares)
>     .flatMap(new EnrichmentFunction)
>
> somehow the ClosureCleaner gets executed as evident from the following
> which tries to serialize Avro data. Is there any way to pass the custom
> avro serializer that I am using ?
>
> org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG
> pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt
> type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5,
> startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT
> pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not
> serializable. The object probably contains or references non serializable
> fields.
>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
>     at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>     at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
>     at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
>     at pipelines.examples.processor.TaxiRideProcessor$$anon$1.buildExecutionGraph(TaxiRideProcessor.scala:47)
>     at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:278)
>     at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:149)
>     at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
>     at scala.util.Try$.apply(Try.scala:213)
>     at pipelines.runner.Runner$.run(Runner.scala:43)
>     at pipelines.runner.Runner$.main(Runner.scala:30)
>     at pipelines.runner.Runner.main(Runner.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>     at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>     at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
>     at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>     at java.util.ArrayList.writeObject(ArrayList.java:766)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
>
>
> I also tried the following ..
>
> class EnrichmentFunction extends RichCoFlatMapFunction[TaxiRide, TaxiFare, TaxiRideFare] {
>
>   @transient var rideState: ValueState[TaxiRide] = null
>   @transient var fareState: ValueState[TaxiFare] = null
>
>   override def open(params: Configuration): Unit = {
>     super.open(params)
>     rideState = getRuntimeContext.getState(
>       new ValueStateDescriptor[TaxiRide]("saved ride", classOf[TaxiRide]))
>     fareState = getRuntimeContext.getState(
>       new ValueStateDescriptor[TaxiFare]("saved fare", classOf[TaxiFare]))
>   }
>
> and moved the state initialization to open function. But still get the
> same result.
>
> Help ?
>
> regards.
>
> On Tue, Sep 17, 2019 at 12:28 PM Biao Liu <mmyy1...@gmail.com> wrote:
>
>> Hi Debasish,
>>
>> I guess the reason is something unexpectedly involved in serialization
>> due to a reference from inner class (anonymous class or lambda expression).
>> When Flink serializes this inner class instance, it would also serialize
>> all referenced objects, for example, the outer class instance. If the outer
>> class is not serializable, this error would happen.
>>
>> You could have a try to move the piece of codes to a named non-inner
>> class.
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>> On Tue, 17 Sep 2019 at 02:06, Debasish Ghosh <ghosh.debas...@gmail.com>
>> wrote:
>>
>>> My main question is why serialisation kicks in when I try to execute
>>> within a `Future` and not otherwise.
>>>
>>> regards.
>>>
>>> On Mon, 16 Sep 2019 at 4:46 PM, Debasish Ghosh <ghosh.debas...@gmail.com>
>>> wrote:
>>>
>>>> Yes, they are generated from Avro Schema and implements Serializable ..
>>>>
>>>> On Mon, Sep 16, 2019 at 4:40 PM Deepak Sharma <deepakmc...@gmail.com>
>>>> wrote:
>>>>
>>>>> Does TaxiRide or TaxiRideFare implements Serializable?
>>>>>
>>>>> On Mon, Sep 16, 2019 at 3:47 PM Debasish Ghosh <
>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>
>>>>>> Hello -
>>>>>>
>>>>>> The following piece of code is an example of a connected data streams
>>>>>> ..
>>>>>>
>>>>>> val rides: DataStream[TaxiRide] =
>>>>>>   readStream(inTaxiRide)
>>>>>>     .filter { ride ⇒ ride.getIsStart().booleanValue }
>>>>>>     .keyBy("rideId")
>>>>>>
>>>>>> val fares: DataStream[TaxiFare] =
>>>>>>   readStream(inTaxiFare)
>>>>>>     .keyBy("rideId")
>>>>>>
>>>>>> val processed: DataStream[TaxiRideFare] =
>>>>>>   rides
>>>>>>     .connect(fares)
>>>>>>     .flatMap(new EnrichmentFunction)
>>>>>>
>>>>>> When I execute the above logic using
>>>>>> StreamExecutionEnvironment.execute(..) it runs fine.
>>>>>> But if I try to execute the above from within a
>>>>>> scala.concurrent.Future, I get the following exception ..
>>>>>>
>>>>>> org.apache.flink.api.common.InvalidProgramException: [rideId
>>>>>> type:LONG pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2,
>>>>>> passengerCnt type:INT pos:3, driverId type:LONG pos:4, startLon
>>>>>> type:FLOAT pos:5, startLat type:FLOAT pos:6, endLon type:FLOAT pos:7,
>>>>>> endLat type:FLOAT pos:8, startTime type:LONG pos:9, endTime type:LONG
>>>>>> pos:10] is not serializable. The object probably contains or
>>>>>> references non serializable fields.
>>>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
>>>>>>     at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>>>>>>     at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
>>>>>>     at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
>>>>>>     ...
>>>>>>
>>>>>> Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>>>
>>>>>> Any thoughts why this may happen ?
>>>>>>
>>>>>> regards.

--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
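
For readers following the thread: Biao Liu's point about inner classes can be reproduced outside Flink with plain Java serialization. The sketch below is only an illustration, not the code from this thread. `SchemaField`, `MapFn`, `Outer`, `NamedFn` and `SerializationCheck` are all hypothetical names, with `SchemaField` standing in for a non-serializable member such as org.apache.avro.Schema$Field. It assumes the usual Scala compilation scheme, where an anonymous class that reads a member of its enclosing instance keeps a reference to that instance.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.util.Try

// Hypothetical stand-in for a non-serializable member (e.g. an Avro Schema.Field).
class SchemaField(val name: String)

// Hypothetical stand-in for a user function that a framework must serialize.
trait MapFn extends Serializable {
  def apply(x: Long): String
}

// The enclosing class is Serializable, but one of its fields is not.
class Outer extends Serializable {
  val field = new SchemaField("rideId")

  // Anonymous inner class: reading `field` makes it hold a reference to this Outer,
  // so serializing the function also tries to serialize `field`.
  def anonymousFn: MapFn = new MapFn {
    def apply(x: Long): String = s"${field.name}=$x"
  }
}

// Named, non-inner class: it carries only the String it actually needs.
class NamedFn(fieldName: String) extends MapFn {
  def apply(x: Long): String = s"$fieldName=$x"
}

object SerializationCheck {
  // Java-serializes `obj` and returns the number of bytes written, or the failure.
  def trySerialize(obj: AnyRef): Try[Int] = Try {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.size()
  }
}
```

Under these assumptions, `SerializationCheck.trySerialize(new Outer().anonymousFn)` should fail with a java.io.NotSerializableException, because the anonymous class drags the whole `Outer` instance (and its `SchemaField`) along, while `SerializationCheck.trySerialize(new NamedFn("rideId"))` should succeed. Marking the offending member `@transient`, as was done earlier in the thread, is the other common way to keep it out of the serialized graph.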