Thanks for trying it out and letting us know.

Cheers,
Till
On Thu, Jan 11, 2018 at 9:56 AM, Oleksandr Baliev <aleksanderba...@gmail.com> wrote:

> Hi Till,
>
> thanks for your reply and clarification! By the way, it is the same story
> with the RocksDBStateBackend, which looks like a wrapper over the
> FsStateBackend here:
>
> 01/11/2018 09:27:22 Job execution switched to status FAILING.
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
> find a file system implementation for scheme 'hdfs'. The scheme is not
> directly supported by Flink and no Hadoop file system to support this
> scheme could be loaded.
>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
>     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
>     at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:99)
>     at org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:277)
>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createStreamFactory(RocksDBStateBackend.java:273)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.createCheckpointStreamFactory(StreamTask.java:787)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Hadoop is not in the classpath/dependencies.
>     at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
>
> Then I also changed the URL for the fs state backend to file://, which is
> fine, but then I hit the same issue in the BucketingSink:
>
> java.lang.RuntimeException: Error while creating FileSystem when
> initializing the state of the BucketingSink.
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
>     ...<some our simple wrapper class call>.initializeState(...)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Could not find a file system implementation for scheme 'hdfs'. The scheme
> is not directly supported by Flink and no Hadoop file system to support
> this scheme could be loaded.
>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
>     ... 10 more
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Hadoop is not in the classpath/dependencies.
>     at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
>     ... 13 more
>
> For these tests I was using the clean "Without bundled Hadoop" Flink
> binaries and didn't change anything in the configs.
>
> Currently we have to persist checkpoints on HDFS, so we will use some
> flink-shaded-hadoop2-uber*.jar anyway, thanks.
>
> Best,
> Sasha
>
> 2018-01-10 10:47 GMT+01:00 Till Rohrmann <trohrm...@apache.org>:
>
>> Hi Sasha,
>>
>> you're right that if you want to access HDFS from the user code only, it
>> should be possible to use the Hadoop-free Flink version and bundle the
>> Hadoop dependencies with your user code. However, if you want to use
>> Flink's file system state backend, as you did, then you have to start the
>> Flink cluster with the Hadoop dependency on its classpath. The reason is
>> that the FsStateBackend is part of the Flink distribution and will be
>> loaded using the system class loader.
>>
>> One thing you could try out is to use the RocksDB state backend instead.
>> Since the RocksDBStateBackend is loaded dynamically, I think it should use
>> the Hadoop dependencies when trying to load the filesystem.
>>
>> Cheers,
>> Till
>>
>> On Tue, Jan 9, 2018 at 10:46 PM, Oleksandr Baliev <aleksanderba...@gmail.com> wrote:
>>
>>> Hello guys,
>>>
>>> I want to clarify something for myself: since Flink 1.4.0 allows a
>>> Hadoop-free distribution and dynamic loading of Hadoop dependencies, I
>>> assumed that if I download the Hadoop-free distribution, start a cluster
>>> without any Hadoop, and then upload a job jar that bundles some Hadoop
>>> dependencies (I used 2.6.0-cdh5.10.1), Hadoop should be visible on the
>>> classpath, and a job that accesses HDFS via a source/sink/etc. or makes
>>> checkpoints should run on such a Hadoop-free cluster.
>>>
>>> But when I start a job, during config initialization for checkpointing I
>>> get "Hadoop is not in the classpath/dependencies.":
>>>
>>> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could
>>> not find a file system implementation for scheme 'hdfs'. The scheme is not
>>> directly supported by Flink and no Hadoop file system to support this
>>> scheme could be loaded.
>>>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
>>>     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
>>>     at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
>>>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:99)
>>>     at org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:277)
>>>     ...
>>>
>>> From what I've found, it seems that in
>>> org.apache.flink.core.fs.FileSystem#getUnguardedFileSystem there is no
>>> "hdfs" scheme registered in FS_FACTORIES, and FALLBACK_FACTORY, which
>>> should be loaded with the Hadoop factory, holds
>>> org.apache.flink.core.fs.UnsupportedSchemeFactory. But it is loaded when
>>> the taskmanager starts (at which point there are indeed no Hadoop
>>> dependencies on the classpath), so that part seems expected.
>>>
>>> So, as I understand it, the Hadoop file system is not recognised by
>>> Flink if it was not loaded at the beginning. Is that correct, or did I
>>> just mess something up somewhere?
>>>
>>> Thanks,
>>> Sasha
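Till's point about the system class loader can be illustrated with a small, self-contained sketch. This is plain JDK code, not Flink code; the Flink class names appear only in comments for orientation. It shows why a class loaded by the parent (system) class loader, such as the FsStateBackend shipped in the distribution, cannot resolve classes that exist only in a child user-code class loader:

```java
// Stdlib-only sketch of parent/child class-loader visibility.
import java.net.URL;
import java.net.URLClassLoader;

public class ParentChildVisibility {
    public static void main(String[] args) throws Exception {
        ClassLoader parent = ParentChildVisibility.class.getClassLoader();
        // A child loader (like Flink's user-code class loader) delegates
        // lookups upward to its parent first...
        try (URLClassLoader userCode = new URLClassLoader(new URL[0], parent)) {
            // ...so the child can see everything the parent can:
            Class<?> seen = Class.forName("java.util.ArrayList", false, userCode);
            System.out.println("child sees parent class: " + (seen != null));
        }
        // But the parent never searches its children: if Hadoop classes sit
        // only in the user jar, parent-loaded code (e.g. FsStateBackend from
        // flink-dist) cannot resolve them. Here we assume Hadoop is absent
        // from the parent classpath, as on a Hadoop-free cluster:
        try {
            Class.forName("org.apache.hadoop.fs.FileSystem", false, parent);
            System.out.println("parent sees hadoop: true");
        } catch (ClassNotFoundException e) {
            System.out.println("parent sees hadoop: false");
        }
    }
}
```

This delegation-only-upward behaviour is why bundling Hadoop in the job jar helps user code but not distribution classes.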
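The FS_FACTORIES / FALLBACK_FACTORY behaviour Sasha describes can be modelled with a toy sketch. This is an assumed simplification, not the actual Flink implementation: the point is that the fallback is fixed once at class-initialization time, so a scheme like `hdfs` keeps failing even after a job jar later brings Hadoop along:

```java
// Toy model of a scheme -> file-system-factory lookup with a fallback
// that is frozen at initialization time.
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class SchemeFallbackSketch {
    static final Map<String, Supplier<String>> FS_FACTORIES = new HashMap<>();
    static final Supplier<String> FALLBACK_FACTORY;

    static {
        // Built-in schemes are registered directly:
        FS_FACTORIES.put("file", () -> "LocalFileSystem");
        // The fallback is chosen exactly once. On a Hadoop-free cluster
        // start, Hadoop is not visible at this moment, so the fallback
        // permanently reports the scheme as unsupported:
        boolean hadoopOnStartupClasspath = false;
        if (hadoopOnStartupClasspath) {
            FALLBACK_FACTORY = () -> "HadoopFileSystem";
        } else {
            FALLBACK_FACTORY = () -> {
                throw new UnsupportedOperationException(
                        "Hadoop is not in the classpath/dependencies.");
            };
        }
    }

    static String getFileSystem(String scheme) {
        // Unknown schemes fall through to the frozen fallback:
        return FS_FACTORIES.getOrDefault(scheme, FALLBACK_FACTORY).get();
    }

    public static void main(String[] args) {
        System.out.println("file -> " + getFileSystem("file"));
        try {
            getFileSystem("hdfs");
        } catch (UnsupportedOperationException e) {
            System.out.println("hdfs -> " + e.getMessage());
        }
    }
}
```

Under this model, no later class loading can repair the lookup; only restarting the cluster with Hadoop visible at initialization would.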
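For reference, a sketch of the two usual remedies on a Hadoop-free Flink 1.4 setup, matching what the thread converges on. The exact jar name, version, and paths vary with your environment; the wildcard is deliberate and should be resolved against the artifact you actually download:

```sh
# Option 1: place a shaded Hadoop uber jar into the distribution's lib/
# directory before starting the cluster, so the system class loader
# (which loads FsStateBackend) can see it:
cp flink-shaded-hadoop2-uber*.jar "$FLINK_HOME/lib/"

# Option 2: if a Hadoop installation exists on the machines, export its
# classpath before starting the cluster:
export HADOOP_CLASSPATH=$(hadoop classpath)
```

Either way, Hadoop must be visible when the cluster processes start, not merely inside the job jar.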