[ 
https://issues.apache.org/jira/browse/FLINK-26424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17499964#comment-17499964
 ] 

Till Rohrmann commented on FLINK-26424:
---------------------------------------

Hi [~ychensha], I assume you are using StateFun's asynchronous 
{{RequestReplyClient}}, right?

The request will fail with the {{RequestTimeoutException}} if it could not be 
completed within the call timeout. During this time, the client will retry the 
request with an exponential backoff strategy. Hence, I would recommend trying 
to increase {{spec.transports.timeout.call}} to something higher.

> RequestTimeoutException 
> ------------------------
>
>                 Key: FLINK-26424
>                 URL: https://issues.apache.org/jira/browse/FLINK-26424
>             Project: Flink
>          Issue Type: Bug
>          Components: Stateful Functions
>    Affects Versions: statefun-3.2.0
>            Reporter: xinchenyuan
>            Priority: Major
>
> there is no max retries, all I got is the call timeout
> as doc said, [Transport Spec  
> |https://nightlies.apache.org/flink/flink-statefun-docs-release-3.2/docs/modules/http-endpoint/#transport-1]call
>  will be failed after timeout.....
> but when expcetion raised, runtime restart,  I'm confused why a function 
> internal error will cause such a big problem, will MAX RETRIES be a 
> configurable param?
>  
> 2022-02-28 17:58:32
> org.apache.flink.statefun.flink.core.functions.StatefulFunctionInvocationException:
>  An error occurred when attempting to invoke function FunctionType(tendoc, 
> AlertNotificationIngressCkafka).
>     at 
> org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:50)
>     at 
> org.apache.flink.statefun.flink.core.functions.ReusableContext.apply(ReusableContext.java:73)
>     at 
> org.apache.flink.statefun.flink.core.functions.FunctionActivation.applyNextPendingEnvelope(FunctionActivation.java:50)
>     at 
> org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.processNextEnvelope(LocalFunctionGroup.java:61)
>     at 
> org.apache.flink.statefun.flink.core.functions.Reductions.processEnvelopes(Reductions.java:164)
>     at 
> org.apache.flink.statefun.flink.core.functions.AsyncSink.drainOnOperatorThread(AsyncSink.java:119)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.yield(MailboxExecutorImpl.java:86)
>     at 
> org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:88)
>     at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:108)
>     at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:89)
>     at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
>     at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>     at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>     at 
> org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:186)
>     at 
> org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:86)
>     at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>     at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: java.lang.IllegalStateException: Failure forwarding a message to a 
> remote function Address(tendoc, AlertNotificationIngressCkafka, cls-message)
>     at 
> org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.onAsyncResult(RequestReplyFunction.java:170)
>     at 
> org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.invoke(RequestReplyFunction.java:124)
>     at 
> org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:48)
>     ... 29 more
> Caused by: 
> org.apache.flink.statefun.flink.core.nettyclient.exceptions.RequestTimeoutException



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to