Thank you! That did it!

On Wed, May 8, 2019 at 5:42 PM Till Rohrmann <trohrm...@apache.org> wrote:
> Hi Avi,
>
> you need to complete the given resultFuture and not return a future. You
> can do this via resultFuture.complete(r).
>
> Cheers,
> Till
>
> On Tue, May 7, 2019 at 8:30 PM Avi Levi <avi.l...@bluevoyant.com> wrote:
>
>> Hi,
>> We are using Flink 1.8.0 (but the following also happens in 1.7.2). I tried
>> a very simple unordered async call:
>>
>> override def asyncInvoke(input: Foo, resultFuture: ResultFuture[ScoredFoo]): Unit = {
>>   val r = ScoredFoo(Foo("a"), 80)
>>   Future.successful(r)
>> }
>>
>> Running this stream seems to get stuck in some infinite loop until it
>> crashes with a timeout exception:
>>
>> java.lang.Exception: An async function call terminated with an exception. Failing the AsyncWaitOperator.
>>   at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:137)
>>   at org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:85)
>>   at java.base/java.lang.Thread.run(Thread.java:844)
>> Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException: Async function call has timed out.
>>   at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
>>   at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
>>   at org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.get(StreamRecordQueueEntry.java:68)
>>   at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:129)
>>   ... 2 common frames omitted
>> Caused by: java.util.concurrent.TimeoutException: Async function call has timed out.
>>   at org.apache.flink.streaming.api.scala.async.AsyncFunction.timeout(AsyncFunction.scala:60)
>>   at org.apache.flink.streaming.api.scala.async.AsyncFunction.timeout$(AsyncFunction.scala:59)
>>   at com.lookalike.analytic.utils.LookalikeScoreEnrich.timeout(LookalikeScoreEnrich.scala:18)
>>   at org.apache.flink.streaming.api.scala.AsyncDataStream$$anon$3.timeout(AsyncDataStream.scala:301)
>>   at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$1.onProcessingTime(AsyncWaitOperator.java:211)
>>   at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>>   at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:514)
>>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>>   at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:299)
>>   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
>>   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
>>   ... 1 common frames omitted
>>
>> Please advise. Thanks,
>> Avi
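
For anyone finding this thread later, here is a minimal sketch of the fix Till describes, applied to the asyncInvoke quoted above. A few things in it are my assumptions and not from the thread: the shapes of Foo and ScoredFoo (guessed from ScoredFoo(Foo("a"), 80)), the use of ExecutionContext.global for the callback, and passing the input through instead of the hard-coded Foo("a"). The class name is taken from the stack trace. Note that in the Scala API, as far as I know, ResultFuture.complete expects an Iterable, so the single result is wrapped.

```scala
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

// Hypothetical shapes, inferred from ScoredFoo(Foo("a"), 80) in the quoted snippet.
case class Foo(name: String)
case class ScoredFoo(foo: Foo, score: Int)

class LookalikeScoreEnrich extends AsyncFunction[Foo, ScoredFoo] {

  override def asyncInvoke(input: Foo, resultFuture: ResultFuture[ScoredFoo]): Unit = {
    // Execution context for the callback below; an assumption, not from the thread.
    implicit val ec: ExecutionContext = ExecutionContext.global

    // Stand-in for a real asynchronous lookup.
    val scored: Future[ScoredFoo] = Future.successful(ScoredFoo(input, 80))

    // The return value of asyncInvoke is ignored (it is Unit), so the operator
    // only sees a result once resultFuture itself is completed.
    scored.onComplete {
      case Success(r) => resultFuture.complete(Iterable(r))
      case Failure(e) => resultFuture.completeExceptionally(e)
    }
  }
}
```

Without the complete call the operator never receives a result for the element, which matches the "Async function call has timed out" failure in the trace above. Forwarding failures through completeExceptionally should surface the real error instead of waiting for the timeout.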