Debasish, could you share a before-and-after example of your classes, for future reference?
On Thu, 19 Sep 2019, 10:42 Debasish Ghosh, <ghosh.debas...@gmail.com> wrote:

> We solved the problem of serialization by making some things transient
> which were being captured as part of the closure. So we no longer have
> serialization errors. Everything works properly without the Future.
>
> I realize that, because of the statics, concurrent job submission will be
> an issue. But we are submitting one job only - the difference is that it's
> through a Future. So there is no concurrent submission, unless I am
> missing something.
>
> regards.
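For reference, a minimal sketch of the kind of before/after change described above ("making some things transient which were being captured as part of the closure"). All names here are made up, not from the actual code; the non-serializable field is an org.apache.avro.Schema, matching the NotSerializableException quoted further down in the thread:

    import org.apache.avro.Schema
    import org.apache.flink.api.common.functions.RichMapFunction

    // BEFORE (fails): the Schema is a plain field, so the Java
    // serialization that ships the function to the cluster tries to
    // serialize it and throws
    // java.io.NotSerializableException: org.apache.avro.Schema$Field
    class BeforeFn(schemaJson: String) extends RichMapFunction[String, String] {
      val schema: Schema = new Schema.Parser().parse(schemaJson)
      override def map(in: String): String = schema.getName + ":" + in
    }

    // AFTER (works): only the JSON string travels with the function; the
    // Schema is rebuilt lazily on the task manager after deserialization.
    class AfterFn(schemaJson: String) extends RichMapFunction[String, String] {
      @transient lazy val schema: Schema = new Schema.Parser().parse(schemaJson)
      override def map(in: String): String = schema.getName + ":" + in
    }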
> On Thu, Sep 19, 2019 at 12:54 PM Biao Liu <mmyy1...@gmail.com> wrote:
>
>> Hi Debasish,
>>
>> I think there is something critical in your usage that is still hidden.
>> It might help if you could provide more details.
>>
>> It still confuses me how you solved the serialization issue. Why do the
>> non-transient fields affect serialization only inside a Future?
>>
>> WRT this ProgramAbortException issue, do you submit jobs concurrently in
>> one process?
>> Currently job submission is not thread-safe. It relies on some static
>> variables which could be affected by other concurrent submissions in the
>> same process.
>> I am asking because job submission usually does not go through
>> OptimizerPlanEnvironment, which appears in your exception stack trace.
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>> On Thu, 19 Sep 2019 at 15:03, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>
>>> I think what you are pointing at is asynchronous DataStream operations.
>>> In our case we want to submit the entire job in a Future. Something like
>>> the following ..
>>>
>>> def execute(..) = {
>>>   // this does all the data stream manipulation, joins etc.
>>>   buildComputationGraph()
>>>
>>>   // submits for execution with StreamExecutionEnvironment
>>>   env.execute(..)
>>> }
>>>
>>> and we want to do ..
>>>
>>> val jobExecutionResultFuture = Future(execute(..))
>>>
>>> and this gives that exception.
>>>
>>> regards.
>>>
>>> On Thu, Sep 19, 2019 at 11:00 AM Rafi Aroch <rafi.ar...@gmail.com> wrote:
>>>
>>>> Hi Debasish,
>>>>
>>>> Have you taken a look at the AsyncIO API for running async operations?
>>>> I think this is the preferred way of doing it. [1]
>>>> So it would look something like this:
>>>>
>>>> class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {
>>>>
>>>>   /** The database-specific client that can issue concurrent requests
>>>>     * with callbacks */
>>>>   lazy val client: DatabaseClient = new DatabaseClient(host, port, credentials)
>>>>
>>>>   /** The context used for the future callbacks */
>>>>   implicit lazy val executor: ExecutionContext =
>>>>     ExecutionContext.fromExecutor(Executors.directExecutor())
>>>>
>>>>   override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {
>>>>
>>>>     // issue the asynchronous request, receive a future for the result
>>>>     val resultFutureRequested: Future[String] = client.query(str)
>>>>
>>>>     // set the callback to be executed once the request by the client
>>>>     // is complete; the callback simply forwards the result to the
>>>>     // result future
>>>>     resultFutureRequested.onSuccess {
>>>>       case result: String => resultFuture.complete(Iterable((str, result)))
>>>>     }
>>>>   }
>>>> }
>>>>
>>>> // create the original stream
>>>> val stream: DataStream[String] = ...
>>>>
>>>> // apply the async I/O transformation
>>>> val resultStream: DataStream[(String, String)] =
>>>>   AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)
>>>>
>>>> Thanks,
>>>> Rafi
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html#async-io-api
>>>>
>>>> On Wed, Sep 18, 2019 at 8:26 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>
>>>>> ok, the above problem was due to some serialization issues, which we
>>>>> fixed by marking some of the things transient. That fixes the
>>>>> serialization issues .. but now, when I try to execute in a Future, I
>>>>> hit upon this ..
>>>>>
>>>>> java.util.concurrent.ExecutionException: Boxed Error
>>>>>   at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
>>>>>   at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
>>>>>   at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
>>>>>   at scala.concurrent.Promise.complete(Promise.scala:53)
>>>>>   at scala.concurrent.Promise.complete$(Promise.scala:52)
>>>>>   at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187)
>>>>>   at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
>>>>>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
>>>>>   at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
>>>>>   at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>>>>>   at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>>>>>   at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>>>>>   at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
>>>>> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>>   at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>>   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>   at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>   at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:280)
>>>>>   at pipelines.flink.FlinkStreamlet.$anonfun$run$2(FlinkStreamlet.scala:149)
>>>>>   at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:658)
>>>>>   at scala.util.Success.$anonfun$map$1(Try.scala:255)
>>>>>   at scala.util.Success.map(Try.scala:213)
>>>>>   at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
>>>>>   at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
>>>>>   ... 7 more
>>>>>
>>>>> I found this issue in JIRA, https://issues.apache.org/jira/browse/FLINK-10381,
>>>>> which is still open and talks about a related problem. But we are not
>>>>> submitting multiple jobs - we are submitting just one job, only
>>>>> asynchronously in a Future. It is not clear to me why this should
>>>>> create the problem that I see.
>>>>>
>>>>> Can anyone please help with an explanation?
>>>>>
>>>>> regards.
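A note on the "Boxed Error" wrapper above, since it is easy to misread: OptimizerPlanEnvironment$ProgramAbortException extends java.lang.Error rather than Exception, and scala.concurrent treats any Error thrown inside a Future as fatal, wrapping it in ExecutionException("Boxed Error") when it completes the promise. A minimal, self-contained sketch of just that wrapping behaviour (nothing Flink-specific; all names are illustrative):

    import java.util.concurrent.ExecutionException
    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    object BoxedErrorDemo extends App {
      // any java.lang.Error thrown inside a Future is treated as fatal by
      // scala.concurrent and boxed, exactly as in the stack trace above
      val f = Future[Unit] { throw new Error("stand-in for ProgramAbortException") }

      try Await.result(f, 5.seconds)
      catch {
        case e: ExecutionException =>
          println(e.getMessage) // prints "Boxed Error"
          println(e.getCause)   // the original Error
      }
    }

This also suggests why the failure shows up only with a Future: in this submission path (note JarRunHandler and OptimizerPlanEnvironment in the trace), env.execute() deliberately throws ProgramAbortException once the job graph has been captured, expecting to unwind the program's main thread, where Flink's submission code catches it. Thrown on a Future's thread instead, it gets boxed and never reaches that handler.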
>>>>> On Wed, Sep 18, 2019 at 12:22 AM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>
>>>>>> I think the issue may not be linked with the Future. What happens is
>>>>>> that when this piece of code is executed ..
>>>>>>
>>>>>> val rides: DataStream[TaxiRide] =
>>>>>>   readStream(inTaxiRide)
>>>>>>     .filter { ride ⇒ ride.getIsStart().booleanValue }
>>>>>>     .keyBy("rideId")
>>>>>>
>>>>>> val fares: DataStream[TaxiFare] =
>>>>>>   readStream(inTaxiFare)
>>>>>>     .keyBy("rideId")
>>>>>>
>>>>>> val processed: DataStream[TaxiRideFare] =
>>>>>>   rides
>>>>>>     .connect(fares)
>>>>>>     .flatMap(new EnrichmentFunction)
>>>>>>
>>>>>> somehow the ClosureCleaner gets executed, as is evident from the
>>>>>> following stack trace, and it tries to serialize Avro data. Is there
>>>>>> any way to pass in the custom Avro serializer that I am using?
>>>>>>
>>>>>> org.apache.flink.api.common.InvalidProgramException: [rideId
>>>>>> type:LONG pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2,
>>>>>> passengerCnt type:INT pos:3, driverId type:LONG pos:4, startLon
>>>>>> type:FLOAT pos:5, startLat type:FLOAT pos:6, endLon type:FLOAT pos:7,
>>>>>> endLat type:FLOAT pos:8, startTime type:LONG pos:9, endTime type:LONG
>>>>>> pos:10] is not serializable. The object probably contains or
>>>>>> references non serializable fields.
>>>>>>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>>>>>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>>>>   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
>>>>>>   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>>>>>>   at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
>>>>>>   at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
>>>>>>   at pipelines.examples.processor.TaxiRideProcessor$$anon$1.buildExecutionGraph(TaxiRideProcessor.scala:47)
>>>>>>   at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:278)
>>>>>>   at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:149)
>>>>>>   at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
>>>>>>   at scala.util.Try$.apply(Try.scala:213)
>>>>>>   at pipelines.runner.Runner$.run(Runner.scala:43)
>>>>>>   at pipelines.runner.Runner$.main(Runner.scala:30)
>>>>>>   at pipelines.runner.Runner.main(Runner.scala)
>>>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>   at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>>>>>>   at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>>>>>>   at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>>>>>>   at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
>>>>>>   at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
>>>>>>   at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
>>>>>>   at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>>>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>   at java.lang.Thread.run(Thread.java:748)
>>>>>> Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
>>>>>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>>>   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>>>>   at java.util.ArrayList.writeObject(ArrayList.java:766)
>>>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>   at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>   at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
>>>>>>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>>>>>>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>>>   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>>>>   at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
>>>>>>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
>>>>>>
>>>>>> I also tried the following ..
>>>>>>
>>>>>> class EnrichmentFunction extends RichCoFlatMapFunction[TaxiRide, TaxiFare, TaxiRideFare] {
>>>>>>
>>>>>>   @transient var rideState: ValueState[TaxiRide] = null
>>>>>>   @transient var fareState: ValueState[TaxiFare] = null
>>>>>>
>>>>>>   override def open(params: Configuration): Unit = {
>>>>>>     super.open(params)
>>>>>>     rideState = getRuntimeContext.getState(
>>>>>>       new ValueStateDescriptor[TaxiRide]("saved ride", classOf[TaxiRide]))
>>>>>>     fareState = getRuntimeContext.getState(
>>>>>>       new ValueStateDescriptor[TaxiFare]("saved fare", classOf[TaxiFare]))
>>>>>>   }
>>>>>>   // ...
>>>>>> }
>>>>>>
>>>>>> and moved the state initialization to the open function, but I still
>>>>>> get the same result.
>>>>>>
>>>>>> Help?
>>>>>>
>>>>>> regards.
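On the question of passing in a custom Avro serializer: note the bottom frames of the trace. InstantiationUtil.serializeObject, called from ClosureCleaner.clean, applies plain java.io serialization to the function closure, not Flink's TypeSerializer stack, so a registered Avro serializer would not be consulted at this point; whatever holds the org.apache.avro.Schema reference has to be kept out of the serialized closure. One way to reproduce the failure in isolation, using the same utility that appears in the trace (EnrichmentFunction standing in for whatever function instance you construct):

    import org.apache.flink.util.InstantiationUtil

    // serializes the function the same way the ClosureCleaner does; this
    // throws java.io.NotSerializableException if anything reachable from
    // the instance (including a hidden outer-class reference) is not
    // java.io.Serializable
    val fn = new EnrichmentFunction
    val bytes: Array[Byte] = InstantiationUtil.serializeObject(fn)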
>>>>>> On Tue, Sep 17, 2019 at 12:28 PM Biao Liu <mmyy1...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Debasish,
>>>>>>>
>>>>>>> I guess the reason is that something gets unexpectedly involved in
>>>>>>> serialization through a reference from an inner class (an anonymous
>>>>>>> class or a lambda expression).
>>>>>>> When Flink serializes this inner-class instance, it also serializes
>>>>>>> all referenced objects, for example the outer-class instance. If the
>>>>>>> outer class is not serializable, this error happens.
>>>>>>>
>>>>>>> You could try moving that piece of code to a named, non-inner class.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Biao /'bɪ.aʊ/
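A sketch of what that suggestion looks like in practice. All names below are illustrative, not from the actual pipeline; the point is the hidden $outer reference that the Scala compiler adds to inner and anonymous classes:

    import org.apache.avro.Schema
    import org.apache.flink.api.common.functions.RichMapFunction

    // BEFORE (problematic): InnerFn is an inner class, so it carries a
    // hidden $outer field pointing at Enclosing; serializing the function
    // then also tries to serialize Enclosing and its non-serializable
    // Schema
    class Enclosing(schemaJson: String) {
      val schema: Schema = new Schema.Parser().parse(schemaJson)
      class InnerFn extends RichMapFunction[String, String] {
        override def map(in: String): String = schema.getName + ":" + in // uses $outer
      }
    }

    // AFTER (what is being suggested): a named top-level class with no
    // reference to any enclosing instance; whatever it needs arrives as
    // plain serializable constructor parameters
    class NamedFn(schemaName: String) extends RichMapFunction[String, String] {
      override def map(in: String): String = schemaName + ":" + in
    }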
>>>>>>> On Tue, 17 Sep 2019 at 02:06, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>
>>>>>>>> My main question is why serialisation kicks in when I try to
>>>>>>>> execute within a `Future` and not otherwise.
>>>>>>>>
>>>>>>>> regards.
>>>>>>>>
>>>>>>>> On Mon, 16 Sep 2019 at 4:46 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Yes, they are generated from an Avro schema and implement
>>>>>>>>> Serializable ..
>>>>>>>>>
>>>>>>>>> On Mon, Sep 16, 2019 at 4:40 PM Deepak Sharma <deepakmc...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Does TaxiRide or TaxiRideFare implement Serializable?
>>>>>>>>>>
>>>>>>>>>> On Mon, Sep 16, 2019 at 3:47 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello -
>>>>>>>>>>>
>>>>>>>>>>> The following piece of code is an example of connected data
>>>>>>>>>>> streams ..
>>>>>>>>>>>
>>>>>>>>>>> val rides: DataStream[TaxiRide] =
>>>>>>>>>>>   readStream(inTaxiRide)
>>>>>>>>>>>     .filter { ride ⇒ ride.getIsStart().booleanValue }
>>>>>>>>>>>     .keyBy("rideId")
>>>>>>>>>>>
>>>>>>>>>>> val fares: DataStream[TaxiFare] =
>>>>>>>>>>>   readStream(inTaxiFare)
>>>>>>>>>>>     .keyBy("rideId")
>>>>>>>>>>>
>>>>>>>>>>> val processed: DataStream[TaxiRideFare] =
>>>>>>>>>>>   rides
>>>>>>>>>>>     .connect(fares)
>>>>>>>>>>>     .flatMap(new EnrichmentFunction)
>>>>>>>>>>>
>>>>>>>>>>> When I execute the above logic using
>>>>>>>>>>> StreamExecutionEnvironment.execute(..) it runs fine.
>>>>>>>>>>> But if I try to execute the above from within a
>>>>>>>>>>> scala.concurrent.Future, I get the following exception ..
>>>>>>>>>>>
>>>>>>>>>>> org.apache.flink.api.common.InvalidProgramException: [rideId
>>>>>>>>>>> type:LONG pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG
>>>>>>>>>>> pos:2, passengerCnt type:INT pos:3, driverId type:LONG pos:4,
>>>>>>>>>>> startLon type:FLOAT pos:5, startLat type:FLOAT pos:6, endLon
>>>>>>>>>>> type:FLOAT pos:7, endLat type:FLOAT pos:8, startTime type:LONG
>>>>>>>>>>> pos:9, endTime type:LONG pos:10] is not serializable. The object
>>>>>>>>>>> probably contains or references non serializable fields.
>>>>>>>>>>>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>>>>>>>>>>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>>>>>>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>>>>>>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>>>>>>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>>>>>>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>>>>>>>>>   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
>>>>>>>>>>>   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>>>>>>>>>>>   at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
>>>>>>>>>>>   at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
>>>>>>>>>>>   ...
>>>>>>>>>>> Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
>>>>>>>>>>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>>>>>>>>
>>>>>>>>>>> Any thoughts on why this may happen?
>>>>>>>>>>>
>>>>>>>>>>> regards.
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Debasish Ghosh
>>>>>>>>>>> http://manning.com/ghosh2
>>>>>>>>>>> http://manning.com/ghosh
>>>>>>>>>>>
>>>>>>>>>>> Twttr: @debasishg
>>>>>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>>>>>> Code: http://github.com/debasishg
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Thanks
>>>>>>>>>> Deepak
>>>>>>>>>> www.bigdatabig.com
>>>>>>>>>> www.keosha.net
>>>>>>>>
>>>>>>>> --
>>>>>>>> Sent from my iPhone