fredia commented on PR #24653:
URL: https://github.com/apache/flink/pull/24653#issuecomment-2100331363

   When `CheckpointStorageWorkerView` (`checkpointStorage`) is passed directly to `ChannelStateWriteRequestDispatcher`, `UnalignedCheckpointRescaleITCase` may throw exceptions when a task exits.
   
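   This is mainly because, when a task exits, it closes the streams held by its own thread (see `FileSystemSafetyNet#closeSafetyNetAndGuardedResourcesForThread`; thanks to @1996fanrui for the help). After that, creating a new stream through the safety-net-wrapped file system fails with `Cannot register Closeable, registry is already closed`, as the stack trace below shows.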
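
   A minimal, self-contained Java sketch of this failure mode follows. It does not use Flink's real classes: `ThreadRegistry`, `SafetyNetFs` and `SafetyNetDemo` are hypothetical stand-ins. The sketch assumes, as the stack trace suggests, that the wrapped file system registers every new stream with a registry owned by the task thread, and that this registry rejects new registrations once the task has closed it.

```java
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a per-thread closeable registry (not Flink's class).
final class ThreadRegistry implements Closeable {
    private final List<Closeable> resources = new ArrayList<>();
    private boolean closed;

    synchronized void register(Closeable c) throws IOException {
        if (closed) {
            c.close(); // mirrors the "Closing argument." part of the log message
            throw new IOException(
                    "Cannot register Closeable, registry is already closed. Closing argument.");
        }
        resources.add(c);
    }

    @Override
    public synchronized void close() throws IOException {
        closed = true; // from now on every register() call fails
        for (Closeable c : resources) {
            c.close();
        }
        resources.clear();
    }
}

// Hypothetical "safety-net" file system: every stream it creates is registered
// with the registry that was captured when the wrapper was built.
final class SafetyNetFs {
    private final ThreadRegistry registry;

    SafetyNetFs(ThreadRegistry registry) {
        this.registry = registry;
    }

    OutputStream create() throws IOException {
        OutputStream out = new ByteArrayOutputStream();
        registry.register(out);
        return out;
    }
}

final class SafetyNetDemo {
    /** Returns the error seen by a second thread, or "ok" if it could open a stream. */
    static String run() throws IOException, InterruptedException {
        ThreadRegistry taskRegistry = new ThreadRegistry();
        SafetyNetFs wrapped = new SafetyNetFs(taskRegistry); // bound to the task's registry

        taskRegistry.close(); // the task exits and closes its safety net

        String[] result = new String[1];
        Thread writer = new Thread(() -> {
            try {
                wrapped.create(); // another thread still uses the wrapped file system
                result[0] = "ok";
            } catch (IOException e) {
                result[0] = e.getMessage();
            }
        });
        writer.start();
        writer.join(); // join() makes result[0] visible to this thread
        return result[0];
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

   Running `SafetyNetDemo.main` prints `Cannot register Closeable, registry is already closed. Closing argument.`: a thread other than the exited task fails as soon as it tries to open a stream through the wrapper.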
   
   Therefore, I made `ChannelStateWriteRequestDispatcher#CheckpointStorageWorkerView` lazily initialized, and changed the `filesystem` in `FsMergingCheckpointStorageAccess` to the unwrapped `filesystem`, so that threads do not interfere with each other when they exit.
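
   The two changes can be illustrated with another hypothetical Java sketch. Again this is not Flink's code: `StorageView`, `UnwrappedFsView`, `LazyDispatcher` and `LazyDemo` are made-up names, and the actual change in this PR may be structured differently. The dispatcher receives a factory instead of a ready-made view and builds the view on the first request, i.e. on the thread that actually writes channel state, and the view creates streams without registering them in any per-thread safety net.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for the checkpoint storage view (not Flink's interface).
interface StorageView {
    OutputStream createStream() throws IOException;
}

// Hypothetical view over an unwrapped file system: its streams are not registered
// with any thread's safety net, so another thread exiting cannot reject new ones.
final class UnwrappedFsView implements StorageView {
    @Override
    public OutputStream createStream() {
        return new ByteArrayOutputStream();
    }
}

// Hypothetical dispatcher that receives a factory instead of a ready-made view
// and builds the view on first use, i.e. on the thread that processes requests.
final class LazyDispatcher {
    private final Supplier<StorageView> viewFactory;
    private StorageView view; // only touched by the single dispatcher thread
    private String initThreadName;

    LazyDispatcher(Supplier<StorageView> viewFactory) {
        this.viewFactory = viewFactory;
    }

    OutputStream write() throws IOException {
        if (view == null) { // lazy initialization on the first request
            view = viewFactory.get();
            initThreadName = Thread.currentThread().getName();
        }
        return view.createStream();
    }

    String initThreadName() {
        return initThreadName;
    }
}

final class LazyDemo {
    static String run() throws InterruptedException {
        AtomicInteger created = new AtomicInteger();
        LazyDispatcher dispatcher = new LazyDispatcher(() -> {
            created.incrementAndGet();
            return new UnwrappedFsView();
        });
        int before = created.get(); // nothing is built at construction time

        Thread writer = new Thread(() -> {
            try {
                dispatcher.write();
                dispatcher.write(); // the second request reuses the same view
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }, "channel-state-writer");
        writer.start();
        writer.join();

        return "before=" + before + " after=" + created.get()
                + " thread=" + dispatcher.initThreadName();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

   Running `LazyDemo.main` prints `before=0 after=1 thread=channel-state-writer`: nothing is created at construction time, the view is built exactly once, and it is built on the writer thread. Because only one thread touches `view`, the sketch adds no synchronization; that assumption would not hold if several threads could call `write()`.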
   
   ```
   Caused by: java.io.IOException: Could not open output stream for state backend
        at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:461) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flushToFile(FsCheckpointStreamFactory.java:308) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:284) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at java.io.DataOutputStream.write(DataOutputStream.java:107) ~[?:1.8.0_292]
        at java.io.FilterOutputStream.write(FilterOutputStream.java:97) ~[?:1.8.0_292]
        at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.getBytes(NetworkBuffer.java:404) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateSerializerImpl.writeData(ChannelStateSerializer.java:164) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.lambda$write$2(ChannelStateCheckpointWriter.java:194) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.runWithChecks(ChannelStateCheckpointWriter.java:274) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.write(ChannelStateCheckpointWriter.java:189) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.writeInput(ChannelStateCheckpointWriter.java:153) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$write$2(ChannelStateWriteRequest.java:139) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$buildWriteRequest$8(ChannelStateWriteRequest.java:231) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequest.execute(ChannelStateWriteRequest.java:366) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.handleCheckpointInProgressRequest(ChannelStateWriteRequestDispatcherImpl.java:169) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:119) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatch(ChannelStateWriteRequestDispatcherImpl.java:86) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:182) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.run(ChannelStateWriteRequestExecutorImpl.java:136) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        ... 1 more
   Caused by: java.io.IOException: Cannot register Closeable, registry is already closed. Closing argument.
        at org.apache.flink.util.AbstractAutoCloseableRegistry.registerCloseable(AbstractAutoCloseableRegistry.java:89) ~[flink-core-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at org.apache.flink.core.fs.ClosingFSDataOutputStream.wrapSafe(ClosingFSDataOutputStream.java:103) ~[flink-core-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:131) ~[flink-core-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:76) ~[flink-core-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:451) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flushToFile(FsCheckpointStreamFactory.java:308) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:284) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at java.io.DataOutputStream.write(DataOutputStream.java:107) ~[?:1.8.0_292]
        at java.io.FilterOutputStream.write(FilterOutputStream.java:97) ~[?:1.8.0_292]
        at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.getBytes(NetworkBuffer.java:404) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateSerializerImpl.writeData(ChannelStateSerializer.java:164) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.lambda$write$2(ChannelStateCheckpointWriter.java:194) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.runWithChecks(ChannelStateCheckpointWriter.java:274) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.write(ChannelStateCheckpointWriter.java:189) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.writeInput(ChannelStateCheckpointWriter.java:153) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$write$2(ChannelStateWriteRequest.java:139) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$buildWriteRequest$8(ChannelStateWriteRequest.java:231) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequest.execute(ChannelStateWriteRequest.java:366) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.handleCheckpointInProgressRequest(ChannelStateWriteRequestDispatcherImpl.java:169) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:119) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatch(ChannelStateWriteRequestDispatcherImpl.java:86) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:182) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.run(ChannelStateWriteRequestExecutorImpl.java:136) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
