Hi all,

We have a job with a medium-sized state (around 4 GB). After adding a new
part to the job graph (which should not impact the job much), we found
that every single checkpoint restore fails with the following error:

Caused by: java.io.IOException: s3a://<REDACTED>: Stream is closed!
    at org.apache.hadoop.fs.s3a.S3AInputStream.checkNotClosed(S3AInputStream.java:472)
    at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:347)
    at java.io.FilterInputStream.read(FilterInputStream.java:83)
    at org.apache.flink.fs.s3hadoop.common.HadoopDataInputStream.read(HadoopDataInputStream.java:86)
    at org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:50)
    at org.apache.flink.runtime.util.ForwardingInputStream.read(ForwardingInputStream.java:42)
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
    at org.apache.flink.types.StringValue.readString(StringValue.java:781)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:126)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32)
    at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:161)
    at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:43)
    at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:289)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:323)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:285)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:172)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:112)
    ... 17 more
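
For context, as the trace suggests, this is the heap keyed state backend
checkpointing to S3 through the s3a scheme. The setup is essentially the
sketch below (simplified, with placeholder bucket/paths, not our real
layout):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Heap keyed state, checkpointed to S3 via the Hadoop (s3a) file system.
        // The bucket/path is a placeholder.
        env.setStateBackend(new FsStateBackend("s3a://some-bucket/flink/checkpoints"));
        env.enableCheckpointing(60_000); // checkpoint every minute

        // ... the actual job graph is built here; trivial pipeline as a stand-in ...
        env.fromElements(1, 2, 3).print();
        env.execute("state-backend-setup-sketch");
    }
}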


I don't really have any clues about what is causing this. As you can see,
we are using the Hadoop S3 file system; switching to the Presto S3 file
system is a bit tricky for us because some of the bucket permissions would
need to change.
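
One thing I was planning to try, to rule out the raw S3A read path, is to
read one of the checkpoint state files end to end through Flink's
FileSystem API, outside of any restore. Roughly the sketch below (the path
is a placeholder, and it assumes flink-s3-fs-hadoop is on the classpath):

import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class ReadCheckpointFile {
    public static void main(String[] args) throws Exception {
        // Placeholder: point this at one of the keyed-state files of the failing checkpoint.
        Path path = new Path("s3a://some-bucket/flink/checkpoints/<job-id>/chk-42/some-state-file");

        FileSystem fs = path.getFileSystem();
        long total = 0;
        byte[] buffer = new byte[64 * 1024];
        try (FSDataInputStream in = fs.open(path)) {
            int read;
            while ((read = in.read(buffer)) != -1) {
                total += read;
            }
        }
        // If this completes, the plain read path is fine, and the "Stream is closed"
        // is more likely something closing the stream during the restore itself.
        System.out.println("Read " + total + " bytes without hitting 'Stream is closed'");
    }
}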

Does anyone have tips on debugging (or solving) this?
