Hi Ragini,

AsyncDataStream.unorderedWait() and AsyncDataStream.orderedWait() each take
the timeout as one parameter and the TimeUnit of that timeout as another
parameter.

The timeout() method above is called when an Async I/O operation times out.
By default, it raises an exception. It seems you have overridden the timeout
method to discard the record whose operation timed out.

If records stop being processed after some time, probably all operations
are timing out.

I would suggest looking at the timeout configured in
AsyncDataStream.unorderedWait() or AsyncDataStream.orderedWait() and
identifying why the timeouts occur, rather than overriding the timeout method.
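
One possible cause worth ruling out, stated as an assumption because the
quoted timeout() is cut off: Flink's Async I/O documentation says an
overridden timeout() should still call ResultFuture.complete() or
ResultFuture.completeExceptionally(), otherwise the operator is never told
that the record is finished. The sketch below is a toy model in plain Java,
not Flink code. ResultFuture here is a local stand-in with the same two
methods, and ToyOperator, tryAccept, acceptedOutOf, LOG_ONLY and
COMPLETE_EMPTY are hypothetical names. It shows how a handler that only logs
keeps every in-flight slot occupied until no new record is accepted, while a
handler that completes with an empty list frees the slot and drops the record.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.BiConsumer;

final class AsyncTimeoutSketch {

    /** Local stand-in that mirrors two methods of Flink's ResultFuture. */
    interface ResultFuture<OUT> {
        void complete(Collection<OUT> result);
        void completeExceptionally(Throwable error);
    }

    /** Toy model: an operator with a fixed number of in-flight slots. */
    static final class ToyOperator {
        private final int capacity;
        private int inFlight = 0;
        final List<String> emitted = new ArrayList<>();

        ToyOperator(int capacity) {
            this.capacity = capacity;
        }

        /** Returns false when all slots are taken, i.e. the input would block. */
        boolean tryAccept(String record,
                          BiConsumer<String, ResultFuture<String>> timeoutHandler) {
            if (inFlight == capacity) {
                return false;
            }
            inFlight++;
            ResultFuture<String> resultFuture = new ResultFuture<String>() {
                private boolean done = false;

                @Override
                public void complete(Collection<String> result) {
                    if (done) {
                        return;
                    }
                    done = true;
                    emitted.addAll(result);
                    inFlight--; // the slot is released only on completion
                }

                @Override
                public void completeExceptionally(Throwable error) {
                    if (done) {
                        return;
                    }
                    done = true;
                    inFlight--; // a real operator would fail the job here
                }
            };
            // Assumption for the sketch: every request times out right away.
            timeoutHandler.accept(record, resultFuture);
            return true;
        }
    }

    /** Handler that only notes the timeout and never completes the future. */
    static final BiConsumer<String, ResultFuture<String>> LOG_ONLY =
            (record, resultFuture) -> { /* "Timeout:" and nothing else */ };

    /** Handler that discards the record by completing with an empty result. */
    static final BiConsumer<String, ResultFuture<String>> COMPLETE_EMPTY =
            (record, resultFuture) -> resultFuture.complete(Collections.emptyList());

    /** Offers n records to a fresh operator and counts how many are accepted. */
    static int acceptedOutOf(int n, int capacity,
                             BiConsumer<String, ResultFuture<String>> handler) {
        ToyOperator operator = new ToyOperator(capacity);
        int accepted = 0;
        for (int i = 0; i < n; i++) {
            if (operator.tryAccept("record-" + i, handler)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(acceptedOutOf(10, 3, LOG_ONLY));       // prints 3
        System.out.println(acceptedOutOf(10, 3, COMPLETE_EMPTY)); // prints 10
    }
}
```

If this is what is happening in the job, the corresponding change in the real
function would be to call resultFuture.complete(Collections.emptyList())
inside the overridden timeout(). The reason for the timeouts themselves would
still need to be investigated.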

Regards,
Rahul

On Wed, Jul 14, 2021 at 9:19 PM Ragini Manjaiah <ragini.manja...@gmail.com>
wrote:

> Hi,
> Following the suggestion, I overrode the timeout method in the async
> function. The Flink job processes real-time events for a few minutes and
> then hangs and does not process anything at all. Is there any issue with
> the method below? I see 0 records per second. Can you please help here?
>
> @Override
> public void timeout(Tuple1<Map<String, List<abd>>> fieldMapTup,
>         ResultFuture<Tuple3<Map<String, List<abd>>, Map<String, Experiment>,
>                 List<Map<String, Integer>>>> resultFuture) {
>     // Timed out. Just discard
>     System.out.println("Timeout:");
>
> On Wed, Jul 14, 2021 at 9:40 AM Rahul Patwari <rahulpatwari8...@gmail.com>
> wrote:
>
>> Hi Ragini,
>>
>> From the stack trace, the job failed because the Async I/O operator timed
>> out for an event.
>> The timeout is configurable.
>>
>> Please refer to
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/#async-io-api
>>
>> Quoting from the above documentation:
>>
>>> Timeout: The timeout defines how long an asynchronous request may take
>>> before it is considered failed. This parameter guards against dead/failed
>>> requests.
>>
>>
>> Regards,
>> Rahul
>>
>> On Wed, Jul 14, 2021 at 9:29 AM Ragini Manjaiah <
>> ragini.manja...@gmail.com> wrote:
>>
>>> Hi,
>>> I am facing the issue below while processing streaming events. In what
>>> scenarios is java.lang.Exception: Could not complete the stream element
>>> hit? Can you please help me here? The job fails after this exception is hit.
>>>
>>>
>>> 2021-07-13 13:24:58,781 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - ExtractTransformProcess -> AsyncFetch (9/60) (3809c73da162e9ab3b7b131077aff105) switched from RUNNING to FAILED on container_e3767_1626191679189_5049_01_000003 @ brdn6162.target.com (dataPort=41107).
>>>
>>> java.lang.Exception: Could not complete the stream element: Record @ 1626200691540 : ({<it displays the record>})
>>>         at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.completeExceptionally(AsyncWaitOperator.java:363) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>>         at org.apache.flink.streaming.api.functions.async.AsyncFunction.timeout(AsyncFunction.java:97) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>>         at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.lambda$processElement$0(AsyncWaitOperator.java:190) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1223) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$16(StreamTask.java:1214) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>>         at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>>         at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:301) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>>         at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]
>>> Caused by: java.util.concurrent.TimeoutException: Async function call has timed out.
>>>
>>>
>>>
>>> Thanks & regards
>>>
>>
