[ 
https://issues.apache.org/jira/browse/FLINK-11909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16831421#comment-16831421
 ] 

Rong Rong edited comment on FLINK-11909 at 5/2/19 5:39 AM:
-----------------------------------------------------------

I took some time to look at the {{AsyncFunction}} API and the 
{{AsyncWaitOperator}}. My understanding is that:
when user implements an AsyncFunction, it has full control on how the 
asyncInvoke being executed (including creation of the ExecutorService used, # 
of threads). Thus user can also include any of the retry policies if needed (by 
directly create a retry mechanism)

The limitation of this is: 
1. Users does not have access to timer service from AsyncFunction - On the 
contrary, {{AsyncWaitOperator}} have access to register / cleanup / invoke 
{{AsyncFunction#timeout}} API which makes it more flexible in terms of creating 
retry policies.
2. Retry within {{AsyncFunction#asyncInvoke}} would mean the retry is 
input-element independent - this does not fit the "rate limiting" requirement 
from the mailing list - I am assuming this is more like a retry queue for a 
specific outbound service, not limited to a specific input element.
3. Retry within {{AsyncFunction}} cannot allow access to 
{{StreamElementQueue}}, which means it cannot alter the ordering of the 
elements being executed. This doesn't fit into the description in Shuyi's 
comment of a DLQ (where failed component are put in the back of the queue 
before it can be retried the 2nd time)

I have a general idea to extend the {{RuntimeContext}} to create a 
{{RetryContext}} to solve these issues. But I want to also make sure that the 
above concerns I mentioned are valid and should be addressed by Flink. 
otherwise, I think the current AsyncFunction API is good to go for some basic 
retry policies.

Any thoughts? [~till.rohrmann] [~suez1224] 


was (Author: walterddr):
I took some time to look at the {{AsyncFunction}} API and the 
{{AsyncWaitOperator}}. My understanding is that:
when user implements an AsyncFunction, it has full control on how the 
asyncInvoke being executed (including creation of the ExecutorService used, # 
of threads). Thus user can also include any of the retry policies if needed (by 
directly create a retry mechanism)

The limitation of this is: 
1. Users does not have access to timer service from AsyncFunction. However, 
AsyncWaitOperator have access to register / cleanup / invoke 
{{AsyncFunction#timeout}} API 
2. Retry within {{AsyncFunction#asyncInvoke}} would mean the retry is 
input-element independent. this does not fit the "rate limiting" requirement 
from the mailing list - I am assuming this is more like a retry queue for a 
specific outbound service, not limited to a specific input element.
3. Retry within {{AsyncFunction}} cannot allow access to 
{{StreamElementQueue}}, which means it cannot alter the ordering of the 
elements being executed. This doesn't fit into the description in Shuyi's 
comment of a DLQ (where failed component are put in the back of the queue 
before it can be retried the 2nd time)

I have a general idea to extend the {{RuntimeContext}} to create a 
{{RetryContext}} to solve these issues. But I want to also make sure that the 
above concerns I mentioned are valid and should be addressed by Flink. 
otherwise, I think the current AsyncFunction API is good to go for some basic 
retry policies.

Any thoughts? [~till.rohrmann] [~suez1224] 

> Provide default failure/timeout/backoff handling strategy for AsyncIO 
> functions
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-11909
>                 URL: https://issues.apache.org/jira/browse/FLINK-11909
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>            Reporter: Rong Rong
>            Assignee: Rong Rong
>            Priority: Major
>
> Currently Flink AsyncIO by default fails the entire job when async function 
> invoke fails [1]. It would be nice to have some default Async IO 
> failure/timeout handling strategy, or opens up some APIs for AsyncFunction 
> timeout method to interact with the AsyncWaitOperator. For example (quote 
> [~suez1224] in [2]):
> * FAIL_OPERATOR (default & current behavior)
> * FIX_INTERVAL_RETRY (retry with configurable fixed interval up to N times)
> * EXP_BACKOFF_RETRY (retry with exponential backoff up to N times)
> Discussion also extended to introduce configuration such as: 
> * MAX_RETRY_COUNT
> * RETRY_FAILURE_POLICY
> REF:
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html#timeout-handling
> [2] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Backoff-strategies-for-async-IO-functions-tt26580.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to