[ 
https://issues.apache.org/jira/browse/FLINK-26424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17500061#comment-17500061
 ] 

xinchenyuan commented on FLINK-26424:
-------------------------------------

Not hotspot issue, I've already learnt the logic function's id things...

My point is some messages function can not be handled forever, and the runtime 
should take over, e.g. discard the message after the timeout.call instend of 
crash and loop infinitely... The runtime has already impl the backoff stragety, 
so  

For now, I set a redeliver count in the state and make runtime delivery 
successful after a max redeliver count as a workaround, but I should impl the 
backoff stragety myself using _*{{SendAfter}}*_

Besides, crash and loop infinitely increases the runtime operation complexity, 
one functions failed the entire runtime pays....

> RequestTimeoutException 
> ------------------------
>
>                 Key: FLINK-26424
>                 URL: https://issues.apache.org/jira/browse/FLINK-26424
>             Project: Flink
>          Issue Type: Bug
>          Components: Stateful Functions
>    Affects Versions: statefun-3.2.0
>            Reporter: xinchenyuan
>            Priority: Major
>
> there is no max retries, all I got is the call timeout
> as doc said, [Transport Spec  
> |https://nightlies.apache.org/flink/flink-statefun-docs-release-3.2/docs/modules/http-endpoint/#transport-1]call
>  will be failed after timeout.....
> but when expcetion raised, runtime restart,  I'm confused why a function 
> internal error will cause such a big problem, will MAX RETRIES be a 
> configurable param?
>  
> 2022-02-28 17:58:32
> org.apache.flink.statefun.flink.core.functions.StatefulFunctionInvocationException:
>  An error occurred when attempting to invoke function FunctionType(tendoc, 
> AlertNotificationIngressCkafka).
>     at 
> org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:50)
>     at 
> org.apache.flink.statefun.flink.core.functions.ReusableContext.apply(ReusableContext.java:73)
>     at 
> org.apache.flink.statefun.flink.core.functions.FunctionActivation.applyNextPendingEnvelope(FunctionActivation.java:50)
>     at 
> org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.processNextEnvelope(LocalFunctionGroup.java:61)
>     at 
> org.apache.flink.statefun.flink.core.functions.Reductions.processEnvelopes(Reductions.java:164)
>     at 
> org.apache.flink.statefun.flink.core.functions.AsyncSink.drainOnOperatorThread(AsyncSink.java:119)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.yield(MailboxExecutorImpl.java:86)
>     at 
> org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:88)
>     at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:108)
>     at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:89)
>     at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
>     at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>     at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>     at 
> org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:186)
>     at 
> org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:86)
>     at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>     at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: java.lang.IllegalStateException: Failure forwarding a message to a 
> remote function Address(tendoc, AlertNotificationIngressCkafka, cls-message)
>     at 
> org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.onAsyncResult(RequestReplyFunction.java:170)
>     at 
> org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.invoke(RequestReplyFunction.java:124)
>     at 
> org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:48)
>     ... 29 more
> Caused by: 
> org.apache.flink.statefun.flink.core.nettyclient.exceptions.RequestTimeoutException



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to