zentol commented on code in PR #19993:
URL: https://github.com/apache/flink/pull/19993#discussion_r899866865


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java:
##########
@@ -98,6 +98,9 @@ static ChannelStateWriteRequest buildFutureWriteRequest(
                     List<Buffer> buffers;
                     try {
                         buffers = dataFuture.get();
+                    } catch (InterruptedException e) {
+                        writer.fail(e);
+                        throw e;

Review Comment:
   Does this maybe belong to FLINK-27792 instead? AFAICT this isn't required for the issue at hand.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java:
##########
@@ -109,6 +112,9 @@ static ChannelStateWriteRequest buildFutureWriteRequest(
                     }
                 },
                 throwable -> {
+                    if (!dataFuture.isDone()) {
+                        return;
+                    }

Review Comment:
   While this will likely solve the issue, I'm not sure it is the correct solution. We could see in the logs that this future would eventually be completed, with several buffers contained within. Admittedly, this happened after the hosting TM shut down (so I'm not sure whether it can happen in production, where the JVM would go down with it), but I do wonder whether this couldn't cause a buffer leak.

   Would there be any downside to doing the clean-up like this:
   ```
   dataFuture.thenAccept(
           buffers -> {
               try {
                   CloseableIterator.fromList(buffers, Buffer::recycleBuffer).close();
               } catch (Exception e) {
                   // best-effort cleanup; ignore failures while recycling
               }
           });
   ```
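   For illustration only, here is a minimal, self-contained sketch of the late-completion concern behind this suggestion. It is not the actual Flink code: it uses a plain `java.util.concurrent.CompletableFuture` and a hypothetical `Buffer` interface in place of Flink's internal types, and the class and method names (`LateCompletionCleanupSketch`, `recycleOnCompletion`) are invented for the example. The point it demonstrates is that an `isDone()` guard alone drops the reference while the future is still pending, whereas a callback registered on the future recycles the buffers whenever they eventually arrive.

   ```java
   import java.util.List;
   import java.util.concurrent.CompletableFuture;

   /**
    * Standalone sketch (assumed names, simplified types): if the abort path only
    * checks isDone() and returns, buffers delivered by a later completion are never
    * recycled. Registering a callback on the future recycles them when they arrive.
    */
   public class LateCompletionCleanupSketch {

       /** Hypothetical stand-in for Flink's network Buffer; not the real class. */
       interface Buffer {
           void recycleBuffer();
       }

       /** Cleanup hook in the spirit of the suggested thenAccept-based recycling. */
       static void recycleOnCompletion(CompletableFuture<List<Buffer>> dataFuture) {
           dataFuture.thenAccept(
                   buffers -> {
                       for (Buffer buffer : buffers) {
                           try {
                               buffer.recycleBuffer();
                           } catch (Exception e) {
                               // best-effort cleanup; a failed recycle should not
                               // break the abort path
                           }
                       }
                   });
       }

       public static void main(String[] args) {
           CompletableFuture<List<Buffer>> dataFuture = new CompletableFuture<>();

           // The abort happens while the future is still pending, so an isDone()
           // guard alone would simply return here and drop the reference ...
           recycleOnCompletion(dataFuture);

           // ... but the producer completes the future afterwards; the registered
           // callback still recycles the buffers instead of leaking them.
           dataFuture.complete(List.of(() -> System.out.println("buffer recycled")));
       }
   }
   ```

   If the future completes exceptionally instead, `thenAccept` is skipped and there are no buffers to recycle, so the callback is a no-op in that case.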
########## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java: ########## @@ -98,6 +98,9 @@ static ChannelStateWriteRequest buildFutureWriteRequest( List<Buffer> buffers; try { buffers = dataFuture.get(); + } catch (InterruptedException e) { + writer.fail(e); + throw e; Review Comment: does this maybe belong rather to FLINK-27792? AFAICT this isn't required for the issue at hand. ########## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java: ########## @@ -109,6 +112,9 @@ static ChannelStateWriteRequest buildFutureWriteRequest( } }, throwable -> { + if (!dataFuture.isDone()) { + return; + } Review Comment: While this will likely solve the issue I'm not sure if it is the correct solution. We could see in the logs that this future would eventually be completed, with several buffers being contained within. Admittedly this happened after the hosting TM shut down (so I'm not sure if it can happen in production where the JVM would go with it), but I do wonder if this couldn't cause a buffer leak. Would there be any down-side of doing the clean-up like this: ``` dataFuture.thenAccept( buffers -> { try { CloseableIterator.fromList(buffers, Buffer::recycleBuffer) .close(); } catch (Exception e) { } }); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org