Hi Till, thanks for your reply and clarification! By the way, it's the same story with RocksDBStateBackend — it looks like a wrapper over FsStateBackend:
01/11/2018 09:27:22 Job execution switched to status FAILING.
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
	at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
	at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
	*at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:99)*
	*at org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:277)*
	*at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createStreamFactory(RocksDBStateBackend.java:273)*
	*at org.apache.flink.streaming.runtime.tasks.StreamTask.createCheckpointStreamFactory(StreamTask.java:787)*
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: *Hadoop is not in the classpath/dependencies.*
	at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)

Then I also changed the URL for the fs state backend to file://, which is ok, but then I have the same issue in BucketingSink:

java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
	...<some our simple wrapper class call>.initializeState(...)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
	*at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)*
	*at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)*
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
	...
10 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: *Hadoop is not in the classpath/dependencies.*
	at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
	... 13 more

For these tests I was using the clean "Without bundled Hadoop" Flink binaries and didn't change anything in the configs. Since we currently have to persist checkpoints on "hdfs", we will use some flink-shaded-hadoop2-uber*.jar anyway, thanks.

Best,
Sasha

2018-01-10 10:47 GMT+01:00 Till Rohrmann <trohrm...@apache.org>:

> Hi Sasha,
>
> you're right that if you want to access HDFS from the user code only it
> should be possible to use the Hadoop-free Flink version and bundle the
> Hadoop dependencies with your user code. However, if you want to use
> Flink's file system state backend as you did, then you have to start the
> Flink cluster with the Hadoop dependency in its classpath. The reason is
> that the FsStateBackend is part of the Flink distribution and will be
> loaded using the system class loader.
>
> One thing you could try out is to use the RocksDB state backend instead.
> Since the RocksDBStateBackend is loaded dynamically, I think it should use
> the Hadoop dependencies when trying to load the filesystem.
>
> Cheers,
> Till
>
> On Tue, Jan 9, 2018 at 10:46 PM, Oleksandr Baliev <aleksanderba...@gmail.com> wrote:
>
>> Hello guys,
>>
>> I want to clarify something for myself: since Flink 1.4.0 offers a Hadoop-free
>> distribution with dynamic loading of Hadoop dependencies, I assumed that if I
>> download the Hadoop-free distribution, start a cluster without any Hadoop,
>> and then submit a job jar that bundles the Hadoop dependencies (I
>> used 2.6.0-cdh5.10.1), Hadoop should be visible on the classpath, so a
>> job which accesses HDFS via source/sink/etc. or makes checkpoints can be
>> run on such a Hadoop-free cluster.
>>
>> But when I start a job, during config initialization for the checkpoint I get
>> "Hadoop is not in the classpath/dependencies.":
>>
>> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
>> find a file system implementation for scheme 'hdfs'. The scheme is not
>> directly supported by Flink and no Hadoop file system to support this
>> scheme could be loaded.
>> 	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
>> 	at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
>> 	at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
>> 	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:99)
>> 	at org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:277)
>> 	...
>>
>> From what I've found, it seems that in org.apache.flink.core.fs.FileSystem#getUnguardedFileSystem
>> there is no "hdfs" scheme registered in FS_FACTORIES, and FALLBACK_FACTORY,
>> which should be loaded with the Hadoop factory, holds an
>> org.apache.flink.core.fs.UnsupportedSchemeFactory. But that is loaded when the
>> taskmanager starts (at which point there should be no Hadoop dependencies
>> yet), so that should be ok.
>>
>> So as I understand it, the Hadoop file system is not recognised by Flink if it
>> was not loaded at the beginning. Is that correct, or did I just mess something
>> up somewhere?
>>
>> Thanks,
>> Sasha
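P.S. For anyone finding this thread later: the behaviour Till describes can be sketched with a plain-JDK class lookup, no Flink required. This is only an illustration of the mechanism, not Flink's actual code — the class name `ClasspathCheck` and its method are made up for the sketch. The point is that a class loaded by the distribution's classloader can only resolve what that classloader (and its parents) can see, so Hadoop classes bundled only in the user jar stay invisible to it:

```java
// Minimal sketch (hypothetical helper, not Flink code): a class lookup
// analogous to the one behind "Hadoop is not in the classpath/dependencies."
public class ClasspathCheck {

    // Returns true if the given class is visible to the classloader that
    // loaded this class — the analogue of the classloader that loaded the
    // Flink distribution (and thus FsStateBackend).
    public static boolean isVisible(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // JDK classes are always visible to this lookup...
        System.out.println("java.lang.String visible: "
                + isVisible("java.lang.String"));
        // ...but Hadoop's FileSystem is not, unless a Hadoop jar (e.g. the
        // flink-shaded-hadoop2 uber jar) sits on the cluster classpath,
        // rather than only inside the submitted user jar.
        System.out.println("org.apache.hadoop.fs.FileSystem visible: "
                + isVisible("org.apache.hadoop.fs.FileSystem"));
    }
}
```

Run on a machine without Hadoop on the classpath, the second line should report `false`, which matches the exception in the stack traces above.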