Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4639#discussion_r140455425

--- Diff: flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java ---
@@ -61,13 +75,14 @@ public final void registerClosable(C closeable) throws IOException {
     }

     synchronized (getSynchronizationLock()) {
-        if (closed) {
-            IOUtils.closeQuietly(closeable);
-            throw new IOException("Cannot register Closeable, registry is already closed. Closing argument.");
+        if (!closed) {
+            doRegister(closeable, closeableToRef);
+            return;
         }
-
-        doRegister(closeable, closeableToRef);
     }
+
+    IOUtils.closeQuietly(closeable);
--- End diff --

And for the second part, I think it should be closed automatically, because the caller decides to hand responsibility for the closeable to the registry and to tie it to the registry's status. So the registry should make sure that its closed status is propagated to newly incoming objects.
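
To illustrate the point about propagating the closed status, here is a minimal sketch of a registry that closes a late-registered object itself instead of leaving that to the caller. `SketchCloseableRegistry` and its members are hypothetical names for illustration, not Flink's `AbstractCloseableRegistry`. The sketch also assumes that an `IOException` is still thrown after the argument is closed, reusing the message from the removed lines; the quoted hunk ends before showing whether the PR keeps that exception.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified registry; an illustration only, not Flink's class. */
final class SketchCloseableRegistry implements Closeable {

    private final Object lock = new Object();
    private final Set<Closeable> registered = new HashSet<>();
    private boolean closed;

    /** Registers the closeable, or closes it right away if the registry is already closed. */
    void register(Closeable closeable) throws IOException {
        if (closeable == null) {
            return;
        }
        synchronized (lock) {
            if (!closed) {
                registered.add(closeable);
                return;
            }
        }
        // Registry is already closed: propagate that status to the newcomer.
        // This runs outside the lock, so a slow close() does not block other callers.
        closeQuietly(closeable);
        // Assumption: the caller is still told that registration failed.
        throw new IOException(
                "Cannot register Closeable, registry is already closed. Closing argument.");
    }

    /** Closes the registry and everything registered so far; later calls are no-ops. */
    @Override
    public void close() {
        List<Closeable> toClose;
        synchronized (lock) {
            if (closed) {
                return;
            }
            closed = true;
            toClose = new ArrayList<>(registered);
            registered.clear();
        }
        for (Closeable c : toClose) {
            closeQuietly(c);
        }
    }

    /** Closes the argument and swallows any failure, similar in spirit to a quiet close helper. */
    private static void closeQuietly(Closeable c) {
        try {
            c.close();
        } catch (IOException | RuntimeException ignored) {
            // Intentionally ignored in this sketch.
        }
    }
}
```

With this shape, a caller that registers after `close()` gets the exception but does not need its own cleanup path, because the registry has already closed the argument.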
---