[ https://issues.apache.org/jira/browse/FLINK-18983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17327688#comment-17327688 ]
Flink Jira Bot commented on FLINK-18983:
----------------------------------------

This major issue is unassigned, and neither it nor any of its Sub-Tasks has been updated for 30 days. It has therefore been labeled "stale-major". If this ticket is indeed "major", please either assign yourself or give an update. Afterwards, please remove the label. In 7 days the issue will be deprioritized.

> Job doesn't changed to failed if close function has blocked
> -----------------------------------------------------------
>
>                 Key: FLINK-18983
>                 URL: https://issues.apache.org/jira/browse/FLINK-18983
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Task
>    Affects Versions: 1.11.0, 1.12.0
>            Reporter: YufeiLiu
>            Priority: Major
>              Labels: stale-major
>
> If an operator throws an exception, the task breaks out of the processing
> loop and disposes all operators. But the task state never switches to FAILED
> if Function.close blocks, so the JobMaster never learns the final state and
> cannot restart the job.
> Task has a {{TaskCancelerWatchDog}} that kills the process when cancellation
> times out, but it doesn't work for a FAILED task. The task thread will always
> hang at:
> org.apache.flink.streaming.runtime.tasks.StreamTask#cleanUpInvoke
> Test case:
> {code:java}
> Configuration configuration = new Configuration();
> configuration.set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 10000L);
> StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.createLocalEnvironment(2, configuration);
> env.addSource(...)
>     .process(new ProcessFunction<String, String>() {
>         @Override
>         public void processElement(String value, Context ctx,
>                 Collector<String> out) throws Exception {
>             // Subtask 0 fails on the first element it processes.
>             if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
>                 throw new RuntimeException();
>             }
>         }
>         @Override
>         public void close() throws Exception {
>             // The same (failing) subtask then blocks in close().
>             if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
>                 Thread.sleep(10000000);
>             }
>         }
>     }).setParallelism(2)
>     .print();
> {code}
> In this case, the job blocks in the close action and never changes to FAILED.
> If the sleeping subtask is changed to the one with subtaskIndex == 1, the TM
> will exit after TASK_CANCELLATION_TIMEOUT.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
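
For readers unfamiliar with the watchdog idea mentioned in the description, the following is a minimal sketch in plain Java (no Flink dependencies) of bounding a possibly blocking close action with a timeout. The class `BoundedCloseSketch` and the helper `closeWithTimeout` are hypothetical names introduced only for illustration. This is not Flink's `TaskCancelerWatchDog` and not the fix adopted for this ticket.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration only; not part of Flink.
final class BoundedCloseSketch {

    /**
     * Runs closeAction on a separate daemon thread and waits at most
     * timeoutMillis for it. Returns true if the action ended in time
     * (normally or with an exception), false if it was still running.
     */
    static boolean closeWithTimeout(Runnable closeAction, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "close-runner");
            t.setDaemon(true); // a stuck close must not keep the JVM alive
            return t;
        });
        Future<?> future = executor.submit(closeAction);
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the blocked close action
            return false;
        } catch (ExecutionException e) {
            return true; // close ended, although it threw
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // A close action that returns immediately.
        boolean fast = closeWithTimeout(() -> { }, 1_000);

        // A close action that blocks, like the Thread.sleep in the test case.
        boolean blocked = closeWithTimeout(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 200);

        System.out.println("fast close finished: " + fast);
        System.out.println("blocking close finished: " + blocked);
    }
}
```

In the scenario reported above, the failure path has no comparable bound on `close()`, so the caller never gets the chance to report a final state. With a bound like the one sketched here, the caller regains control after the timeout and could, for example, mark the task as failed.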