Hi,
As suggested, I overrode the timeout method in the async function. The
Flink job processes real-time events for a few minutes and then hangs and
does not process anything at all; I see 0 records per second.
Is there any issue with the method below? Could you please help here?

@Override
public void timeout(Tuple1<Map<String, List<abd>>> fieldMapTup,
        ResultFuture<Tuple3<Map<String, List<abd>>, Map<String, Experiment>,
                List<Map<String, Integer>>>> resultFuture) {
    // Timed out. Just discard
    System.out.println("Timeout:");
}
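
One thing that may be worth checking in the method above: as far as I understand the Async I/O operator, a timed-out element stays in the operator's in-flight queue until the ResultFuture handed to timeout() is completed, either normally or exceptionally. An override that only logs never frees that slot, so once the configured capacity is used up the operator could backpressure and show 0 records per second. Below is a minimal sketch of a handler that discards the record but still completes the future. It assumes dropping the timed-out record is acceptable. The ResultFuture interface here is a local stand-in that mirrors Flink's two methods so the example compiles without Flink, and RecordingFuture, TimeoutSketch and onTimeout are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Local stand-in for org.apache.flink.streaming.api.functions.async.ResultFuture,
// declared here only so the sketch compiles without Flink on the classpath.
interface ResultFuture<OUT> {
    void complete(Collection<OUT> result);
    void completeExceptionally(Throwable error);
}

// Hypothetical helper that records what the handler did with the future.
class RecordingFuture<OUT> implements ResultFuture<OUT> {
    boolean done = false;
    List<OUT> emitted = new ArrayList<>();
    Throwable error = null;

    @Override
    public void complete(Collection<OUT> result) {
        done = true;
        emitted.addAll(result);
    }

    @Override
    public void completeExceptionally(Throwable error) {
        done = true;
        this.error = error;
    }
}

class TimeoutSketch {
    // Shape of a timeout handler that drops the record but still completes
    // the future, so the pending request is released.
    static <IN, OUT> void onTimeout(IN input, ResultFuture<OUT> resultFuture) {
        System.out.println("Timeout: " + input);
        // Completing with an empty collection emits nothing downstream.
        resultFuture.complete(Collections.emptyList());
    }

    public static void main(String[] args) {
        RecordingFuture<String> future = new RecordingFuture<>();
        onTimeout("record-1", future);
        System.out.println("completed=" + future.done
                + ", emitted=" + future.emitted.size());
    }
}
```

In the actual AsyncFunction, the equivalent change would be a single resultFuture.complete(Collections.emptyList()) call inside the overridden timeout(...), or completeExceptionally(...) if the job should fail on a timeout instead.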

On Wed, Jul 14, 2021 at 9:40 AM Rahul Patwari <rahulpatwari8...@gmail.com>
wrote:

> Hi Ragini,
>
> From the stack trace, the job failed as the Async I/O Operator has timed
> out for an event.
> The timeout is configurable.
>
> Please refer to
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/#async-io-api
>
> Quoting from the above documentation:
>
>> Timeout: The timeout defines how long an asynchronous request may take
>> before it is considered failed. This parameter guards against dead/failed
>> requests.
>
>
> Regards,
> Rahul
>
> On Wed, Jul 14, 2021 at 9:29 AM Ragini Manjaiah <ragini.manja...@gmail.com>
> wrote:
>
>> Hi,
>> I am facing the issue below while processing streaming events. In which
>> scenarios does "java.lang.Exception: Could not complete the stream
>> element" occur? Could you please help me here? The job fails after this
>> exception is hit.
>>
>>
>> 2021-07-13 13:24:58,781 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - ExtractTransformProcess -> AsyncFetch (9/60) (3809c73da162e9ab3b7b131077aff105) switched from RUNNING to FAILED on container_e3767_1626191679189_5049_01_000003 @ brdn6162.target.com (dataPort=41107).
>>
>>
>> java.lang.Exception: Could not complete the stream element: Record @
>> 1626200691540 : ({<it displays the record>})
>>
>>
>>         at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.completeExceptionally(AsyncWaitOperator.java:363) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>         at org.apache.flink.streaming.api.functions.async.AsyncFunction.timeout(AsyncFunction.java:97) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>         at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.lambda$processElement$0(AsyncWaitOperator.java:190) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1223) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$16(StreamTask.java:1214) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>         at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>         at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:301) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>         at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]
>>
>> Caused by: java.util.concurrent.TimeoutException: Async function call has
>> timed out.
>>
>>
>>
>> Thanks & regards
>>
>
