Hi Debasish,

Have you taken a look at the AsyncIO API for running async operations? I
think this is the preferred way of doing it. [1] So it would look something
like this:
class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {

    /** The database specific client that can issue concurrent requests with callbacks */
    lazy val client: DatabaseClient = new DatabaseClient(host, port, credentials)

    /** The context used for the future callbacks */
    implicit lazy val executor: ExecutionContext =
        ExecutionContext.fromExecutor(Executors.directExecutor())

    override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {

        // issue the asynchronous request, receive a future for the result
        val resultFutureRequested: Future[String] = client.query(str)

        // set the callback to be executed once the request by the client is complete
        // the callback simply forwards the result to the result future
        resultFutureRequested.onSuccess {
            case result: String => resultFuture.complete(Iterable((str, result)))
        }
    }
}

// create the original stream
val stream: DataStream[String] = ...

// apply the async I/O transformation
val resultStream: DataStream[(String, String)] =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)

Thanks,
Rafi

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html#async-io-api
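P.S. One caveat on the snippet above: Future.onSuccess is deprecated as of
Scala 2.12, and it also silently drops failures, so a failed query would
only surface via the async operator's timeout. A sketch of the same
callback using onComplete instead (same names as above) would be:

import scala.util.{Failure, Success}

resultFutureRequested.onComplete {
    // forward the successful result to the result future
    case Success(result) => resultFuture.complete(Iterable((str, result)))
    // surface the failure immediately instead of waiting for the timeout
    case Failure(err) => resultFuture.completeExceptionally(err)
}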
On Wed, Sep 18, 2019 at 8:26 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

> ok, the above problem was due to some serialization issues which we fixed
> by marking some of the things transient. This fixes the serialization
> issues .. But now when I try to execute in a Future I hit upon this ..
>
> *java.util.concurrent.ExecutionException: Boxed Error*
> at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
> at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
> at scala.concurrent.Promise.complete(Promise.scala:53)
> at scala.concurrent.Promise.complete$(Promise.scala:52)
> at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187)
> at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
> at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
>
> *Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException*
> at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
> at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:280)
> at pipelines.flink.FlinkStreamlet.$anonfun$run$2(FlinkStreamlet.scala:149)
> at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:658)
> at scala.util.Success.$anonfun$map$1(Try.scala:255)
> at scala.util.Success.map(Try.scala:213)
> at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
> at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
> ... 7 more
>
> I found this issue in JIRA https://issues.apache.org/jira/browse/FLINK-10381
> which is still open and talks about a related issue. But we are not
> submitting multiple jobs - we are just submitting 1 job, but async in a
> Future. I am not clear why this should create the problem that I see.
>
> Can anyone please help with an explanation ?
>
> regards.
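> P.S. In case it is useful, the "marking some of the things transient" fix
> mentioned above was roughly of this shape (an illustrative sketch, the
> class and field names are made up, not our actual code):
>
> // keep the non-serializable Avro Schema out of the serialized closure;
> // it is rebuilt lazily from its JSON form after deserialization
> class AvroCodec(schemaJson: String) extends Serializable {
>   @transient private lazy val schema: org.apache.avro.Schema =
>     new org.apache.avro.Schema.Parser().parse(schemaJson)
> }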
> On Wed, Sep 18, 2019 at 12:22 AM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>
>> I think the issue may not be linked with the Future. What happens is when
>> this piece of code is executed ..
>>
>> val rides: DataStream[TaxiRide] =
>>   readStream(inTaxiRide)
>>     .filter { ride ⇒ ride.getIsStart().booleanValue }
>>     .keyBy("rideId")
>>
>> val fares: DataStream[TaxiFare] =
>>   readStream(inTaxiFare)
>>     .keyBy("rideId")
>>
>> val processed: DataStream[TaxiRideFare] =
>>   rides
>>     .connect(fares)
>>     .flatMap(new EnrichmentFunction)
>>
>> somehow the ClosureCleaner gets executed, as evident from the following,
>> and it tries to serialize Avro data. Is there any way to pass the custom
>> avro serializer that I am using ?
>>
>> org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG
>> pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt
>> type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5,
>> startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT
>> pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not
>> serializable. The object probably contains or references non serializable
>> fields.
>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
>> at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>> at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
>> at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
>> at pipelines.examples.processor.TaxiRideProcessor$$anon$1.buildExecutionGraph(TaxiRideProcessor.scala:47)
>> at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:278)
>> at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:149)
>> at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
>> at scala.util.Try$.apply(Try.scala:213)
>> at pipelines.runner.Runner$.run(Runner.scala:43)
>> at pipelines.runner.Runner$.main(Runner.scala:30)
>> at pipelines.runner.Runner.main(Runner.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>> at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>> at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
>> at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
>> at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
>> at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>> at java.util.ArrayList.writeObject(ArrayList.java:766)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>> at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
>>
>> I also tried the following ..
>>
>> class EnrichmentFunction extends RichCoFlatMapFunction[TaxiRide, TaxiFare, TaxiRideFare] {
>>
>>   @transient var rideState: ValueState[TaxiRide] = null
>>   @transient var fareState: ValueState[TaxiFare] = null
>>
>>   override def open(params: Configuration): Unit = {
>>     super.open(params)
>>     rideState = getRuntimeContext.getState(
>>       new ValueStateDescriptor[TaxiRide]("saved ride", classOf[TaxiRide]))
>>     fareState = getRuntimeContext.getState(
>>       new ValueStateDescriptor[TaxiFare]("saved fare", classOf[TaxiFare]))
>>   }
>>
>> and moved the state initialization to the open function. But I still get
>> the same result.
>>
>> Help ?
>>
>> regards.
>>
>> On Tue, Sep 17, 2019 at 12:28 PM Biao Liu <mmyy1...@gmail.com> wrote:
>>
>>> Hi Debasish,
>>>
>>> I guess the reason is that something is unexpectedly involved in
>>> serialization due to a reference from an inner class (an anonymous class
>>> or lambda expression). When Flink serializes such an inner class
>>> instance, it also serializes all referenced objects, for example the
>>> outer class instance. If the outer class is not serializable, this error
>>> happens.
>>>
>>> You could try moving that piece of code to a named non-inner class, for
>>> example something like the sketch below.
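>>> A contrived sketch of the difference (illustrative names, not your
>>> actual code):
>>>
>>> class Outer { // not Serializable
>>>   // anonymous inner class: it captures a reference to Outer.this, so
>>>   // serializing `fn` fails with NotSerializableException: Outer
>>>   val fn = new Function1[String, String] with Serializable {
>>>     def apply(s: String): String = s + Outer.this.toString
>>>   }
>>> }
>>>
>>> // a named top-level class carries no hidden outer reference, so only
>>> // its own fields are serialized
>>> class NamedFn extends Function1[String, String] with Serializable {
>>>   def apply(s: String): String = s
>>> }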
>>>
>>> Thanks,
>>> Biao /'bɪ.aʊ/
>>>
>>> On Tue, 17 Sep 2019 at 02:06, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>
>>>> My main question is why serialisation kicks in when I try to execute
>>>> within a `Future` and not otherwise.
>>>>
>>>> regards.
>>>>
>>>> On Mon, 16 Sep 2019 at 4:46 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>
>>>>> Yes, they are generated from an Avro schema and implement Serializable ..
>>>>>
>>>>> On Mon, Sep 16, 2019 at 4:40 PM Deepak Sharma <deepakmc...@gmail.com> wrote:
>>>>>
>>>>>> Does TaxiRide or TaxiRideFare implement Serializable ?
>>>>>>
>>>>>> On Mon, Sep 16, 2019 at 3:47 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello -
>>>>>>>
>>>>>>> The following piece of code is an example of connected data streams ..
>>>>>>>
>>>>>>> val rides: DataStream[TaxiRide] =
>>>>>>>   readStream(inTaxiRide)
>>>>>>>     .filter { ride ⇒ ride.getIsStart().booleanValue }
>>>>>>>     .keyBy("rideId")
>>>>>>>
>>>>>>> val fares: DataStream[TaxiFare] =
>>>>>>>   readStream(inTaxiFare)
>>>>>>>     .keyBy("rideId")
>>>>>>>
>>>>>>> val processed: DataStream[TaxiRideFare] =
>>>>>>>   rides
>>>>>>>     .connect(fares)
>>>>>>>     .flatMap(new EnrichmentFunction)
>>>>>>>
>>>>>>> When I execute the above logic using StreamExecutionEnvironment.execute(..)
>>>>>>> it runs fine. But if I try to execute the above from within a
>>>>>>> scala.concurrent.Future, I get the following exception ..
>>>>>>>
>>>>>>> org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG
>>>>>>> pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt
>>>>>>> type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5,
>>>>>>> startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT
>>>>>>> pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not
>>>>>>> serializable. The object probably contains or references non serializable
>>>>>>> fields.
>>>>>>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>>>>>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>>>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
>>>>>>> at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>>>>>>> at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
>>>>>>> at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
>>>>>>> ...
>>>>>>>
>>>>>>> Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
>>>>>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>>>>
>>>>>>> Any thoughts why this may happen ?
>>>>>>>
>>>>>>> regards.
>>>>>>>
>>>>>>> --
>>>>>>> Debasish Ghosh
>>>>>>> http://manning.com/ghosh2
>>>>>>> http://manning.com/ghosh
>>>>>>>
>>>>>>> Twttr: @debasishg
>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>> Code: http://github.com/debasishg
>>>>>>
>>>>>> --
>>>>>> Thanks
>>>>>> Deepak
>>>>>> www.bigdatabig.com
>>>>>> www.keosha.net
>>>>>
>>>>> --
>>>>> Debasish Ghosh
>>>>> http://manning.com/ghosh2
>>>>> http://manning.com/ghosh
>>>>>
>>>>> Twttr: @debasishg
>>>>> Blog: http://debasishg.blogspot.com
>>>>> Code: http://github.com/debasishg
>>>>
>>>> --
>>>> Sent from my iPhone
>>>
>>
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg