akalash commented on a change in pull request #16885: URL: https://github.com/apache/flink/pull/16885#discussion_r692114692
########## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java ########## @@ -116,7 +116,7 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E protected void cancelTask() {} @Override - protected void cleanup() throws Exception { + protected void cleanUpInternal() throws Exception { Review comment: There are too many methods with cleanUp in the name, maybe we can rename it to something different like closeResources(analog of closeAllOperators) or something like that? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ########## @@ -762,17 +762,21 @@ private void doRun() { executingThread.setContextClassLoader(userCodeClassLoader.asClassLoader()); AbstractInvokable finalInvokable = invokable; - runWithSystemExitMonitoring(finalInvokable::restore); + try { + runWithSystemExitMonitoring(finalInvokable::restore); - if (!transitionState(ExecutionState.INITIALIZING, ExecutionState.RUNNING)) { - throw new CancelTaskException(); - } + if (!transitionState(ExecutionState.INITIALIZING, ExecutionState.RUNNING)) { + throw new CancelTaskException(); + } - // notify everyone that we switched to running - taskManagerActions.updateTaskExecutionState( - new TaskExecutionState(executionId, ExecutionState.RUNNING)); + // notify everyone that we switched to running + taskManagerActions.updateTaskExecutionState( + new TaskExecutionState(executionId, ExecutionState.RUNNING)); - runWithSystemExitMonitoring(finalInvokable::invoke); + runWithSystemExitMonitoring(finalInvokable::invoke); + } finally { + runWithSystemExitMonitoring(finalInvokable::cleanUp); + } Review comment: I just want to share my thoughts about these methods a little. More precisely, we have now three tightly connected methods(restore, invoke, cleanUp) but we don't have a good contract for them(just some explanation in java-doc). I noticed this problem last time when I implemented 'restore'. My proposal was to have the extra interface with only these two(now three) methods: ``` InvokableExutor executor = invokable.executor(); try{ executor.restore(); executor.invoke(); } finally { executor.cleanUp(); } ``` It is still not the best contract but at least it segregates these dependent methods from the other methods in `AbstractInvokable`. Unfortunately, it is not so easy to implement this idea for the current implementation, so we decided that it didn't make sense. But since now we have the third dependent method, I want to get back to the discussion about this problem. I still think that my idea described above doesn't make sense for the current PR(it is too invasive and it doesn't have clear profit). But anyway I want to ask your opinion about that. Do you think it is the real problem(having three dependent methods along with other random methods)? If so, do you have any idea how we can improve it? ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ########## @@ -726,8 +720,6 @@ private void ensureNotCanceled() { @Override public final void invoke() throws Exception { runWithCleanUpOnFail(this::executeInvoke); Review comment: I think the name `runWithCleanUpOnFail` is not correct anymore(we don't do any cleanUp there) so maybe we also rename it to something like `runWithCancelOnFail`(there is no big difference but I think it will be less confusing). ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ########## @@ -793,8 +785,6 @@ private void runWithCleanUpOnFail(RunnableWithException run) throws Exception { invokeException = firstOrSuppressed(ex, invokeException); } } - - cleanUpInvoke(); } // TODO: investigate why Throwable instead of Exception is used here. catch (Throwable cleanUpException) { Review comment: Please, take a look at this method more carefully I believe it can be a little rewritten now. At least this `catch` block doesn't make sense since you removed `cleanUpInvoke` so the new exception is impossible after the previous `catch`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org