gaoyunhaii commented on a change in pull request #16351: URL: https://github.com/apache/flink/pull/16351#discussion_r663682997
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -853,24 +852,25 @@ private Exception runAndSuppressThrowable(
     }
     /**
-     * Execute @link StreamOperator#dispose()} of each operator in the chain of this {@link
-     * StreamTask}. Disposing happens from <b>tail to head</b> operator in the chain.
+     * Execute {@link StreamOperator#close()} of each operator in the chain of this {@link
+     * StreamTask}. Closing happens from <b>tail to head</b> operator in the chain.
      */
-    private void disposeAllOperators() throws Exception {
+    private void closeAllOperators() throws Exception {
         if (operatorChain != null && !disposedOperators) {

Review comment:
Should we also rename `disposedOperators` to `closedOperators`?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
##########
@@ -322,30 +322,11 @@ protected boolean isUsingCustomRawKeyedState() {
     @Override
     public void open() throws Exception {}
-    /**
-     * This method is called after all records have been added to the operators via the methods
-     * {@link OneInputStreamOperator#processElement(StreamRecord)}, or {@link
-     * TwoInputStreamOperator#processElement1(StreamRecord)} and {@link
-     * TwoInputStreamOperator#processElement2(StreamRecord)}.
-     *
-     * <p>The method is expected to flush all remaining buffered data. Exceptions during this
-     * flushing of buffered should be propagated, in order to cause the operation to be recognized
-     * asa failed, because the last data items are not processed properly.
-     *
-     * @throws Exception An exception in this method causes the operator to fail.
-     */
     @Override
-    public void close() throws Exception {}
+    public void finish() throws Exception {}
-    /**
-     * This method is called at the very end of the operator's life, both in the case of a
-     * successful completion of the operation, and in the case of a failure and canceling.
-     *
-     * <p>This method is expected to make a thorough effort to release all resources that the
-     * operator has acquired.
-     */
     @Override
-    public void dispose() throws Exception {
+    public void close() throws Exception {

Review comment:
Although this is definitely not introduced by this PR, I have a slight concern here: we rely on the subclass to call `super.close()`; otherwise the `statehandler` won't get cleaned up, which might in turn cause a resource leak (such as the memory occupied by RocksDB). Currently it seems that operators like `GenericWriteAheadSink`, `TemporalProcessTimeJoinOperator` and `WatermarkAssignerOperator` indeed do not call `super.close()`. Perhaps we could introduce a final method `closeAndCleanupState` that gets called by the framework and calls `close()` inside it?

##########
File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
##########
@@ -105,6 +105,7 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E
             mainOperator.setKeyContextElement1(streamRecord);
             mainOperator.processElement(streamRecord);
         } else {
+            mainOperator.finish();

Review comment:
Should this be called after `endInput`, to be consistent with the other `StreamTask` implementations?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -853,24 +852,25 @@ private Exception runAndSuppressThrowable(
     }
     /**
-     * Execute @link StreamOperator#dispose()} of each operator in the chain of this {@link
-     * StreamTask}. Disposing happens from <b>tail to head</b> operator in the chain.
+     * Execute {@link StreamOperator#close()} of each operator in the chain of this {@link
+     * StreamTask}. Closing happens from <b>tail to head</b> operator in the chain.
      */
-    private void disposeAllOperators() throws Exception {
+    private void closeAllOperators() throws Exception {
         if (operatorChain != null && !disposedOperators) {
-            Exception disposalException = null;
+            Exception closingException = null;
             for (StreamOperatorWrapper<?, ?> operatorWrapper : operatorChain.getAllOperators(true)) {
                 StreamOperator<?> operator = operatorWrapper.getStreamOperator();
                 try {
-                    operator.dispose();
+                    operator.close();
+                    // operator.dispose();

Review comment:
It seems this line could be removed?

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java
##########
@@ -148,11 +148,11 @@ public void close() throws Exception {
      * StreamOperator#open()} which happens <b>tail to head</b>.
      */
     @Override
-    public void dispose() throws Exception {
-        super.dispose();
+    public void finish() throws Exception {

Review comment:
We should also update the comment of this method.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
##########
@@ -266,30 +266,11 @@ protected boolean isUsingCustomRawKeyedState() {
     @Override
     public void open() throws Exception {}
-    /**
-     * This method is called after all records have been added to the operators via the methods
-     * {@link OneInputStreamOperator#processElement(StreamRecord)}, or {@link
-     * TwoInputStreamOperator#processElement1(StreamRecord)} and {@link
-     * TwoInputStreamOperator#processElement2(StreamRecord)}.
-     *
-     * <p>The method is expected to flush all remaining buffered data. Exceptions during this
-     * flushing of buffered should be propagated, in order to cause the operation to be recognized
-     * asa failed, because the last data items are not processed properly.
-     *
-     * @throws Exception An exception in this method causes the operator to fail.
-     */
     @Override
-    public void close() throws Exception {}
+    public void finish() throws Exception {}
-    /**
-     * This method is called at the very end of the operator's life, both in the case of a
-     * successful completion of the operation, and in the case of a failure and canceling.
-     *
-     * <p>This method is expected to make a thorough effort to release all resources that the
-     * operator has acquired.
-     */
     @Override
-    public void dispose() throws Exception {
+    public void close() throws Exception {

Review comment:
There might be a similar concern here as with `AbstractStreamOperator`.

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -388,7 +388,7 @@ public void processElement(StreamRecord<String> element) throws Exception {
         }
         @Override
-        public void dispose() throws Exception {
+        public void close() throws Exception {
             throw new DisposeException();

Review comment:
Could we also rename this exception to `CloseException`?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -853,24 +852,25 @@ private Exception runAndSuppressThrowable(
     }
     /**
-     * Execute @link StreamOperator#dispose()} of each operator in the chain of this {@link
-     * StreamTask}. Disposing happens from <b>tail to head</b> operator in the chain.
+     * Execute {@link StreamOperator#close()} of each operator in the chain of this {@link

Review comment:
There is also an outdated log message at line 504.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
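
For illustration only, the `closeAndCleanupState` idea from the review comment on `AbstractStreamOperator#close()` might look roughly like the sketch below. It is not Flink code: `SketchOperator`, `StateHandler` and `ForgetfulOperator` are hypothetical stand-ins, and only the method name `closeAndCleanupState` comes from the comment. The point is that the framework would call a final method, so state cleanup no longer depends on subclasses remembering `super.close()`.

```java
import java.util.ArrayList;
import java.util.List;

public class CloseAndCleanupSketch {

    /** Hypothetical stand-in for the operator's state handler. */
    static final class StateHandler {
        boolean disposed;

        void dispose() {
            disposed = true;
        }
    }

    /** Hypothetical base class, not the real AbstractStreamOperator. */
    abstract static class SketchOperator {
        final StateHandler stateHandler = new StateHandler();

        /** User hook; subclasses may override it without calling super. */
        public void close() throws Exception {}

        /** Called by the framework; final so the cleanup cannot be skipped. */
        public final void closeAndCleanupState() throws Exception {
            try {
                close();
            } finally {
                // Runs even if close() throws or never calls super.close().
                stateHandler.dispose();
            }
        }
    }

    /** Overrides close() and, like the operators named above, skips super.close(). */
    static final class ForgetfulOperator extends SketchOperator {
        final List<String> events = new ArrayList<>();

        @Override
        public void close() {
            events.add("close");
        }
    }

    public static void main(String[] args) throws Exception {
        ForgetfulOperator op = new ForgetfulOperator();
        op.closeAndCleanupState();
        System.out.println(op.events + " disposed=" + op.stateHandler.disposed);
    }
}
```

With this shape, `StreamTask` would invoke `closeAndCleanupState()` instead of `close()` when closing the operator chain; whether that fits the existing call sites is an assumption that would need checking against the actual code.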