Hi,

As suggested, I overrode the timeout method in the async function. The Flink job now processes real-time events for a few minutes, then hangs and does not process anything at all; I see 0 records per second. Is there any issue with the method below? Can you please help here?

    @Override
    public void timeout(Tuple1<Map<String, List<abd>>> fieldMapTup,
            ResultFuture<Tuple3<Map<String, List<abd>>, Map<String, Experiment>, List<Map<String, Integer>>>> resultFuture) {
        // Timed out. Just discard
        System.out.println("Timeout:");
    }

On Wed, Jul 14, 2021 at 9:40 AM Rahul Patwari <rahulpatwari8...@gmail.com> wrote:

> Hi Ragini,
>
> From the stack trace, the job failed as the Async I/O Operator has timed
> out for an event.
> The timeout is configurable.
>
> Please refer
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/#async-io-api
>
> Quoting from above documentation:
>
>> Timeout: The timeout defines how long an asynchronous request may take
>> before it is considered failed. This parameter guards against dead/failed
>> requests.
>
> Regards,
> Rahul
>
> On Wed, Jul 14, 2021 at 9:29 AM Ragini Manjaiah <ragini.manja...@gmail.com>
> wrote:
>
>> Hi ,
>> I am facing the below issue while processing streaming events. In what
>> scenarios hit with java.lang.Exception: Could not complete the stream
>> element. can please help me here . The job fails after this exception is hit
>>
>> 2021-07-13 13:24:58,781 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - ExtractTransformProcess -> AsyncFetch (9/60) (3809c73da162e9ab3b7b131077aff105) switched from RUNNING to FAILED on container_e3767_1626191679189_5049_01_000003 @ brdn6162.target.com (dataPort=41107).
>>
>> java.lang.Exception: Could not complete the stream element: Record @ 1626200691540 : ({<it displays the record>})
>>     at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.completeExceptionally(AsyncWaitOperator.java:363) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>     at org.apache.flink.streaming.api.functions.async.AsyncFunction.timeout(AsyncFunction.java:97) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>     at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.lambda$processElement$0(AsyncWaitOperator.java:190) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1223) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$16(StreamTask.java:1214) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:301) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]
>> Caused by: java.util.concurrent.TimeoutException: Async function call has timed out.
>>
>> Thanks & regards
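
A note on the overridden timeout method in this message: it prints a line but never calls resultFuture.complete(...) or resultFuture.completeExceptionally(...). As far as I understand the Async I/O operator, an element keeps its place in the operator's bounded in-flight queue until its ResultFuture is completed, so timed-out elements that are only logged could fill that queue and stall the job, which would be consistent with 0 records per second. This is a possible explanation, not a confirmed diagnosis. The sketch below is plain Java that only models this bookkeeping: the ResultFuture interface is a stand-in with the same two method names, Slot and TimeoutSketch are hypothetical, and String stands in for the real tuple types.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

final class TimeoutSketch {

    /** Stand-in mirroring the two completion methods; NOT the real Flink class. */
    interface ResultFuture<OUT> {
        void complete(Collection<OUT> result);
        void completeExceptionally(Throwable error);
    }

    /** Hypothetical model of one occupied slot in a bounded in-flight queue. */
    static final class Slot implements ResultFuture<String> {
        private boolean done;
        @Override public void complete(Collection<String> result) { done = true; }
        @Override public void completeExceptionally(Throwable error) { done = true; }
        boolean isDone() { return done; }
    }

    /** Timeout handler that only logs: the slot is never marked as done. */
    static void timeoutDiscardOnly(String input, ResultFuture<String> resultFuture) {
        System.out.println("Timeout: " + input);
    }

    /** Timeout handler that logs and completes with zero output records. */
    static void timeoutCompleteEmpty(String input, ResultFuture<String> resultFuture) {
        System.out.println("Timeout: " + input);
        resultFuture.complete(Collections.emptyList());
    }

    /** Times out n requests and returns how many slots remain occupied. */
    static int inFlightAfterTimeouts(int n, boolean completeOnTimeout) {
        List<Slot> queue = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            Slot slot = new Slot();
            queue.add(slot);
            if (completeOnTimeout) {
                timeoutCompleteEmpty("event-" + i, slot);
            } else {
                timeoutDiscardOnly("event-" + i, slot);
            }
        }
        queue.removeIf(Slot::isDone); // only completed slots are released
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println("still in flight (log only): " + inFlightAfterTimeouts(3, false));
        System.out.println("still in flight (complete): " + inFlightAfterTimeouts(3, true));
    }
}
```

In the actual job, the corresponding change would presumably be to call resultFuture.complete(Collections.emptyList()) inside timeout(...), provided that silently dropping timed-out events is acceptable for the pipeline.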