I think what you are pointing to is asynchronous data stream operations. In
our case we want to submit the entire job in a Future, something like the
following ..

def execute(..) = {
  // this does all data stream manipulation, joins etc.
  buildComputationGraph()

  // submits for execution with StreamExecutionEnvironment
  env.execute(..)
}

and we want to do ..

val jobExecutionResultFuture = Future(execute(..))

and this gives that exception.
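
Our working theory (unverified) is that this happens because
OptimizerPlanEnvironment$ProgramAbortException extends java.lang.Error: the
planner runs the program's main method only to extract the job graph and
aborts env.execute(..) by throwing that Error, and scala.concurrent treats
any Error as fatal, wrapping it in the "Boxed Error" ExecutionException
instead of letting it propagate. A minimal sketch of a workaround we are
considering (untested, and it blocks at the boundary, so it only helps where
blocking is acceptable):

import java.util.concurrent.ExecutionException
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

val jobExecutionResultFuture = Future(execute(..))

try Await.result(jobExecutionResultFuture, Duration.Inf)
catch {
  // scala.concurrent boxes a fatal Error into an ExecutionException;
  // unwrap and rethrow so the planner can catch its own abort Error
  case e: ExecutionException => throw e.getCause
}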

regards.

On Thu, Sep 19, 2019 at 11:00 AM Rafi Aroch <rafi.ar...@gmail.com> wrote:

> Hi Debasish,
>
> Have you taken a look at the AsyncIO API for running async operations? I
> think this is the preferred way of doing it. [1]
> So it would look something like this:
>
> class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {
>
>     /** The database specific client that can issue concurrent requests with callbacks */
>     lazy val client: DatabaseClient = new DatabaseClient(host, port, credentials)
>
>     /** The context used for the future callbacks */
>     implicit lazy val executor: ExecutionContext =
>         ExecutionContext.fromExecutor(Executors.directExecutor())
>
>     override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {
>
>         // issue the asynchronous request, receive a future for the result
>         val resultFutureRequested: Future[String] = client.query(str)
>
>         // set the callback to be executed once the request by the client is complete
>         // the callback simply forwards the result to the result future
>         resultFutureRequested.onSuccess {
>             case result: String => resultFuture.complete(Iterable((str, result)))
>         }
>     }
> }
>
> // create the original stream
> val stream: DataStream[String] = ...
>
> // apply the async I/O transformation
> val resultStream: DataStream[(String, String)] =
>     AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)
>
>
> Thanks,
> Rafi
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html#async-io-api
>
> On Wed, Sep 18, 2019 at 8:26 PM Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
>> ok, the above problem was due to some serialization issues, which we fixed
>> by marking some of the fields transient. But now when I try to execute in a
>> Future I hit upon this ..
>>
>>
>> *java.util.concurrent.ExecutionException: Boxed Error* at
>> scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
>> at
>> scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
>> at
>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
>> at scala.concurrent.Promise.complete(Promise.scala:53)
>> at scala.concurrent.Promise.complete$(Promise.scala:52)
>> at
>> scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187)
>> at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
>> at
>> java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
>> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>> at
>> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>> at
>> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
>>
>> *Caused by:
>> org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException*
>> at
>> org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>> at
>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>> at
>> pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:280)
>> at pipelines.flink.FlinkStreamlet.$anonfun$run$2(FlinkStreamlet.scala:149)
>> at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:658)
>> at scala.util.Success.$anonfun$map$1(Try.scala:255)
>> at scala.util.Success.map(Try.scala:213)
>> at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
>> at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
>> ... 7 more
>>
>> I found this issue in JIRA,
>> https://issues.apache.org/jira/browse/FLINK-10381, which is still open
>> and talks about a related problem. But we are not submitting multiple jobs -
>> we are submitting a single job, just asynchronously in a Future. I am not
>> clear why this should create the problem that I see.
>>
>> Can anyone please help with an explanation ?
>>
>> regards.
>>
>> On Wed, Sep 18, 2019 at 12:22 AM Debasish Ghosh <ghosh.debas...@gmail.com>
>> wrote:
>>
>>> I think the issue may not be linked with Future. What happens is when
>>> this piece of code is executed ..
>>>
>>> val rides: DataStream[TaxiRide] =
>>>   readStream(inTaxiRide)
>>>     .filter { ride ⇒ ride.getIsStart().booleanValue }
>>>     .keyBy("rideId")
>>>
>>> val fares: DataStream[TaxiFare] =
>>>   readStream(inTaxiFare)
>>>     .keyBy("rideId")
>>>
>>> val processed: DataStream[TaxiRideFare] =
>>>   rides
>>>     .connect(fares)
>>>     .flatMap(new EnrichmentFunction)
>>>
>>> somehow the ClosureCleaner gets invoked, as evident from the following
>>> trace, and tries to serialize Avro data. Is there any way to pass in the
>>> custom Avro serializer that I am using ?
>>>
>>> org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG
>>> pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt
>>> type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5,
>>> startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT
>>> pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not
>>> serializable. The object probably contains or references non serializable
>>> fields.
>>>             at
>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>>             at
>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>             at
>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>             at
>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>             at
>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>             at
>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>             at
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
>>>             at
>>> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>>>             at
>>> org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
>>>             at
>>> org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
>>>             at
>>> pipelines.examples.processor.TaxiRideProcessor$$anon$1.buildExecutionGraph(TaxiRideProcessor.scala:47)
>>>             at
>>> pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:278)
>>>             at
>>> pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:149)
>>>             at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
>>>             at scala.util.Try$.apply(Try.scala:213)
>>>             at pipelines.runner.Runner$.run(Runner.scala:43)
>>>             at pipelines.runner.Runner$.main(Runner.scala:30)
>>>             at pipelines.runner.Runner.main(Runner.scala)
>>>             at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
>>> Method)
>>>             at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>             at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>             at java.lang.reflect.Method.invoke(Method.java:498)
>>>             at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>>>             at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>>>             at
>>> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>>>             at
>>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
>>>             at
>>> org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
>>>             at
>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
>>>             at
>>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>>             at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>             at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>             at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
>>>             at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>             at
>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>             at java.util.ArrayList.writeObject(ArrayList.java:766)
>>>             at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
>>> Method)
>>>             at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>             at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>             at java.lang.reflect.Method.invoke(Method.java:498)
>>>             at
>>> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
>>>             at
>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>>>             at
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>             at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>             at
>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>             at
>>> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
>>>             at
>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
>>>
>>>
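>>> One thing we considered (an untested sketch; and since the ClosureCleaner
>>> uses plain Java serialization, a serializer registered on the
>>> ExecutionConfig may not even reach it) is forcing Flink's Avro-based
>>> serialization for the Avro-generated types:
>>>
>>> // sketch (untested): make Flink serialize Avro-generated classes with
>>> // its Avro type serialization instead of the Kryo fallback
>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>> env.getConfig.enableForceAvro()
>>>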
>>> I also tried the following ..
>>>
>>> class EnrichmentFunction
>>>     extends RichCoFlatMapFunction[TaxiRide, TaxiFare, TaxiRideFare] {
>>>
>>>     @transient var rideState: ValueState[TaxiRide] = null
>>>     @transient var fareState: ValueState[TaxiFare] = null
>>>
>>>     override def open(params: Configuration): Unit = {
>>>       super.open(params)
>>>       rideState = getRuntimeContext.getState(
>>>         new ValueStateDescriptor[TaxiRide]("saved ride", classOf[TaxiRide]))
>>>       fareState = getRuntimeContext.getState(
>>>         new ValueStateDescriptor[TaxiFare]("saved fare", classOf[TaxiFare]))
>>>     }
>>>
>>> i.e. marked the state fields @transient and moved the state initialization
>>> into the open function. But I still get the same result.
>>>
>>> Help ?
>>>
>>> regards.
>>>
>>>
>>>
>>> On Tue, Sep 17, 2019 at 12:28 PM Biao Liu <mmyy1...@gmail.com> wrote:
>>>
>>>> Hi Debasish,
>>>>
>>>> I guess the reason is that something is unexpectedly pulled into
>>>> serialization through a reference from an inner class (an anonymous class
>>>> or a lambda expression). When Flink serializes such an inner class
>>>> instance, it also serializes all referenced objects, for example the
>>>> outer class instance. If the outer class is not serializable, this error
>>>> occurs.
>>>>
>>>> You could try moving that piece of code to a named, non-inner class, for
>>>> example:
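>>>>
>>>> // just a sketch with made-up names: a top-level object holds no
>>>> // reference to an enclosing instance, so the ClosureCleaner has
>>>> // nothing extra to serialize
>>>> object RideFareGraph {
>>>>   def build(rides: DataStream[TaxiRide],
>>>>             fares: DataStream[TaxiFare]): DataStream[TaxiRideFare] =
>>>>     rides.connect(fares).flatMap(new EnrichmentFunction)
>>>> }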
>>>>
>>>> Thanks,
>>>> Biao /'bɪ.aʊ/
>>>>
>>>>
>>>>
>>>> On Tue, 17 Sep 2019 at 02:06, Debasish Ghosh <ghosh.debas...@gmail.com>
>>>> wrote:
>>>>
>>>>> My main question is why serialisation kicks in when I try to execute
>>>>> within a `Future` and not otherwise.
>>>>>
>>>>> regards.
>>>>>
>>>>> On Mon, 16 Sep 2019 at 4:46 PM, Debasish Ghosh <
>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>
>>>>>> Yes, they are generated from an Avro schema and implement
>>>>>> Serializable ..
>>>>>>
>>>>>> On Mon, Sep 16, 2019 at 4:40 PM Deepak Sharma <deepakmc...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Does TaxiRide or TaxiRideFare implement Serializable?
>>>>>>>
>>>>>>> On Mon, Sep 16, 2019 at 3:47 PM Debasish Ghosh <
>>>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello -
>>>>>>>>
>>>>>>>> The following piece of code is an example of connected data
>>>>>>>> streams ..
>>>>>>>>
>>>>>>>> val rides: DataStream[TaxiRide] =
>>>>>>>>   readStream(inTaxiRide)
>>>>>>>>     .filter { ride ⇒ ride.getIsStart().booleanValue }
>>>>>>>>     .keyBy("rideId")
>>>>>>>>
>>>>>>>> val fares: DataStream[TaxiFare] =
>>>>>>>>   readStream(inTaxiFare)
>>>>>>>>     .keyBy("rideId")
>>>>>>>>
>>>>>>>> val processed: DataStream[TaxiRideFare] =
>>>>>>>>   rides
>>>>>>>>     .connect(fares)
>>>>>>>>     .flatMap(new EnrichmentFunction)
>>>>>>>>
>>>>>>>> When I execute the above logic using
>>>>>>>> StreamExecutionEnvironment.execute(..) it runs fine.
>>>>>>>> But if I try to execute the above from within a
>>>>>>>> scala.concurrent.Future, I get the following exception ..
>>>>>>>>
>>>>>>>> org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG
>>>>>>>> pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt
>>>>>>>> type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5,
>>>>>>>> startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT
>>>>>>>> pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not
>>>>>>>> serializable. The object probably contains or references non
>>>>>>>> serializable fields.
>>>>>>>>             at
>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>>>>>>>             at
>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>>>             at
>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>>>             at
>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>>>             at
>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>>>             at
>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>>>>>>             at
>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
>>>>>>>>             at
>>>>>>>> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>>>>>>>>             at
>>>>>>>> org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
>>>>>>>>             at
>>>>>>>> org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
>>>>>>>>   ...
>>>>>>>>
>>>>>>>> Caused by: java.io.NotSerializableException:
>>>>>>>> org.apache.avro.Schema$Field
>>>>>>>>             at
>>>>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>>>>>
>>>>>>>> Any thoughts why this may happen ?
>>>>>>>>
>>>>>>>> regards.
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Thanks
>>>>>>> Deepak
>>>>>>> www.bigdatabig.com
>>>>>>> www.keosha.net
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>
>>
>>
>

-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
