[ https://issues.apache.org/jira/browse/FLINK-11909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16831421#comment-16831421 ]

Rong Rong edited comment on FLINK-11909 at 5/2/19 5:39 AM:
-----------------------------------------------------------

I took some time to look at the {{AsyncFunction}} API and the {{AsyncWaitOperator}}.

My understanding is that when a user implements an {{AsyncFunction}}, they have full control over how {{asyncInvoke}} is executed (including creation of the ExecutorService used and the number of threads). Thus the user can also include any retry policy if needed (by directly creating a retry mechanism).

The limitations of this approach are:
1. Users do not have access to the timer service from {{AsyncFunction}}. In contrast, {{AsyncWaitOperator}} can register, clean up, and invoke the {{AsyncFunction#timeout}} API, which makes it more flexible in terms of creating retry policies.
2. Retry within {{AsyncFunction#asyncInvoke}} would mean that retries are handled independently for each input element. This does not fit the "rate limiting" requirement from the mailing list. I am assuming that requirement is more like a retry queue for a specific outbound service, not limited to a specific input element.
3. Retry within {{AsyncFunction}} has no access to {{StreamElementQueue}}, which means it cannot alter the order in which elements are executed. This doesn't fit the description of a DLQ in Shuyi's comment (where failed components are put at the back of the queue before they can be retried a second time).

I have a general idea to extend the {{RuntimeContext}} to create a {{RetryContext}} to solve these issues. But I also want to make sure that the concerns mentioned above are valid and should be addressed by Flink. Otherwise, I think the current AsyncFunction API is good to go for some basic retry policies.

Any thoughts? [~till.rohrmann] [~suez1224]


was (Author: walterddr):
I took some time to look at the {{AsyncFunction}} API and the {{AsyncWaitOperator}}.

My understanding is that when a user implements an {{AsyncFunction}}, they have full control over how {{asyncInvoke}} is executed (including creation of the ExecutorService used and the number of threads). Thus the user can also include any retry policy if needed (by directly creating a retry mechanism).

The limitations of this approach are:
1. Users do not have access to the timer service from {{AsyncFunction}}. However, {{AsyncWaitOperator}} can register, clean up, and invoke the {{AsyncFunction#timeout}} API.
2. Retry within {{AsyncFunction#asyncInvoke}} would mean that retries are handled independently for each input element. This does not fit the "rate limiting" requirement from the mailing list. I am assuming that requirement is more like a retry queue for a specific outbound service, not limited to a specific input element.
3. Retry within {{AsyncFunction}} has no access to {{StreamElementQueue}}, which means it cannot alter the order in which elements are executed. This doesn't fit the description of a DLQ in Shuyi's comment (where failed components are put at the back of the queue before they can be retried a second time).

I have a general idea to extend the {{RuntimeContext}} to create a {{RetryContext}} to solve these issues. But I also want to make sure that the concerns mentioned above are valid and should be addressed by Flink. Otherwise, I think the current AsyncFunction API is good to go for some basic retry policies.

Any thoughts? [~till.rohrmann] [~suez1224]

> Provide default failure/timeout/backoff handling strategy for AsyncIO functions
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-11909
>                 URL: https://issues.apache.org/jira/browse/FLINK-11909
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>            Reporter: Rong Rong
>            Assignee: Rong Rong
>            Priority: Major
>
> Currently Flink AsyncIO by default fails the entire job when an async function invocation fails [1].
> It would be nice to have some default Async IO failure/timeout handling strategy, or to open up some APIs for the AsyncFunction timeout method to interact with the AsyncWaitOperator. For example (quoting [~suez1224] in [2]):
> * FAIL_OPERATOR (default & current behavior)
> * FIX_INTERVAL_RETRY (retry with configurable fixed interval up to N times)
> * EXP_BACKOFF_RETRY (retry with exponential backoff up to N times)
>
> Discussion also extended to introducing configuration such as:
> * MAX_RETRY_COUNT
> * RETRY_FAILURE_POLICY
>
> REF:
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html#timeout-handling
> [2] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Backoff-strategies-for-async-IO-functions-tt26580.html
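
To make the "basic retry policies in user code" option from the comment concrete, here is a minimal sketch of per-element retry with fixed-interval or exponential backoff, written in plain Java so it runs without Flink. Everything in it is hypothetical: the class {{AsyncRetrySketch}}, the {{Strategy}} enum (which only borrows the names proposed in the issue), and the parameters are illustrations, not an existing or planned Flink API. Inside a real {{AsyncFunction#asyncInvoke}}, the returned future would presumably be used to complete the {{ResultFuture}}. Note that this is exactly the per-element retry described in limitation 2: it does not provide a shared retry queue or any reordering.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical helper, NOT part of Flink: retries an async call with a backoff policy. */
final class AsyncRetrySketch {

    /** Names borrowed from the issue description; the enum itself is an assumption. */
    enum Strategy { FAIL_OPERATOR, FIX_INTERVAL_RETRY, EXP_BACKOFF_RETRY }

    /** Delay before retry number {@code attempt} (1-based). */
    static long delayMillis(Strategy strategy, long baseMillis, int attempt) {
        switch (strategy) {
            case FIX_INTERVAL_RETRY:
                return baseMillis;
            case EXP_BACKOFF_RETRY:
                // base, 2*base, 4*base, ...; the shift is capped to avoid overflow.
                return baseMillis << Math.min(attempt - 1, 20);
            default:
                throw new IllegalArgumentException("No retry delay for " + strategy);
        }
    }

    /** Invokes {@code call}, retrying failed attempts up to {@code maxRetryCount} times. */
    static <T> CompletableFuture<T> callWithRetry(
            Supplier<CompletableFuture<T>> call, Strategy strategy,
            int maxRetryCount, long baseMillis, ScheduledExecutorService timer) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(call, strategy, maxRetryCount, baseMillis, timer, 0, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> call, Strategy strategy,
            int maxRetryCount, long baseMillis, ScheduledExecutorService timer,
            int retriesSoFar, CompletableFuture<T> result) {
        call.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (strategy == Strategy.FAIL_OPERATOR || retriesSoFar >= maxRetryCount) {
                // Give up and surface the last failure to the caller.
                result.completeExceptionally(error);
            } else {
                long delay = delayMillis(strategy, baseMillis, retriesSoFar + 1);
                // The user-owned timer stands in for the timer service that
                // AsyncFunction cannot reach (limitation 1 in the comment).
                timer.schedule(
                        () -> attempt(call, strategy, maxRetryCount, baseMillis,
                                timer, retriesSoFar + 1, result),
                        delay, TimeUnit.MILLISECONDS);
            }
        });
    }

    public static void main(String[] args) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            // A stand-in "remote call" that always succeeds.
            String value = callWithRetry(
                    () -> CompletableFuture.completedFuture("ok"),
                    Strategy.EXP_BACKOFF_RETRY, 3, 10, timer).join();
            System.out.println(value);
        } finally {
            timer.shutdown();
        }
    }
}
```

The sketch owns its own {{ScheduledExecutorService}} because, as noted in the comment, the user function has no timer service of its own; whether a {{RetryContext}} would replace that is the open question of this issue.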