Re: serialization issue in streaming job run with scala Future

2019-09-19 Thread Debasish Ghosh
Hi Yuval - Here's a brief summary of what we are trying to do .. At the library level we have this ..
    def buildExecutionGraph(): Unit
    def executeStreamingQueries(env: StreamExecutionEnvironment): JobExecutionResult = {
      buildExecutionGraph()
      env.execute(s"Executing $streamletRef")
    }
and we d

Re: serialization issue in streaming job run with scala Future

2019-09-19 Thread Yuval Itzchakov
Debasish, could you share a before-and-after example of your classes for future reference? On Thu, 19 Sep 2019, 10:42 Debasish Ghosh wrote: > We solved the problem of serialization by making some things transient > which were being captured as part of the closure. So we no longer have > ser

Re: serialization issue in streaming job run with scala Future

2019-09-19 Thread Debasish Ghosh
We solved the problem of serialization by making some things transient which were being captured as part of the closure. So we no longer have serialization errors. Everything works properly without the Future. I realize that, because of statics, concurrent job submission will be an issue. But we are
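The thread does not show which fields were marked transient, so the following is only a minimal sketch of the general technique, using plain Java serialization outside Flink. `Connection` and `Enricher` are hypothetical names invented for illustration. A `@transient lazy val` is skipped when the object is serialized and rebuilt on first access after deserialization.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical resource that is deliberately NOT Serializable.
class Connection(val url: String)

// Hypothetical user function. The non-serializable field is transient,
// so Java serialization skips it; `lazy` rebuilds it on first use.
class Enricher(url: String) extends Serializable {
  @transient lazy val conn: Connection = new Connection(url)
  def apply(key: String): String = key + "@" + conn.url
}

object TransientDemo {
  // Serializes and deserializes a value with plain Java serialization.
  def roundTrip[A <: AnyRef](a: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(a)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val copy = roundTrip(new Enricher("db://example"))
    // The connection is recreated lazily on the deserialized copy.
    println(copy("key"))
  }
}
```

Without the `@transient` annotation, `roundTrip` would fail as soon as `conn` had been initialized, because `Connection` is not serializable.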

Re: serialization issue in streaming job run with scala Future

2019-09-19 Thread Biao Liu
Hi Debasish, I think some critical detail of your usage is still hidden. It might help if you could provide more details. I am still confused about how you solved the serialization issue. Why do the non-transient fields only affect serialization in a Future? WRT this ProgramAbortException issue, do you sub

Re: serialization issue in streaming job run with scala Future

2019-09-19 Thread Debasish Ghosh
I think what you are pointing at is asynchronous datastream operations. In our case we want to submit the entire job in a Future. Something like the following ..
    def execute(..) = {
      // this does all data stream manipulation, joins etc.
      buildComputationGraph()
      // submits for execution with S

Re: serialization issue in streaming job run with scala Future

2019-09-18 Thread Rafi Aroch
Hi Debasish, Have you taken a look at the AsyncIO API for running async operations? I think this is the preferred way of doing it. [1] So it would look something like this:
    class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {
      /** The database specific client that can

Re: serialization issue in streaming job run with scala Future

2019-09-18 Thread Debasish Ghosh
ok, the above problem was due to some serialization issues which we fixed by marking some of the things transient. This fixes the serialization issues .. But now when I try to execute in a Future I hit upon this .. *java.util.concurrent.ExecutionException: Boxed Error* at scala.concurrent.impl.Pr
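The "Boxed Error" wrapper in this trace comes from the Scala standard library, not from Flink: when the body of a `Future` throws a `java.lang.Error`, the future fails with an `ExecutionException` whose cause is the original error. The sketch below reproduces that without Flink. `AbortError` is a hypothetical stand-in, and it is an assumption here that the ProgramAbortException mentioned elsewhere in the thread is likewise a subclass of `Error`.

```scala
import java.util.concurrent.ExecutionException
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Try}

// Hypothetical stand-in for an Error thrown while submitting a job.
class AbortError extends Error("aborted")

object BoxedErrorDemo {
  // Runs a failing body in a Future and returns the completed result.
  def submit(): Try[Unit] = {
    val f: Future[Unit] = Future { throw new AbortError }
    Await.ready(f, 5.seconds)
    f.value.get
  }

  def main(args: Array[String]): Unit = submit() match {
    // The original Error is available as the cause of the wrapper.
    case Failure(e: ExecutionException) => println(s"boxed cause: ${e.getCause}")
    case other                          => println(s"unexpected: $other")
  }
}
```

When debugging a trace like the one above, the cause of the `ExecutionException` is usually the more informative part.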

Re: serialization issue in streaming job run with scala Future

2019-09-17 Thread Debasish Ghosh
I think the issue may not be linked with Future. What happens is when this piece of code is executed ..
    val rides: DataStream[TaxiRide] =
      readStream(inTaxiRide)
        .filter { ride ⇒ ride.getIsStart().booleanValue }
        .keyBy("rideId")
    val fares: DataStream[TaxiFare] =
      readStream(inTaxiFare)

Re: serialization issue in streaming job run with scala Future

2019-09-16 Thread Biao Liu
Hi Debasish, I guess the reason is that something is unexpectedly pulled into serialization through a reference from an inner class (an anonymous class or a lambda expression). When Flink serializes this inner class instance, it also serializes all referenced objects, for example, the outer class instance. If
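The capture described here can be reproduced with plain Java serialization, outside Flink. This is a minimal sketch, assuming Scala 2.12 or later (where function literals are serializable); `GraphBuilder` is a hypothetical class invented for illustration, and Flink itself applies additional steps, such as closure cleaning, that are not modelled here.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.util.Try

// Hypothetical class that builds a job; deliberately NOT Serializable.
class GraphBuilder(val threshold: Int) {
  // Reads the field `threshold`, so the closure holds a reference to `this`.
  def capturingOuter: Int => Boolean = x => x > threshold

  // Copies the field into a local val first, so only an Int is captured.
  def capturingLocal: Int => Boolean = {
    val t = threshold
    x => x > t
  }
}

object ClosureCaptureDemo {
  // Attempts Java serialization and returns the byte count on success.
  def trySerialize(f: AnyRef): Try[Int] = Try {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(f)
    out.close()
    bytes.size()
  }

  def main(args: Array[String]): Unit = {
    val builder = new GraphBuilder(10)
    // Fails: serializing the closure drags in the non-serializable builder.
    println(trySerialize(builder.capturingOuter))
    // Succeeds: the closure carries only the copied Int.
    println(trySerialize(builder.capturingLocal))
  }
}
```

Copying the needed values into local vals before building the closure is one alternative to marking the outer fields transient.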

Re: serialization issue in streaming job run with scala Future

2019-09-16 Thread Debasish Ghosh
My main question is why serialisation kicks in when I try to execute within a `Future` and not otherwise. regards. On Mon, 16 Sep 2019 at 4:46 PM, Debasish Ghosh wrote: > Yes, they are generated from Avro Schema and implements Serializable .. > > On Mon, Sep 16, 2019 at 4:40 PM Deepak Sharma >

Re: serialization issue in streaming job run with scala Future

2019-09-16 Thread Debasish Ghosh
Yes, they are generated from Avro Schema and implement Serializable .. On Mon, Sep 16, 2019 at 4:40 PM Deepak Sharma wrote: > Does TaxiRide or TaxiRideFare implements Serializable? > > On Mon, Sep 16, 2019 at 3:47 PM Debasish Ghosh > wrote: > >> Hello - >> >> The following piece of code is an

Re: serialization issue in streaming job run with scala Future

2019-09-16 Thread Deepak Sharma
Does TaxiRide or TaxiRideFare implement Serializable? On Mon, Sep 16, 2019 at 3:47 PM Debasish Ghosh wrote: > Hello - > > The following piece of code is an example of a connected data streams .. > > val rides: DataStream[TaxiRide] = > readStream(inTaxiRide) > .filter { ride ⇒ ride.getIsSt

serialization issue in streaming job run with scala Future

2019-09-16 Thread Debasish Ghosh
Hello - The following piece of code is an example of connected data streams ..
    val rides: DataStream[TaxiRide] =
      readStream(inTaxiRide)
        .filter { ride ⇒ ride.getIsStart().booleanValue }
        .keyBy("rideId")
    val fares: DataStream[TaxiFare] =
      readStream(inTaxiFare)
        .keyBy("rideId")