Thanks for trying it out and letting us know.

Cheers,
Till

On Thu, Jan 11, 2018 at 9:56 AM, Oleksandr Baliev <aleksanderba...@gmail.com> wrote:

> Hi Till,
>
> thanks for your reply and clarification! By the way, it's the same story with
> RocksDBStateBackend; it looks like a wrapper over FsStateBackend:
>
> 01/11/2018 09:27:22 Job execution switched to status FAILING.
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
> find a file system implementation for scheme 'hdfs'. The scheme is not
> directly supported by Flink and no Hadoop file system to support this
> scheme could be loaded.
>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
>     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
>     at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:99)
>     at org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:277)
>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createStreamFactory(RocksDBStateBackend.java:273)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.createCheckpointStreamFactory(StreamTask.java:787)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Hadoop is not in the classpath/dependencies.
>     at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
>
>
> Then I also changed the URL for the fs state backend to file://, which is OK,
> but then I hit the same issue in BucketingSink:
>
> java.lang.RuntimeException: Error while creating FileSystem when
> initializing the state of the BucketingSink.
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
>     ...<some our simple wrapper class call>.initializeState(...)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Could not find a file system implementation for scheme 'hdfs'. The scheme
> is not directly supported by Flink and no Hadoop file system to support
> this scheme could be loaded.
>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
>     ... 10 more
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Hadoop is not in the classpath/dependencies.
>     at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
>     ... 13 more
>
>
> For these tests I was using the clean "Without bundled Hadoop" Flink
> binaries and didn't change anything in the configs.
>
> Currently we have to persist checkpoints on HDFS, so we will use a
> flink-shaded-hadoop2-uber*.jar anyway, thanks.
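
For reference, a hypothetical sketch of the uber-jar route mentioned above. The jar name, version numbers, path, and config keys here are illustrative only and should be checked against the Flink version actually in use:

```
# Place the shaded Hadoop uber jar into the distribution's lib/ directory,
# e.g. (version numbers are examples, not from this thread):
#   flink-1.4.0/lib/flink-shaded-hadoop2-uber-2.6.5-1.4.0.jar
#
# conf/flink-conf.yaml -- point the file system state backend at HDFS:
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
```

With the jar in lib/, it is on the system classpath at cluster start-up, which is what the FsStateBackend needs.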
>
> Best,
> Sasha
>
> 2018-01-10 10:47 GMT+01:00 Till Rohrmann <trohrm...@apache.org>:
>
>> Hi Sasha,
>>
>> you're right that if you want to access HDFS from the user code only, it
>> should be possible to use the Hadoop-free Flink version and bundle the
>> Hadoop dependencies with your user code. However, if you want to use
>> Flink's file system state backend as you did, then you have to start the
>> Flink cluster with the Hadoop dependency in its classpath. The reason is
>> that the FsStateBackend is part of the Flink distribution and will be
>> loaded using the system class loader.
>>
>> One thing you could try out is to use the RocksDB state backend instead.
>> Since the RocksDBStateBackend is loaded dynamically, I think it should use
>> the Hadoop dependencies when trying to load the filesystem.
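
The distinction Till draws here comes down to which class loader can see the Hadoop classes. A stdlib-only illustration of such a classpath probe (a simplified sketch, not Flink's actual loading code; the class names below are just examples):

```java
public class HadoopProbe {

    /** Returns true if the named class can be loaded by the given class loader. */
    static boolean onClasspath(String className, ClassLoader cl) {
        try {
            // false = do not run static initializers, we only check visibility
            Class.forName(className, false, cl);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ClassLoader cl = HadoopProbe.class.getClassLoader();
        // In a Hadoop-free JVM the first probe fails; java.net.URI is always visible.
        System.out.println(onClasspath("org.apache.hadoop.fs.FileSystem", cl));
        System.out.println(onClasspath("java.net.URI", cl));
    }
}
```

A backend shipped inside the user jar is resolved through the user-code class loader, which can see the bundled Hadoop classes; the FsStateBackend in lib/ only sees the system classpath.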
>>
>> Cheers,
>> Till
>>
>> On Tue, Jan 9, 2018 at 10:46 PM, Oleksandr Baliev <
>> aleksanderba...@gmail.com> wrote:
>>
>>> Hello guys,
>>>
>>> I want to clarify something for myself: since Flink 1.4.0 supports a
>>> Hadoop-free distribution with dynamic loading of Hadoop dependencies, I
>>> assumed that if I download the Hadoop-free distribution, start a cluster
>>> without any Hadoop, and then submit a job jar that bundles Hadoop
>>> dependencies (I used 2.6.0-cdh5.10.1), Hadoop should be visible on the
>>> classpath, so a job that accesses HDFS via a source/sink/etc. or makes
>>> checkpoints should run on such a Hadoop-free cluster.
>>>
>>> But when I start a job, during checkpoint config initialization I get
>>> "Hadoop is not in the classpath/dependencies.":
>>>
>>> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could
>>> not find a file system implementation for scheme 'hdfs'. The scheme is not
>>> directly supported by Flink and no Hadoop file system to support this
>>> scheme could be loaded.
>>>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
>>>     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
>>>     at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
>>>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:99)
>>>     at org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:277)
>>> ...
>>>
>>>
>>> From what I've found, in org.apache.flink.core.fs.FileSystem#getUnguardedFileSystem
>>> there is no "hdfs" scheme registered in FS_FACTORIES, and FALLBACK_FACTORY,
>>> which should be loaded with the Hadoop factory, instead holds
>>> org.apache.flink.core.fs.UnsupportedSchemeFactory. But FALLBACK_FACTORY is
>>> loaded when the taskmanager starts (at which point there are no Hadoop
>>> dependencies), so that part seems expected.
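
The lookup described above can be modelled with a small stdlib-only sketch. This is a simplified illustration of the behaviour, not Flink's actual code; the factory map, fallback, and return values are all invented for the example:

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class SchemeLookup {

    interface FsFactory {
        String create(URI uri) throws Exception;
    }

    // Populated once at process start, mirroring static FS_FACTORIES.
    static final Map<String, FsFactory> FS_FACTORIES = new HashMap<>();
    static FsFactory FALLBACK_FACTORY;

    static {
        // "file" is always registered; "hdfs" only appears if Hadoop was on
        // the classpath when this class was initialized.
        FS_FACTORIES.put("file", uri -> "LocalFileSystem");
        boolean hadoopOnClasspath = false; // hadoop-free distribution
        if (hadoopOnClasspath) {
            FALLBACK_FACTORY = uri -> "HadoopFileSystem";
        } else {
            // Mirrors UnsupportedSchemeFactory: fails for any unknown scheme.
            FALLBACK_FACTORY = uri -> {
                throw new Exception("Hadoop is not in the classpath/dependencies.");
            };
        }
    }

    static String getUnguardedFileSystem(URI uri) throws Exception {
        FsFactory factory = FS_FACTORIES.get(uri.getScheme());
        if (factory != null) {
            return factory.create(uri);
        }
        return FALLBACK_FACTORY.create(uri); // throws for 'hdfs' here
    }

    public static void main(String[] args) throws Exception {
        System.out.println(getUnguardedFileSystem(URI.create("file:///tmp/x")));
        try {
            getUnguardedFileSystem(URI.create("hdfs:///checkpoints"));
        } catch (Exception e) {
            System.out.println("hdfs lookup failed: " + e.getMessage());
        }
    }
}
```

Because the fallback is fixed when the class is initialized, registering Hadoop later (e.g. from a job jar) cannot change what this static lookup returns, which matches the behaviour observed in the thread.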
>>>
>>> So as I understand it, the Hadoop file system is not recognised by Flink
>>> if it was not loaded at startup. Is that correct, or did I just mess
>>> something up somewhere?
>>>
>>> Thanks,
>>> Sasha
>>>
>>
>>
>
