Re-posting the solution here from other threads:

You can fix this by either

  - Removing all Hadoop dependencies from your user jar, or
  - Setting the framework back to parent-first classloading (see the sketch below): https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_classloading.html#configuring-classloader-resolution-order
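
For the classloading option, a minimal sketch of the flink-conf.yaml change
(key name as documented on the page linked above):

    classloader.resolve-order: parent-first

For the dependency option, if you build with Maven, marking the Hadoop
dependencies as provided should keep them out of the fat jar. hadoop-client
below is just an example artifact; use whichever Hadoop artifacts your build
actually pulls in:

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
      <scope>provided</scope>
    </dependency>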

Hope that helps.

On Wed, Jan 10, 2018 at 3:28 PM, Kyle Hamlin <hamlin...@gmail.com> wrote:

> I'm having similar issues after moving from 1.3.2 to 1.4.0
>
> *My mailing list thread:* BucketingSink doesn't work anymore moving from
> 1.3.2 to 1.4.0
> <http://mail-archives.apache.org/mod_mbox/flink-user/201801.mbox/%3CCADAFrT8vonuRAhrrFwCen42wjojZ99LVGkhpyP2Qy0=7blr...@mail.gmail.com%3E>
>
> I'm not actually using HDFS as my sink. I'll be using S3 as my final sink,
> but I get the following error even when I've given a local file path to the
> BucketingSink.
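>
> For illustration, a minimal version of what I'm doing (the type parameter
> and path here are made up; my real job differs):
>
> // hypothetical repro: a BucketingSink pointed at a local path,
> // where stream is a DataStream<String>
> BucketingSink<String> sink = new BucketingSink<>("file:///tmp/flink-out");
> stream.addSink(sink);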
>
> java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Cannot instantiate file system for URI: hdfs://localhost:12345/
>     at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
>     ... 9 more
> Caused by: java.lang.ClassCastException
>
> On Wed, Jan 10, 2018 at 1:39 PM Chesnay Schepler <ches...@apache.org>
> wrote:
>
>> Your analysis looks correct; the code in question will never properly
>> detect Hadoop file systems. I'll open a JIRA.
>>
>> Your suggestion to replace it with getUnguardedFileSystem() was my first
>> instinct as well.
>>
>> Good job debugging this.
>>
>>
>> On 10.01.2018 14:17, jelmer wrote:
>>
>> Hi, I am trying to convert some jobs from Flink 1.3.2 to Flink 1.4.0.
>>
>> But I am running into the issue that the BucketingSink will always try to
>> connect to hdfs://localhost:12345/ instead of the HDFS URL I have
>> specified in the constructor.
>>
>> If I look at the code at
>>
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L1125
>>
>>
>> it tries to create the Hadoop file system like this:
>>
>> final org.apache.flink.core.fs.FileSystem flinkFs =
>>     org.apache.flink.core.fs.FileSystem.get(path.toUri());
>> final FileSystem hadoopFs = (flinkFs instanceof HadoopFileSystem)
>>     ? ((HadoopFileSystem) flinkFs).getHadoopFileSystem()
>>     : null;
>>
>> But FileSystem.get will always return a SafetyNetWrapperFileSystem, so the
>> instanceof check will never indicate that it's a Hadoop file system.
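>>
>> To illustrate (a sketch, not verified; I'm going by the fact that
>> SafetyNetWrapperFileSystem implements WrappingProxy, which exposes
>> getWrappedDelegate(); path is the same variable as in the snippet above):
>>
>> import org.apache.flink.core.fs.FileSystem;
>> import org.apache.flink.core.fs.SafetyNetWrapperFileSystem;
>> import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
>>
>> // inside a task thread (safety net active), FileSystem.get() returns
>> // a SafetyNetWrapperFileSystem wrapping the actual file system, so
>> // flinkFs instanceof HadoopFileSystem is false even for hdfs:// URIs
>> FileSystem flinkFs = FileSystem.get(path.toUri());
>>
>> // one way the check could be made to work: unwrap the delegate first
>> if (flinkFs instanceof SafetyNetWrapperFileSystem) {
>>     flinkFs = ((SafetyNetWrapperFileSystem) flinkFs).getWrappedDelegate();
>> }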
>>
>>
>> Am I missing something, or is this a bug? And if so, what would be the
>> correct fix? I guess replacing FileSystem.get with
>> FileSystem.getUnguardedFileSystem would fix it, but I am afraid I lack the
>> context to know if that would be safe. A sketch of the change I mean is
>> below.
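>>
>> (Just swapping the lookup call at the linked line; not tested against
>> master:)
>>
>> final org.apache.flink.core.fs.FileSystem flinkFs =
>>     org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(path.toUri());
>> final FileSystem hadoopFs = (flinkFs instanceof HadoopFileSystem)
>>     ? ((HadoopFileSystem) flinkFs).getHadoopFileSystem()
>>     : null;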
>>
