Hi Ragini, AsyncDataStream.unorderedWait() or AsyncDataStream.orderedWait() takes a timeout as a parameter and the TimeUnit for the timeout specified as another parameter.
The timeout() method above is called when an Async I/O operation is timed out. The default operation is to raise an exception when an operation times out. It seems like you have overridden the timeout method to discard the record for which the operation timed out. If records are not getting processed after some time, probably all operations are getting timed out. I would suggest to look at the timeout configuration provided in AsyncDataStream.unorderedWait() or AsyncDataStream.orderedWait() and identify why there are timeouts rather than overriding the timeout method. Regards, Rahul On Wed, Jul 14, 2021 at 9:19 PM Ragini Manjaiah <ragini.manja...@gmail.com> wrote: > Hi, > According to the suggestion I override timeout method in the async > function . flink jobs processes real time events for few mins and later > hangs does process at all. Is there any issue with the method below? > I see 0 records per second . can you please help here > > @Override > public void timeout(Tuple1<Map<String, List< abd>>> fieldMapTup, > ResultFuture<Tuple3<Map<String, List<abd>>, Map<String, Experiment>, List< > Map<String, Integer>>>> resultFuture) { > //Timed out. Just discard > System.out.println("Timeout:" ); > > On Wed, Jul 14, 2021 at 9:40 AM Rahul Patwari <rahulpatwari8...@gmail.com> > wrote: > >> Hi Ragini, >> >> From the stack trace, the job failed as the Async I/O Operator has timed >> out for an event. >> The timeout is configurable. >> >> Please refer >> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/#async-io-api >> >> Quoting from above documentation: >> >>> Timeout: The timeout defines how long an asynchronous request may take >>> before it is considered failed. This parameter guards against dead/failed >>> requests. >> >> >> Regards, >> Rahul >> >> On Wed, Jul 14, 2021 at 9:29 AM Ragini Manjaiah < >> ragini.manja...@gmail.com> wrote: >> >>> Hi , >>> I am facing the below issue while processing streaming events. In what >>> scenarios hit with java.lang.Exception: Could not complete the stream >>> element. can please help me here . The job fails after this exception is hit >>> >>> >>> 2021-07-13 13:24:58,781 INFO >>> org.apache.flink.runtime.executiongraph.ExecutionGraph >>> [] - ExtractTransformProcess -> AsyncFetch (9/60) >>> (3809c73da162e9ab3b7b131077aff105) switched from RUNNING to FAILED on >>> container_e3767_1626191679189_5049_01 _000003 @ brdn6162.target.com >>> (dataPort=41107). >>> >>> >>> java.lang.Exception: Could not complete the stream element: Record @ >>> 1626200691540 : ({<it displays the record>}) >>> >>> >>> at >>> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.completeExceptionally(AsyncWaitOperator.java:363) >>> ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?] >>> >>> at >>> org.apache.flink.streaming.api.functions.async.AsyncFunction.timeout(AsyncFunction.java:97) >>> ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?] >>> >>> at >>> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.lambda$processElement$0(AsyncWaitOperator.java:190) >>> ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?] >>> >>> at >>> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1223) >>> ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?] >>> >>> at >>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$16(StreamTask.java:1214) >>> ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?] >>> >>> at >>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) >>> ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?] >>> >>> at >>> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) >>> ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?] >>> >>> at >>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:301) >>> ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?] >>> >>> at >>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183) >>> ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?] >>> >>> at >>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569) >>> ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?] >>> >>> at >>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534) >>> ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?] >>> >>> at >>> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) >>> ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?] >>> >>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) >>> ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?] >>> >>> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252] >>> >>> Caused by: java.util.concurrent.TimeoutException: Async function call >>> has timed out. >>> >>> >>> >>> Thanks & regards >>> >>