Hi All,

I have implemented a custom SourceFunction on top of a data source with an
asynchronous API (the API calls return Scala futures). I need to call the
asynchronous API during initialization of each individual (parallel) source
instance, and, when running in exactly-once mode, also during
snapshotState() or inside the run loop. The polling loop itself is
synchronous.

Since I am (at least for now) not worried about performance, I simply used
Await.result() to perform a blocking wait on each asynchronous call (
https://docs.scala-lang.org/overviews/core/futures.html#blocking-outside-the-future).
This worked fine in Flink 1.4.2, but after upgrading to Flink 1.5 the
futures never complete, eventually causing timeout exceptions on the
Await.result() call.
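For reference, the pattern looks roughly like this (a minimal, self-contained sketch; fetchInitialOffset is a hypothetical stand-in for one of the real async API calls, not the actual client code):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical stand-in for an async API call made during source initialization
def fetchInitialOffset(): Future[Long] = Future { 42L }

// Blocking wait on the future, as done in initializeState() / the run loop
val offset: Long = Await.result(fetchInitialOffset(), 10.seconds)
```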

The issue occurs in integration tests where the Flink jobs run locally (in
a MiniCluster). It does not occur on my local machine, but it does occur
consistently on Travis, so I suspect it is related to the number of
cores/workers that are available. Await.result, however, goes through the
BlockContext, which is backed by the ForkJoinPool, and I would not expect a
few asynchronous calls to run into any limitations there. Compiling and
running the same code against Flink 1.4.2 works fine. The issue occurs both
when calling Await.result() inside the run loop and inside initializeState().
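To illustrate why I don't expect a pool limitation: sequentially awaiting a handful of futures from a single thread completes fine in plain Scala, because Await.result marks the calling thread as blocked via the BlockContext, so the global ForkJoinPool can compensate. A minimal sketch, independent of Flink:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// A few futures that each block briefly; awaiting them one by one from the
// calling thread goes through the BlockContext, so the pool is not starved.
val futures = (1 to 4).map(i => Future { Thread.sleep(100); i })
val results = futures.map(f => Await.result(f, 5.seconds))
```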

Am I breaking the process model by using Await.result on asynchronous API
calls within initializeState or snapshotState in a SourceFunction (or a Sink,
for that matter)? With Await.result I do make sure each call is created
and awaited within a single checkpoint.

Any other suggestions on where to look for the problem, or an explanation of
why this issue could occur when upgrading from 1.4.2 to 1.5.0?


Thank you for your help!

Cheers,
Niels
