Hi,

Thanks for reporting the issue. I've created a Jira ticket for it [1]. We will investigate and try to address it.
Could you check whether the same issue happens when you use flink-s3-fs-presto [2]?

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-14574
[2] https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/s3.html#shaded-hadooppresto-s3-file-systems

> On 26 Oct 2019, at 23:11, spoganshev <s.pogans...@slice.com> wrote:
>
> We've added flink-s3-fs-hadoop library to plugins folder and trying to
> bootstrap state to S3 using S3A protocol. The following exception happens
> (unless hadoop library is put to lib folder instead of plugins). Looks like
> S3A filesystem is trying to use "local" filesystem for temporary files and
> fails:
>
> java.lang.Exception: Could not write timer service of MapPartition (d2976134f80849779b7a94b7e6218476) (4/4) to checkpoint state stream.
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:466)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
>     at org.apache.flink.state.api.output.SnapshotUtils.snapshot(SnapshotUtils.java:59)
>     at org.apache.flink.state.api.output.operators.KeyedStateBootstrapOperator.endInput(KeyedStateBootstrapOperator.java:84)
>     at org.apache.flink.state.api.output.BoundedStreamTask.performDefaultAction(BoundedStreamTask.java:85)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>     at org.apache.flink.state.api.output.BoundedOneInputStreamTaskRunner.mapPartition(BoundedOneInputStreamTaskRunner.java:76)
>     at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Could not open output stream for state backend
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
>     at org.apache.flink.runtime.state.NonClosingCheckpointOutputStream.write(NonClosingCheckpointOutputStream.java:61)
>     at java.io.DataOutputStream.write(DataOutputStream.java:107)
>     at java.io.DataOutputStream.writeUTF(DataOutputStream.java:401)
>     at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323)
>     at org.apache.flink.util.LinkedOptionalMapSerializer.lambda$writeOptionalMap$0(LinkedOptionalMapSerializer.java:58)
>     at org.apache.flink.util.LinkedOptionalMap.forEach(LinkedOptionalMap.java:163)
>     at org.apache.flink.util.LinkedOptionalMapSerializer.writeOptionalMap(LinkedOptionalMapSerializer.java:57)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData.writeKryoRegistrations(KryoSerializerSnapshotData.java:141)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData.writeSnapshotData(KryoSerializerSnapshotData.java:128)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.writeSnapshot(KryoSerializerSnapshot.java:72)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.writeVersionedSnapshot(TypeSerializerSnapshot.java:153)
>     at org.apache.flink.streaming.api.operators.InternalTimersSnapshotReaderWriters$InternalTimersSnapshotWriterV2.writeKeyAndNamespaceSerializers(InternalTimersSnapshotReaderWriters.java:199)
>     at org.apache.flink.streaming.api.operators.InternalTimersSnapshotReaderWriters$AbstractInternalTimersSnapshotWriter.writeTimersSnapshot(InternalTimersSnapshotReaderWriters.java:117)
>     at org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy.write(InternalTimerServiceSerializationProxy.java:101)
>     at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.snapshotStateForKeyGroup(InternalTimeServiceManager.java:139)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:462)
>     ... 14 common frames omitted
> Caused by: org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "file"
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3332)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.get(FileSystem.java:477)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.getLocal(FileSystem.java:433)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:301)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:378)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:456)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:200)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:572)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ADataBlocks$DiskBlockFactory.create(S3ADataBlocks.java:811)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ABlockOutputStream.createBlockIfNeeded(S3ABlockOutputStream.java:190)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ABlockOutputStream.<init>(S3ABlockOutputStream.java:168)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:778)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038)
>     at org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.create(HadoopFileSystem.java:141)
>     at org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.create(HadoopFileSystem.java:37)
>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126)
>     at org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:61)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:356)
>     ... 32 common frames omitted
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/