Hi Debasish,

Have you taken a look at the Async I/O API [1] for running async operations?
I think this is the preferred way of doing it.
So it would look something like this:

class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {

    /** The database-specific client that can issue concurrent requests with callbacks */
    lazy val client: DatabaseClient = new DatabaseClient(host, port, credentials)

    /** The execution context used for the future callbacks */
    implicit lazy val executor: ExecutionContext =
        ExecutionContext.fromExecutor(Executors.directExecutor())

    override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {

        // issue the asynchronous request, receive a future for the result
        val resultFutureRequested: Future[String] = client.query(str)

        // set the callback to be executed once the request by the client is complete;
        // the callback simply forwards the result to the result future
        resultFutureRequested.onSuccess {
            case result: String => resultFuture.complete(Iterable((str, result)))
        }
    }
}

// create the original stream
val stream: DataStream[String] = ...

// apply the async I/O transformation
val resultStream: DataStream[(String, String)] =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(),
        1000, TimeUnit.MILLISECONDS, 100)
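
Note that the last two arguments to unorderedWait are the timeout for each
async request (1000 ms above) and the capacity, i.e. the maximum number of
async requests that may be in flight at the same time (100 above).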


Thanks,
Rafi

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html#async-io-api

On Wed, Sep 18, 2019 at 8:26 PM Debasish Ghosh <ghosh.debas...@gmail.com>
wrote:

> OK, the above problem was due to some serialization issues, which we fixed
> by marking some of the things transient, along these lines (the class and
> field names below are illustrative):
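>
> import org.apache.flink.api.common.functions.RichMapFunction
> import org.apache.flink.configuration.Configuration
>
> class EnrichedLookup extends RichMapFunction[String, String] {
>   // the non-serializable helper is marked transient so it is excluded from
>   // the serialized closure, and is re-created on the task manager in open()
>   @transient private var client: LookupClient = _  // hypothetical client
>
>   override def open(params: Configuration): Unit = {
>     client = new LookupClient()
>   }
>
>   override def map(in: String): String = client.lookup(in)
> }
>
> This fixes the serialization issues. But now, when I try to execute the job
> in a Future, I hit this: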
>
>
> java.util.concurrent.ExecutionException: Boxed Error
> at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
> at
> scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
> at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
> at scala.concurrent.Promise.complete(Promise.scala:53)
> at scala.concurrent.Promise.complete$(Promise.scala:52)
> at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187)
> at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
> at
> java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> at
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> at
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
>
> Caused by:
> org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
> at
> org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
> at
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
> at
> pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:280)
> at pipelines.flink.FlinkStreamlet.$anonfun$run$2(FlinkStreamlet.scala:149)
> at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:658)
> at scala.util.Success.$anonfun$map$1(Try.scala:255)
> at scala.util.Success.map(Try.scala:213)
> at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
> at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
> ... 7 more
>
> I found https://issues.apache.org/jira/browse/FLINK-10381 in JIRA, which is
> still open and describes a related issue. But we are not submitting multiple
> jobs - we are submitting just one job, asynchronously in a Future. It is not
> clear to me why this should cause the problem I am seeing.
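>
> As far as I can tell, the Boxed Error itself comes from Scala's Promise
> layer: ProgramAbortException extends java.lang.Error, which Scala treats as
> fatal, so a Promise/Future is never completed with it directly - it gets
> wrapped. A minimal sketch of just that wrapping step (AbortError is a
> hypothetical stand-in for the abort exception):
>
> import scala.concurrent.Promise
> import scala.util.Failure
>
> // stand-in for ProgramAbortException, which extends java.lang.Error
> class AbortError extends Error("program abort")
>
> val p = Promise[Unit]()
> p.complete(Failure(new AbortError))
> // prints Some(Failure(java.util.concurrent.ExecutionException: Boxed Error))
> println(p.future.value)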
>
> Can anyone please help with an explanation?
>
> regards.
>
> On Wed, Sep 18, 2019 at 12:22 AM Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
>> I think the issue may not be linked to the Future. Here is what happens
>> when this piece of code is executed:
>>
>> val rides: DataStream[TaxiRide] =
>>   readStream(inTaxiRide)
>>     .filter { ride ⇒ ride.getIsStart().booleanValue }
>>     .keyBy("rideId")
>>
>> val fares: DataStream[TaxiFare] =
>>   readStream(inTaxiFare)
>>     .keyBy("rideId")
>>
>> val processed: DataStream[TaxiRideFare] =
>>   rides
>>     .connect(fares)
>>     .flatMap(new EnrichmentFunction)
>>
>> somehow the ClosureCleaner kicks in, as is evident from the stack trace
>> below, and tries to Java-serialize the Avro data. Is there any way to
>> pass in the custom Avro serializer that I am using?
>>
>> org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG
>> pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt
>> type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5,
>> startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT
>> pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not
>> serializable. The object probably contains or references non serializable
>> fields.
>>             at
>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>             at
>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>             at
>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>             at
>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>             at
>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>             at
>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>             at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
>>             at
>> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>>             at
>> org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
>>             at
>> org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
>>             at
>> pipelines.examples.processor.TaxiRideProcessor$$anon$1.buildExecutionGraph(TaxiRideProcessor.scala:47)
>>             at
>> pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:278)
>>             at
>> pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:149)
>>             at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
>>             at scala.util.Try$.apply(Try.scala:213)
>>             at pipelines.runner.Runner$.run(Runner.scala:43)
>>             at pipelines.runner.Runner$.main(Runner.scala:30)
>>             at pipelines.runner.Runner.main(Runner.scala)
>>             at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>             at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>             at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>             at java.lang.reflect.Method.invoke(Method.java:498)
>>             at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>>             at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>>             at
>> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>>             at
>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
>>             at
>> org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
>>             at
>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
>>             at
>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>             at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>             at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>             at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
>>             at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>             at
>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>             at java.util.ArrayList.writeObject(ArrayList.java:766)
>>             at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>             at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>             at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>             at java.lang.reflect.Method.invoke(Method.java:498)
>>             at
>> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
>>             at
>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>>             at
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>             at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>             at
>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>             at
>> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
>>             at
>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
>>
>>
>> I also tried the following ..
>>
>> class EnrichmentFunction
>>     extends RichCoFlatMapFunction[TaxiRide, TaxiFare, TaxiRideFare] {
>>
>>     @transient var rideState: ValueState[TaxiRide] = null
>>     @transient var fareState: ValueState[TaxiFare] = null
>>
>>     override def open(params: Configuration): Unit = {
>>       super.open(params)
>>       rideState = getRuntimeContext.getState(
>>         new ValueStateDescriptor[TaxiRide]("saved ride", classOf[TaxiRide]))
>>       fareState = getRuntimeContext.getState(
>>         new ValueStateDescriptor[TaxiFare]("saved fare", classOf[TaxiFare]))
>>     }
>>
>> and moved the state initialization into the open() method. But I still get
>> the same result.
>>
>> Help?
>>
>> regards.
>>
>>
>>
>> On Tue, Sep 17, 2019 at 12:28 PM Biao Liu <mmyy1...@gmail.com> wrote:
>>
>>> Hi Debasish,
>>>
>>> I guess the reason is that something is unexpectedly pulled into
>>> serialization through a reference from an inner class (an anonymous class
>>> or a lambda expression). When Flink serializes such an inner-class
>>> instance, it also serializes all the objects it references, for example
>>> the outer class instance. If the outer class is not serializable, this
>>> error occurs.
>>>
>>> You could try moving that piece of code to a named, non-inner class.
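>>>
>>> For example, something along these lines (just a sketch, reusing the
>>> names from your snippet; the join logic is elided):
>>>
>>> import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
>>> import org.apache.flink.util.Collector
>>>
>>> // a top-level, named class carries no hidden reference to an enclosing
>>> // instance, so Flink only has to serialize the function object itself
>>> class EnrichmentFunction
>>>     extends RichCoFlatMapFunction[TaxiRide, TaxiFare, TaxiRideFare] {
>>>
>>>   override def flatMap1(ride: TaxiRide, out: Collector[TaxiRideFare]): Unit = {
>>>     // buffer the ride side, e.g. in keyed state
>>>   }
>>>
>>>   override def flatMap2(fare: TaxiFare, out: Collector[TaxiRideFare]): Unit = {
>>>     // look up the buffered ride and emit a joined TaxiRideFare
>>>   }
>>> }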
>>>
>>> Thanks,
>>> Biao /'bɪ.aʊ/
>>>
>>>
>>>
>>> On Tue, 17 Sep 2019 at 02:06, Debasish Ghosh <ghosh.debas...@gmail.com>
>>> wrote:
>>>
>>>> My main question is why serialisation kicks in when I try to execute
>>>> within a `Future` and not otherwise.
>>>>
>>>> regards.
>>>>
>>>> On Mon, 16 Sep 2019 at 4:46 PM, Debasish Ghosh <
>>>> ghosh.debas...@gmail.com> wrote:
>>>>
>>>>> Yes, they are generated from the Avro schema and implement
>>>>> Serializable ..
>>>>>
>>>>> On Mon, Sep 16, 2019 at 4:40 PM Deepak Sharma <deepakmc...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Does TaxiRide or TaxiRideFare implement Serializable?
>>>>>>
>>>>>> On Mon, Sep 16, 2019 at 3:47 PM Debasish Ghosh <
>>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello -
>>>>>>>
>>>>>>> The following piece of code is an example of connected data
>>>>>>> streams ..
>>>>>>>
>>>>>>> val rides: DataStream[TaxiRide] =
>>>>>>>   readStream(inTaxiRide)
>>>>>>>     .filter { ride ⇒ ride.getIsStart().booleanValue }
>>>>>>>     .keyBy("rideId")
>>>>>>>
>>>>>>> val fares: DataStream[TaxiFare] =
>>>>>>>   readStream(inTaxiFare)
>>>>>>>     .keyBy("rideId")
>>>>>>>
>>>>>>> val processed: DataStream[TaxiRideFare] =
>>>>>>>   rides
>>>>>>>     .connect(fares)
>>>>>>>     .flatMap(new EnrichmentFunction)
>>>>>>>
>>>>>>> When I execute the above logic using
>>>>>>> StreamExecutionEnvironment.execute(..), it runs fine.
>>>>>>> But if I try to execute it from within a scala.concurrent.Future,
>>>>>>> roughly like this (a sketch),
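>>>>>>>
>>>>>>> import scala.concurrent.Future
>>>>>>> import scala.concurrent.ExecutionContext.Implicits.global
>>>>>>>
>>>>>>> // env is the StreamExecutionEnvironment the streams above are built on
>>>>>>> Future { env.execute() }
>>>>>>>
>>>>>>> I get the following exception ..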
>>>>>>>
>>>>>>> org.apache.flink.api.common.InvalidProgramException: [rideId
>>>>>>> type:LONG pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2,
>>>>>>> passengerCnt type:INT pos:3, driverId type:LONG pos:4, startLon 
>>>>>>> type:FLOAT
>>>>>>> pos:5, startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat
>>>>>>> type:FLOAT pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] 
>>>>>>> is
>>>>>>> not serializable. The object probably contains or references non
>>>>>>> serializable fields.
>>>>>>>             at
>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>>>>>>             at
>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>>             at
>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>>             at
>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>>             at
>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>>             at
>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>>>>>             at
>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
>>>>>>>             at
>>>>>>> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>>>>>>>             at
>>>>>>> org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
>>>>>>>             at
>>>>>>> org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
>>>>>>>   ...
>>>>>>>
>>>>>>> Caused by: java.io.NotSerializableException:
>>>>>>> org.apache.avro.Schema$Field
>>>>>>>             at
>>>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>>>>
>>>>>>> Any thoughts on why this may happen?
>>>>>>>
>>>>>>> regards.
>>>>>>>
>>>>>>> --
>>>>>>> Debasish Ghosh
>>>>>>> http://manning.com/ghosh2
>>>>>>> http://manning.com/ghosh
>>>>>>>
>>>>>>> Twttr: @debasishg
>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>> Code: http://github.com/debasishg
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Thanks
>>>>>> Deepak
>>>>>> www.bigdatabig.com
>>>>>> www.keosha.net
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Debasish Ghosh
>>>>> http://manning.com/ghosh2
>>>>> http://manning.com/ghosh
>>>>>
>>>>> Twttr: @debasishg
>>>>> Blog: http://debasishg.blogspot.com
>>>>> Code: http://github.com/debasishg
>>>>>
>>>> --
>>>> Sent from my iPhone
>>>>
>>>
>>
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg
>>
>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>
