fredia commented on PR #24653:
URL: https://github.com/apache/flink/pull/24653#issuecomment-2100331363
When `CheckpointStorageWorkerView` is passed directly to `ChannelStateWriteRequestDispatcher`, `UnalignedCheckpointRescaleITCase` might throw exceptions from `checkpointStorage` when the task exits (stack trace below). This is mainly because an exiting task closes the streams held by its current thread; see `FileSystemSafetyNet#closeSafetyNetAndGuardedResourcesForThread` (thanks to @1996fanrui for the help). The trace shows the consequence: the channel state writer thread (`ChannelStateWriteRequestExecutorImpl`) creates a new stream through `SafetyNetWrapperFileSystem`, but the registry that stream must be registered with has already been closed.

Therefore, I made `ChannelStateWriteRequestDispatcher#CheckpointStorageWorkerView` lazily initialized, and changed the `filesystem` in `FsMergingCheckpointStorageAccess` to the unwrapped `filesystem` (not wrapped by `SafetyNetWrapperFileSystem`), so that different threads do not interfere with each other when they exit.

```
Caused by: java.io.IOException: Could not open output stream for state backend
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:461) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flushToFile(FsCheckpointStreamFactory.java:308) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:284) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at java.io.DataOutputStream.write(DataOutputStream.java:107) ~[?:1.8.0_292]
	at java.io.FilterOutputStream.write(FilterOutputStream.java:97) ~[?:1.8.0_292]
	at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.getBytes(NetworkBuffer.java:404) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateSerializerImpl.writeData(ChannelStateSerializer.java:164) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.lambda$write$2(ChannelStateCheckpointWriter.java:194) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.runWithChecks(ChannelStateCheckpointWriter.java:274) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.write(ChannelStateCheckpointWriter.java:189) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.writeInput(ChannelStateCheckpointWriter.java:153) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$write$2(ChannelStateWriteRequest.java:139) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$buildWriteRequest$8(ChannelStateWriteRequest.java:231) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequest.execute(ChannelStateWriteRequest.java:366) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.handleCheckpointInProgressRequest(ChannelStateWriteRequestDispatcherImpl.java:169) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:119) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatch(ChannelStateWriteRequestDispatcherImpl.java:86) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:182) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.run(ChannelStateWriteRequestExecutorImpl.java:136) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	... 1 more
Caused by: java.io.IOException: Cannot register Closeable, registry is already closed. Closing argument.
	at org.apache.flink.util.AbstractAutoCloseableRegistry.registerCloseable(AbstractAutoCloseableRegistry.java:89) ~[flink-core-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at org.apache.flink.core.fs.ClosingFSDataOutputStream.wrapSafe(ClosingFSDataOutputStream.java:103) ~[flink-core-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:131) ~[flink-core-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:76) ~[flink-core-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:451) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flushToFile(FsCheckpointStreamFactory.java:308) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:284) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at java.io.DataOutputStream.write(DataOutputStream.java:107) ~[?:1.8.0_292]
	at java.io.FilterOutputStream.write(FilterOutputStream.java:97) ~[?:1.8.0_292]
	at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.getBytes(NetworkBuffer.java:404) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateSerializerImpl.writeData(ChannelStateSerializer.java:164) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.lambda$write$2(ChannelStateCheckpointWriter.java:194) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.runWithChecks(ChannelStateCheckpointWriter.java:274) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.write(ChannelStateCheckpointWriter.java:189) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.writeInput(ChannelStateCheckpointWriter.java:153) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$write$2(ChannelStateWriteRequest.java:139) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$buildWriteRequest$8(ChannelStateWriteRequest.java:231) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequest.execute(ChannelStateWriteRequest.java:366) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.handleCheckpointInProgressRequest(ChannelStateWriteRequestDispatcherImpl.java:169) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:119) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatch(ChannelStateWriteRequestDispatcherImpl.java:86) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:182) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.run(ChannelStateWriteRequestExecutorImpl.java:136) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
```
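To make the failure mode concrete, here is a small, self-contained Java model of it. This is a simplified sketch under stated assumptions, not Flink code: `Registry`, `SAFETY_NET`, `StreamFactory` and `simulate` are hypothetical stand-ins for `FileSystemSafetyNet`, `SafetyNetWrapperFileSystem` and the `CheckpointStorageWorkerView`. It assumes, as the trace suggests, that a wrapped stream factory built on the task thread keeps registering new streams with that thread's registry, and that the channel state writer thread has no registry of its own.

```java
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class SafetyNetSketch {

    /** Hypothetical per-thread registry; NOT Flink's FileSystemSafetyNet or its registry class. */
    static final class Registry implements Closeable {
        private final List<Closeable> resources = new ArrayList<>();
        private boolean closed;

        synchronized void register(Closeable resource) throws IOException {
            if (closed) {
                resource.close(); // close the argument, as the log message describes
                throw new IOException("Cannot register Closeable, registry is already closed.");
            }
            resources.add(resource);
        }

        @Override
        public synchronized void close() {
            closed = true;
            for (Closeable resource : resources) {
                try {
                    resource.close();
                } catch (IOException ignored) {
                    // best effort: keep closing the remaining resources
                }
            }
            resources.clear();
        }
    }

    /** Registry bound to the current thread, if any (plain ThreadLocal, not inherited by new threads). */
    static final ThreadLocal<Registry> SAFETY_NET = new ThreadLocal<>();

    /** Hypothetical stream factory standing in for a (possibly wrapped) file system. */
    static final class StreamFactory {
        private final Registry guard; // null means "unwrapped": streams are not registered anywhere

        private StreamFactory(Registry guard) { this.guard = guard; }

        /** Captures whatever registry is bound to the calling thread at construction time. */
        static StreamFactory forCurrentThread() {
            return new StreamFactory(SAFETY_NET.get());
        }

        OutputStream create() throws IOException {
            OutputStream out = new ByteArrayOutputStream();
            if (guard != null) {
                guard.register(out);
            }
            return out;
        }
    }

    /** Writes one byte on a separate writer thread after the task thread's registry is closed. */
    static String simulate(boolean lazy) {
        Registry taskRegistry = new Registry();
        SAFETY_NET.set(taskRegistry);
        // Eager: the factory is built on the task thread and captures the task's registry.
        StreamFactory eager = lazy ? null : StreamFactory.forCurrentThread();
        // Lazy: only a supplier crosses threads; it is first evaluated on the writer thread.
        Supplier<StreamFactory> factory = lazy ? StreamFactory::forCurrentThread : () -> eager;

        // The task exits and closes the resources registered for its thread.
        taskRegistry.close();
        SAFETY_NET.remove();

        String[] result = new String[1];
        Thread writer = new Thread(() -> {
            try (OutputStream out = factory.get().create()) {
                out.write(42);
                result[0] = "ok";
            } catch (IOException e) {
                result[0] = e.getMessage();
            }
        });
        writer.start();
        try {
            writer.join(); // join() also makes result[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println("eager: " + simulate(false));
        System.out.println("lazy:  " + simulate(true));
    }
}
```

Running `main` prints `eager: Cannot register Closeable, registry is already closed.` followed by `lazy:  ok`. In this model, the lazy variant defers the file system lookup to the thread that actually writes, which mirrors the idea of initializing `CheckpointStorageWorkerView` lazily; a factory with no guard (the "unwrapped" case) removes the cross-thread dependency on the task's registry altogether.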