Hi Till,

thanks for your reply and the clarification! With the RocksDBStateBackend it's the same story, by the way; it looks like a wrapper over the FsStateBackend as far as checkpoint streams are concerned:

01/11/2018 09:27:22 Job execution switched to status FAILING.
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:99)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:277)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createStreamFactory(RocksDBStateBackend.java:273)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createCheckpointStreamFactory(StreamTask.java:787)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
    at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
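For reference, here is roughly how the state backend is set in the job. This is only a sketch; the checkpoint URI, host and class name are made up for illustration, not our real code:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbCheckpointSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // checkpoint every 60 s
        // Per the trace above, the RocksDB backend writes its checkpoint streams
        // through the same FsCheckpointStreamFactory as the FsStateBackend, so an
        // hdfs:// URI still needs a Hadoop FileSystem implementation to resolve.
        env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:8020/flink/checkpoints"));
        // ... sources, transformations, sinks ...
        env.execute("rocksdb-checkpoint-sketch");
    }
}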


I then also changed the URL for the fs state backend to file://, which is fine, but then I hit the same issue in the BucketingSink:

java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
    ... <our simple wrapper class>.initializeState(...)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
    ... 10 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
    at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
    ... 13 more
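And to make the second case concrete, a minimal sketch of the combination that fails as above (paths, host and port are invented; it assumes the flink-connector-filesystem dependency for the BucketingSink):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

public class BucketingSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);
        // Checkpoints on the local file system work on a Hadoop-free cluster...
        env.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));

        DataStream<String> events = env.socketTextStream("localhost", 9999);
        // ...but per the trace above, the sink still resolves the 'hdfs' scheme via
        // FileSystem.getUnguardedFileSystem(), which fails without Hadoop on the
        // cluster's classpath.
        events.addSink(new BucketingSink<String>("hdfs://namenode:8020/data/out"));

        env.execute("bucketing-sink-sketch");
    }
}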


For these tests I was using clean "Without bundled Hadoop" Flink binaries and didn't change anything in the configs.

Currently we have to persist checkpoints on "hdfs", so we will use some flink-shaded-hadoop2-uber*.jar anyway. Thanks!
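As far as I understand, it should be enough to drop that jar next to flink-dist in the distribution's lib/ directory before starting the cluster, so the system class loader picks it up; sketching the layout (jar names are illustrative):

flink-1.4.0/
  conf/
  lib/
    flink-dist_2.11-1.4.0.jar
    flink-shaded-hadoop2-uber*.jar   <-- added before the cluster starts
    ...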

Best,
Sasha

2018-01-10 10:47 GMT+01:00 Till Rohrmann <trohrm...@apache.org>:

> Hi Sasha,
>
> you're right that if you want to access HDFS from the user code only it
> should be possible to use the Hadoop free Flink version and bundle the
> Hadoop dependencies with your user code. However, if you want to use
> Flink's file system state backend as you did, then you have to start the
> Flink cluster with the Hadoop dependency in its classpath. The reason is
> that the FsStateBackend is part of the Flink distribution and will be
> loaded using the system class loader.
>
> One thing you could try out is to use the RocksDB state backend instead.
> Since the RocksDBStateBackend is loaded dynamically, I think it should use
> the Hadoop dependencies when trying to load the filesystem.
>
> Cheers,
> Till
>
> On Tue, Jan 9, 2018 at 10:46 PM, Oleksandr Baliev <
> aleksanderba...@gmail.com> wrote:
>
>> Hello guys,
>>
>> I want to clarify something for myself: since Flink 1.4.0 offers a Hadoop-free
>> distribution and dynamic loading of Hadoop dependencies, I assumed that if I
>> download the Hadoop-free distribution, start a cluster without any Hadoop, and
>> then upload a job jar that bundles some Hadoop dependencies (I used
>> 2.6.0-cdh5.10.1), then Hadoop should be visible on the classpath, and a job
>> that accesses HDFS via a source/sink/etc. or makes checkpoints should run on
>> such a Hadoop-free cluster.
>>
>> But when I start a job during config initialization for checkpoint I have
>> "Hadoop is not in the classpath/dependencies.":
>>
>> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
>> find a file system implementation for scheme 'hdfs'. The scheme is not
>> directly supported by Flink and no Hadoop file system to support this
>> scheme could be loaded.
>>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
>>     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
>>     at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
>>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:99)
>>     at org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:277)
>> ...
>>
>>
>> What I've found: in org.apache.flink.core.fs.FileSystem#getUnguardedFileSystem
>> there is no "hdfs" scheme registered in FS_FACTORIES, and FALLBACK_FACTORY,
>> which should be loaded with the Hadoop factory, holds
>> org.apache.flink.core.fs.UnsupportedSchemeFactory. It is loaded while the
>> taskmanager is starting (when there should be no Hadoop dependencies yet), so
>> that part looks expected.
>>
>> So, as I understand it, the Hadoop file system is not recognised by Flink if
>> it was not loaded at the beginning. Is that correct, or did I just mess
>> something up somewhere?
>>
>> Thanks,
>> Sasha
>>
>
>
