Hi Yuval -
Here's a brief summary of what we are trying to do ..
At the library level we have this ..
def buildExecutionGraph(): Unit

def executeStreamingQueries(env: StreamExecutionEnvironment): JobExecutionResult = {
  buildExecutionGraph()
  env.execute(s"Executing $streamletRef")
}
and we d
Debasish, could you share a before-and-after example of your classes
for future reference?
On Thu, 19 Sep 2019, 10:42 Debasish Ghosh wrote:
> We solved the problem of serialization by making some things transient
> which were being captured as part of the closure. So we no longer have
> ser
We solved the problem of serialization by making some things transient
which were being captured as part of the closure. So we no longer have
serialization errors. Everything works properly without the future.
I realize that, because of the statics, concurrent job submission will be an
issue. But we are
Hi Debasish,
I think something critical about your usage is still hidden. It might help if
you could provide more details.
It still confuses me how you solved the serialization issue. Why would the
non-transient fields only affect serialization in a Future?
WRT this ProgramAbortException issue, do you sub
I think what you are pointing at is asynchronous datastream operations. In
our case we want to submit the entire job in a Future. Something like the
following ..
def execute(..) = {
  // this does all data stream manipulation, joins etc.
  buildComputationGraph()
  // submits for execution with S
Hi Debasish,
Have you taken a look at the AsyncIO API for running async operations? I
think this is the preferred way of doing it. [1]
So it would look something like this:
class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {
/** The database specific client that can
OK, the above problem was due to some serialization issues, which we fixed
by marking some of the things transient. This fixes the serialization
issues .. But now when I try to execute in a Future I hit this ..
*java.util.concurrent.ExecutionException: Boxed Error* at
scala.concurrent.impl.Pr
I think the issue may not be linked with Future. What happens is when this
piece of code is executed ..
val rides: DataStream[TaxiRide] =
  readStream(inTaxiRide)
    .filter { ride ⇒ ride.getIsStart().booleanValue }
    .keyBy("rideId")
val fares: DataStream[TaxiFare] =
  readStream(inTaxiFare)
Hi Debasish,
My guess is that something is unexpectedly being pulled into serialization
through a reference from an inner class (anonymous class or lambda expression).
When Flink serializes this inner class instance, it also serializes
all referenced objects, for example, the outer class instance. If
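
To illustrate the capture described above, here is a minimal sketch that uses
plain Java serialization rather than Flink itself. It assumes Scala 2.12 or
later, where function literals are serializable. All names in it (Resource,
LeakyJob, TransientJob, LocalCopyJob, serializes) are hypothetical and do not
come from the code discussed in this thread.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCaptureDemo {

  // Hypothetical stand-in for a non-serializable resource (client, context, ...).
  class Resource

  // The lambda reads the field `limit`, so it captures `this`,
  // and serializing `this` also tries to serialize `resource`.
  class LeakyJob(val limit: Int) extends Serializable {
    val resource = new Resource
    def predicate: Int => Boolean = x => x > limit
  }

  // Same shape, but the resource is excluded from serialization.
  class TransientJob(val limit: Int) extends Serializable {
    @transient val resource = new Resource
    def predicate: Int => Boolean = x => x > limit
  }

  // Alternative: copy the needed value into a local so `this` is not captured.
  class LocalCopyJob(val limit: Int) {
    val resource = new Resource
    def predicate: Int => Boolean = {
      val localLimit = limit
      x => x > localLimit
    }
  }

  // Returns true if plain Java serialization of `value` succeeds.
  def serializes(value: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(value) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(s"leaky job:      ${serializes(new LeakyJob(3).predicate)}")
    println(s"transient job:  ${serializes(new TransientJob(3).predicate)}")
    println(s"local copy job: ${serializes(new LocalCopyJob(3).predicate)}")
  }
}
```

Note that a @transient field comes back as null after deserialization, so
`@transient lazy val` is a common way to have it rebuilt on first use.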
My main question is why serialisation kicks in when I try to execute within
a `Future` and not otherwise.
regards.
On Mon, 16 Sep 2019 at 4:46 PM, Debasish Ghosh
wrote:
> Yes, they are generated from Avro Schema and implement Serializable ..
>
> On Mon, Sep 16, 2019 at 4:40 PM Deepak Sharma
>
Yes, they are generated from Avro Schema and implement Serializable ..
On Mon, Sep 16, 2019 at 4:40 PM Deepak Sharma wrote:
> Does TaxiRide or TaxiRideFare implement Serializable?
>
> On Mon, Sep 16, 2019 at 3:47 PM Debasish Ghosh
> wrote:
>
>> Hello -
>>
>> The following piece of code is an
Does TaxiRide or TaxiRideFare implement Serializable?
On Mon, Sep 16, 2019 at 3:47 PM Debasish Ghosh
wrote:
> Hello -
>
> The following piece of code is an example of connected data streams ..
>
> val rides: DataStream[TaxiRide] =
> readStream(inTaxiRide)
> .filter { ride ⇒ ride.getIsSt
Hello -
The following piece of code is an example of connected data streams ..

val rides: DataStream[TaxiRide] =
  readStream(inTaxiRide)
    .filter { ride ⇒ ride.getIsStart().booleanValue }
    .keyBy("rideId")
val fares: DataStream[TaxiFare] =
  readStream(inTaxiFare)
    .keyBy("rideId")