Hi all, We have a job with a medium-sized state (around 4 GB), and after adding a new part to the job graph (which should not impact the job much) we found that every single checkpoint restore fails with the following error:
Caused by: java.io.IOException: s3a://<REDACTED>: Stream is closed!
	at org.apache.hadoop.fs.s3a.S3AInputStream.checkNotClosed(S3AInputStream.java:472)
	at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:347)
	at java.io.FilterInputStream.read(FilterInputStream.java:83)
	at org.apache.flink.fs.s3hadoop.common.HadoopDataInputStream.read(HadoopDataInputStream.java:86)
	at org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:50)
	at org.apache.flink.runtime.util.ForwardingInputStream.read(ForwardingInputStream.java:42)
	at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
	at org.apache.flink.types.StringValue.readString(StringValue.java:781)
	at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)
	at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31)
	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:126)
	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32)
	at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:161)
	at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:43)
	at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:289)
	at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:323)
	at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:285)
	at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:172)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:112)
	... 17 more

I haven't really got any clues as to what is causing this. You'll notice we are using the Hadoop file system; switching to Presto is a bit tricky for us because some of the bucket permissions would need to change. Does anyone have tips on debugging (or solving) this?
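For reference, checkpointing in the job is set up roughly like this (a rough sketch; the bucket path is redacted and the interval is illustrative, but the heap state backend and the s3a:// scheme match what shows up in the trace):

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Heap keyed state (hence HeapRestoreOperation in the trace), checkpointed to S3
// via the flink-s3-fs-hadoop plugin, which serves the s3a:// scheme.
env.enableCheckpointing(60000L) // interval is illustrative
env.setStateBackend(new FsStateBackend("s3a://<REDACTED>/checkpoints"))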