[ https://issues.apache.org/jira/browse/FLINK-18983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17251540#comment-17251540 ]
Yuan Mei edited comment on FLINK-18983 at 12/18/20, 6:40 AM:
-------------------------------------------------------------

Hey, [~liuyufei].

You are absolutely right: having task failure and task cancellation behave inconsistently is misleading. A valid approach, as you mentioned in the description, is to add a similar watchdog that times out the task-failure path as well.

However, after looking through the code a bit, we think the change involves more than simply adding a watchdog when handling the invokeException in `StreamTask#invoke`. Instead, we probably need to design a way to properly unify
* `Task#cancelOrFailAndCancelInvokableInternal`, where cancellation is issued from outside the user invokable, and
* invokeException handling in `StreamTask#invoke` (inside the user invokable),

and as you can see, that change looks non-trivial. We also plan to refactor `Task` and `StreamTask` in the future, so we want to avoid hacky fixes right now.

So, based on your last comment, do you think my previous suggestion to "add timeout logic or bounded retry numbers in the close function" can solve your problem in the short term? Please let me know! Thanks!
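As a rough illustration of the short-term suggestion (timeout logic in the close function), here is a minimal plain-Java sketch. It assumes the blocking part of `close()` is releasing some external resource that can be wrapped as an `AutoCloseable`; `BoundedClose` and `closeWithTimeout` are hypothetical names, not Flink APIs.

```java
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper (not a Flink API): bounds how long a blocking close() may take. */
final class BoundedClose {

    private BoundedClose() {}

    /**
     * Runs resource.close() on a separate daemon thread and waits at most `timeout`.
     * Returns true if close() completed in time (normally or by throwing), false on timeout.
     */
    static boolean closeWithTimeout(AutoCloseable resource, Duration timeout)
            throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "bounded-close");
            t.setDaemon(true); // a stuck close() must not keep the JVM alive
            return t;
        });
        try {
            Future<?> future = executor.submit(() -> {
                resource.close();
                return null;
            });
            try {
                future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException e) {
                // Interrupt the closer; a close() that ignores interrupts keeps running.
                future.cancel(true);
                return false;
            } catch (ExecutionException e) {
                // close() finished but threw; report it without blocking the caller.
                System.err.println("close() failed: " + e.getCause());
                return true;
            }
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Inside a `ProcessFunction#close()` one might call `BoundedClose.closeWithTimeout(client, Duration.ofSeconds(30))`, where `client` stands for whatever resource blocks, and log a warning when it returns `false`. The timeout only frees the task thread so the task can reach FAILED; a wrapped `close()` that ignores interrupts keeps running on the daemon thread.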
> Job doesn't changed to failed if close function has blocked
> -----------------------------------------------------------
>
>                 Key: FLINK-18983
>                 URL: https://issues.apache.org/jira/browse/FLINK-18983
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.11.0, 1.12.0
>            Reporter: YufeiLiu
>            Priority: Major
>
> If an operator throws an exception, it breaks the processing loop and disposes all operators. But if the task then blocks in Function.close, its state never switches to FAILED, so the JobMaster cannot learn the final state and restart the job.
> Task has {{TaskCancelerWatchDog}} to kill the process if cancellation times out, but it does not apply to a FAILED task. The task thread always hangs at:
> org.apache.flink.streaming.runtime.tasks.StreamTask#cleanUpInvoke
>
> Test case:
> {code:java}
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.configuration.TaskManagerOptions;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.ProcessFunction;
> import org.apache.flink.util.Collector;
>
> Configuration configuration = new Configuration();
> // The cancellation watchdog fires after 10 s (value in milliseconds).
> configuration.set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 10000L);
> StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.createLocalEnvironment(2, configuration);
> env.addSource(...)
>         .process(new ProcessFunction<String, String>() {
>             @Override
>             public void processElement(String value, Context ctx,
>                     Collector<String> out) throws Exception {
>                 // Subtask 0 fails on its first record...
>                 if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
>                     throw new RuntimeException();
>                 }
>             }
>
>             @Override
>             public void close() throws Exception {
>                 // ...and then blocks (effectively forever) during cleanup.
>                 if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
>                     Thread.sleep(10000000);
>                 }
>             }
>         }).setParallelism(2)
>         .print();
> env.execute();
> {code}
>
> In this case, the job blocks in close() and never changes to FAILED.
> If instead the subtask with index 1 is the one that sleeps in close(), the TM exits after TASK_CANCELLATION_TIMEOUT: that subtask does not fail itself but is cancelled during failover, so the cancellation watchdog applies.
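The watchdog idea discussed in this thread, extending a `TaskCancelerWatchDog`-style timeout to the failure path, can be sketched in plain Java as below. This is a hypothetical illustration of the timing logic only, not Flink's implementation: `FailureWatchdog` and its `onTimeout` callback are assumed names, and in a TaskManager the callback would presumably escalate to a fatal error so the process exits, as happens on the cancellation path.

```java
import java.time.Duration;
import java.util.Objects;

/**
 * Hypothetical watchdog for the task-failure path (not Flink code): if the task
 * thread is still alive `timeout` after the watchdog starts, e.g. because
 * cleanup is stuck in a blocking close(), it runs the `onTimeout` escalation.
 */
final class FailureWatchdog implements Runnable {

    private final Thread taskThread;
    private final Duration timeout;
    private final Runnable onTimeout;

    FailureWatchdog(Thread taskThread, Duration timeout, Runnable onTimeout) {
        this.taskThread = Objects.requireNonNull(taskThread);
        this.timeout = Objects.requireNonNull(timeout);
        this.onTimeout = Objects.requireNonNull(onTimeout);
        if (timeout.isZero() || timeout.isNegative()) {
            throw new IllegalArgumentException("timeout must be positive");
        }
    }

    /** Starts the watchdog on its own daemon thread and returns that thread. */
    Thread start() {
        Thread watcher = new Thread(this, "failure-watchdog");
        watcher.setDaemon(true); // never keep the JVM alive just for the watchdog
        watcher.start();
        return watcher;
    }

    @Override
    public void run() {
        try {
            // join(0) would wait forever, so clamp the wait to at least 1 ms.
            taskThread.join(Math.max(1L, timeout.toMillis()));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return; // the watchdog itself was stopped deliberately
        }
        if (taskThread.isAlive()) {
            onTimeout.run(); // cleanup did not finish in time: escalate
        }
    }
}
```

Such a watchdog would be started with the task thread as `taskThread` once the invoke exception is caught, before cleanup such as `StreamTask#cleanUpInvoke` runs. Wiring it into Flink properly is exactly the unification of the cancellation and failure paths described in the comment, which this sketch does not attempt.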