I think what you are pointing at is asynchronous datastream operations. In our
case we want to submit the entire job in a Future. Something like the
following ..

    def execute(..) = {
      // this does all data stream manipulation, joins etc.
      buildComputationGraph()
      // submits for execution with StreamExecutionEnvironment
      env.execute(..)
    }

and we want to do ..

    val jobExecutionResultFuture = Future(execute(..))

and this gives that exception.

regards.
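For concreteness, a minimal compilable sketch of that pattern; the pipeline
inside buildComputationGraph and the job name are stand-ins, not the actual
code from the streamlet:

    import org.apache.flink.api.common.JobExecutionResult
    import org.apache.flink.streaming.api.scala._

    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global

    object AsyncSubmit {
      val env: StreamExecutionEnvironment =
        StreamExecutionEnvironment.getExecutionEnvironment

      // stand-in for the real graph-building logic (joins etc.)
      def buildComputationGraph(): Unit =
        env.fromElements(1, 2, 3).map(_ * 2).print()

      def execute(): JobExecutionResult = {
        buildComputationGraph()
        // blocks until the job finishes, hence the wish to wrap it
        env.execute("taxi-ride-job")
      }

      // the whole submission runs asynchronously on the global ExecutionContext
      val jobExecutionResultFuture: Future[JobExecutionResult] = Future(execute())
    }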
On Thu, Sep 19, 2019 at 11:00 AM Rafi Aroch <rafi.ar...@gmail.com> wrote:

> Hi Debasish,
>
> Have you taken a look at the AsyncIO API for running async operations? I
> think this is the preferred way of doing it. [1]
> So it would look something like this:
>
>     class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {
>
>       /** The database specific client that can issue concurrent requests
>         * with callbacks */
>       lazy val client: DatabaseClient = new DatabaseClient(host, port, credentials)
>
>       /** The context used for the future callbacks */
>       implicit lazy val executor: ExecutionContext =
>         ExecutionContext.fromExecutor(Executors.directExecutor())
>
>       override def asyncInvoke(str: String,
>           resultFuture: ResultFuture[(String, String)]): Unit = {
>
>         // issue the asynchronous request, receive a future for the result
>         val resultFutureRequested: Future[String] = client.query(str)
>
>         // set the callback to be executed once the request by the client is
>         // complete; the callback simply forwards the result to the result future
>         resultFutureRequested.onSuccess {
>           case result: String => resultFuture.complete(Iterable((str, result)))
>         }
>       }
>     }
>
>     // create the original stream
>     val stream: DataStream[String] = ...
>
>     // apply the async I/O transformation
>     val resultStream: DataStream[(String, String)] =
>       AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000,
>         TimeUnit.MILLISECONDS, 100)
>
> Thanks,
> Rafi
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html#async-io-api
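One caveat on the snippet above: Future.onSuccess is deprecated as of Scala
2.12 (the Scala version the stack traces in this thread come from) and it
silently ignores failures. A variant of the same function using onComplete,
with a stubbed DatabaseClient so it compiles standalone; the stub and the
query logic are illustrative, not a real client:

    import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

    import scala.concurrent.{ExecutionContext, Future}
    import scala.util.{Failure, Success}

    // stub standing in for the real database client
    class DatabaseClient {
      def query(s: String)(implicit ec: ExecutionContext): Future[String] =
        Future(s.reverse)
    }

    class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {

      lazy val client = new DatabaseClient
      implicit lazy val executor: ExecutionContext = ExecutionContext.global

      override def asyncInvoke(str: String,
          resultFuture: ResultFuture[(String, String)]): Unit =
        client.query(str).onComplete {
          // forward the result to Flink on success
          case Success(result) => resultFuture.complete(Iterable((str, result)))
          // surface the failure to the operator instead of dropping it
          case Failure(t) => resultFuture.completeExceptionally(t)
        }
    }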
> On Wed, Sep 18, 2019 at 8:26 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

>> ok, the above problem was due to some serialization issues which we fixed
>> by marking some of the things transient. This fixes the serialization
>> issues .. But now when I try to execute in a Future I hit upon this ..
>>
>> java.util.concurrent.ExecutionException: Boxed Error
>>     at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
>>     at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
>>     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
>>     at scala.concurrent.Promise.complete(Promise.scala:53)
>>     at scala.concurrent.Promise.complete$(Promise.scala:52)
>>     at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187)
>>     at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
>>     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
>>     at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
>>     at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>>     at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>>     at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>>     at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
>> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>     at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>     at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:280)
>>     at pipelines.flink.FlinkStreamlet.$anonfun$run$2(FlinkStreamlet.scala:149)
>>     at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:658)
>>     at scala.util.Success.$anonfun$map$1(Try.scala:255)
>>     at scala.util.Success.map(Try.scala:213)
>>     at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
>>     at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
>>     ... 7 more
>>
>> I found this issue in JIRA https://issues.apache.org/jira/browse/FLINK-10381
>> which is still open and talks about a related issue. But we are not
>> submitting multiple jobs - we are just submitting 1 job but async in a
>> Future. I am not clear why this should create the problem that I see.
>>
>> Can anyone please help with an explanation ?
>>
>> regards.
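Two details explain this trace. StreamPlanEnvironment.execute() throws
ProgramAbortException deliberately, to abort the rest of main() once the job
plan has been captured; the plan-extraction code (OptimizerPlanEnvironment,
visible lower in the other trace in this thread) expects to catch it around
the main() call. And ProgramAbortException extends java.lang.Error, which
scala.concurrent treats as fatal: the Future completes with
ExecutionException("Boxed Error") instead of the Error itself, so when
execute() runs inside a Future the abort signal appears to get swallowed
rather than propagated out of main(). A minimal Flink-free sketch of the
boxing half (the Error message is simulated):

    import java.util.concurrent.ExecutionException

    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    object BoxedErrorDemo extends App {
      // java.lang.Error is "fatal" for scala.concurrent, so the Promise
      // machinery wraps it instead of failing the Future with it directly
      val f: Future[Int] = Future(throw new Error("simulated ProgramAbortException"))

      try Await.result(f, 10.seconds)
      catch {
        // prints: java.util.concurrent.ExecutionException: Boxed Error
        case e: ExecutionException => println(s"$e / caused by: ${e.getCause}")
      }
    }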
>> On Wed, Sep 18, 2019 at 12:22 AM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

>>> I think the issue may not be linked with Future. What happens is when this
>>> piece of code is executed ..
>>>
>>>     val rides: DataStream[TaxiRide] =
>>>       readStream(inTaxiRide)
>>>         .filter { ride ⇒ ride.getIsStart().booleanValue }
>>>         .keyBy("rideId")
>>>
>>>     val fares: DataStream[TaxiFare] =
>>>       readStream(inTaxiFare)
>>>         .keyBy("rideId")
>>>
>>>     val processed: DataStream[TaxiRideFare] =
>>>       rides
>>>         .connect(fares)
>>>         .flatMap(new EnrichmentFunction)
>>>
>>> somehow the ClosureCleaner gets executed, as evident from the following,
>>> which tries to serialize Avro data. Is there any way to pass the custom
>>> avro serializer that I am using ?
>>>
>>> org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG pos:0,
>>> isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt type:INT pos:3,
>>> driverId type:LONG pos:4, startLon type:FLOAT pos:5, startLat type:FLOAT pos:6,
>>> endLon type:FLOAT pos:7, endLat type:FLOAT pos:8, startTime type:LONG pos:9,
>>> endTime type:LONG pos:10] is not serializable. The object probably contains
>>> or references non serializable fields.
>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
>>>     at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>>>     at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
>>>     at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
>>>     at pipelines.examples.processor.TaxiRideProcessor$$anon$1.buildExecutionGraph(TaxiRideProcessor.scala:47)
>>>     at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:278)
>>>     at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:149)
>>>     at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
>>>     at scala.util.Try$.apply(Try.scala:213)
>>>     at pipelines.runner.Runner$.run(Runner.scala:43)
>>>     at pipelines.runner.Runner$.main(Runner.scala:30)
>>>     at pipelines.runner.Runner.main(Runner.scala)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>>>     at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>>>     at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
>>>     at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
>>>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
>>>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>     at java.util.ArrayList.writeObject(ArrayList.java:766)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>     at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
>>>
>>> I also tried the following ..
>>>
>>>     class EnrichmentFunction extends RichCoFlatMapFunction[TaxiRide,
>>>       TaxiFare, TaxiRideFare] {
>>>
>>>       @transient var rideState: ValueState[TaxiRide] = null
>>>       @transient var fareState: ValueState[TaxiFare] = null
>>>
>>>       override def open(params: Configuration): Unit = {
>>>         super.open(params)
>>>         rideState = getRuntimeContext.getState(
>>>           new ValueStateDescriptor[TaxiRide]("saved ride", classOf[TaxiRide]))
>>>         fareState = getRuntimeContext.getState(
>>>           new ValueStateDescriptor[TaxiFare]("saved fare", classOf[TaxiFare]))
>>>       }
>>>
>>> and moved the state initialization to the open function. But I still get
>>> the same result.
>>>
>>> Help ?
>>>
>>> regards.
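On the custom Avro serializer question: serializer registration affects how
records are serialized on the data plane, whereas the trace above comes from
the ClosureCleaner Java-serializing the function object itself (which has
ended up holding an org.apache.avro.Schema field). For the data-plane side,
a sketch of the registration hooks Flink's ExecutionConfig exposes;
AvroKryoSerializer is a hypothetical stand-in for the custom serializer in
use, not a Flink-provided class:

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    object SerializerRegistration {
      def configure(env: StreamExecutionEnvironment): Unit = {
        // prefer Avro over Kryo for types Flink would otherwise
        // serialize generically
        env.getConfig.enableForceAvro()

        // or route a specific class through a custom Kryo serializer;
        // AvroKryoSerializer (extending Kryo's Serializer) is hypothetical
        env.getConfig.registerTypeWithKryoSerializer(
          classOf[TaxiRide], classOf[AvroKryoSerializer])
      }
    }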
>>> On Tue, Sep 17, 2019 at 12:28 PM Biao Liu <mmyy1...@gmail.com> wrote:

>>>> Hi Debasish,
>>>>
>>>> I guess the reason is that something is unexpectedly involved in
>>>> serialization due to a reference from an inner class (an anonymous class
>>>> or a lambda expression). When Flink serializes this inner class instance,
>>>> it also serializes all referenced objects, for example, the outer class
>>>> instance. If the outer class is not serializable, this error happens.
>>>>
>>>> You could try moving that piece of code to a named non-inner class.
>>>>
>>>> Thanks,
>>>> Biao /'bɪ.aʊ/
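A sketch of that suggestion applied to the function from earlier in the
thread: a named top-level class whose serialized form cannot capture an
enclosing streamlet instance. The state handling follows the snippet quoted
above; the join bodies, the field accesses (rideId, totalFare) and the
TaxiRideFare constructor are illustrative assumptions, not the actual types:

    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
    import org.apache.flink.util.Collector

    // a top-level class holds no reference to an enclosing instance, so
    // Java-serializing it cannot drag in a non-serializable outer object
    class EnrichmentFunction
        extends RichCoFlatMapFunction[TaxiRide, TaxiFare, TaxiRideFare] {

      @transient private var rideState: ValueState[TaxiRide] = _
      @transient private var fareState: ValueState[TaxiFare] = _

      override def open(params: Configuration): Unit = {
        rideState = getRuntimeContext.getState(
          new ValueStateDescriptor[TaxiRide]("saved ride", classOf[TaxiRide]))
        fareState = getRuntimeContext.getState(
          new ValueStateDescriptor[TaxiFare]("saved fare", classOf[TaxiFare]))
      }

      override def flatMap1(ride: TaxiRide, out: Collector[TaxiRideFare]): Unit = {
        // emit if the matching fare has already arrived, else remember the ride
        val fare = fareState.value()
        if (fare != null) {
          fareState.clear()
          out.collect(new TaxiRideFare(ride.rideId, fare.totalFare))
        } else rideState.update(ride)
      }

      override def flatMap2(fare: TaxiFare, out: Collector[TaxiRideFare]): Unit = {
        // symmetric case: emit if the matching ride has already arrived
        val ride = rideState.value()
        if (ride != null) {
          rideState.clear()
          out.collect(new TaxiRideFare(fare.rideId, fare.totalFare))
        } else fareState.update(fare)
      }
    }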
>>>> On Tue, 17 Sep 2019 at 02:06, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

>>>>> My main question is why serialisation kicks in when I try to execute
>>>>> within a `Future` and not otherwise.
>>>>>
>>>>> regards.

>>>>> On Mon, 16 Sep 2019 at 4:46 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

>>>>>> Yes, they are generated from an Avro schema and implement Serializable ..

>>>>>> On Mon, Sep 16, 2019 at 4:40 PM Deepak Sharma <deepakmc...@gmail.com> wrote:

>>>>>>> Does TaxiRide or TaxiRideFare implement Serializable?

>>>>>>> On Mon, Sep 16, 2019 at 3:47 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

>>>>>>>> Hello -
>>>>>>>>
>>>>>>>> The following piece of code is an example of connected data streams ..
>>>>>>>>
>>>>>>>>     val rides: DataStream[TaxiRide] =
>>>>>>>>       readStream(inTaxiRide)
>>>>>>>>         .filter { ride ⇒ ride.getIsStart().booleanValue }
>>>>>>>>         .keyBy("rideId")
>>>>>>>>
>>>>>>>>     val fares: DataStream[TaxiFare] =
>>>>>>>>       readStream(inTaxiFare)
>>>>>>>>         .keyBy("rideId")
>>>>>>>>
>>>>>>>>     val processed: DataStream[TaxiRideFare] =
>>>>>>>>       rides
>>>>>>>>         .connect(fares)
>>>>>>>>         .flatMap(new EnrichmentFunction)
>>>>>>>>
>>>>>>>> When I execute the above logic using
>>>>>>>> StreamExecutionEnvironment.execute(..) it runs fine. But if I try to
>>>>>>>> execute the above from within a scala.concurrent.Future, I get the
>>>>>>>> following exception ..
>>>>>>>>
>>>>>>>> org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG pos:0,
>>>>>>>> isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt type:INT pos:3,
>>>>>>>> driverId type:LONG pos:4, startLon type:FLOAT pos:5, startLat type:FLOAT pos:6,
>>>>>>>> endLon type:FLOAT pos:7, endLat type:FLOAT pos:8, startTime type:LONG pos:9,
>>>>>>>> endTime type:LONG pos:10] is not serializable. The object probably
>>>>>>>> contains or references non serializable fields.
>>>>>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>>>>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>>>>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
>>>>>>>>     at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>>>>>>>>     at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
>>>>>>>>     at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
>>>>>>>>     ...
>>>>>>>> Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
>>>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>>>>>
>>>>>>>> Any thoughts why this may happen ?
>>>>>>>>
>>>>>>>> regards.

--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg