Re-posting the solution here from other threads: you can fix this by either

- removing all Hadoop dependencies from your user jar, or
- setting the framework back to parent-first classloading (see the config
  sketch just below):
  https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_classloading.html#configuring-classloader-resolution-order
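For the second option, a minimal sketch of the relevant flink-conf.yaml
entry, assuming the config key documented at the link above:

    # flink-conf.yaml
    # Revert to the pre-1.4 (parent-first) classloading behavior.
    classloader.resolution-order: parent-first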
Hope that helps.

On Wed, Jan 10, 2018 at 3:28 PM, Kyle Hamlin <hamlin...@gmail.com> wrote:

> I'm having similar issues after moving from 1.3.2 to 1.4.0.
>
> *My mailing list thread:* BucketingSink doesn't work anymore moving from
> 1.3.2 to 1.4.0
> <http://mail-archives.apache.org/mod_mbox/flink-user/201801.mbox/%3CCADAFrT8vonuRAhrrFwCen42wjojZ99LVGkhpyP2Qy0=7blr...@mail.gmail.com%3E>
>
> I'm not actually using HDFS as my sink. I'll be using S3 as my final
> sink, but I get the following error even when I've given a local file
> path to the BucketingSink.
>
>     java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
>         at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
>         at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>         at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>         at java.lang.Thread.run(Thread.java:748)
>     Caused by: java.io.IOException: Cannot instantiate file system for URI: hdfs://localhost:12345/
>         at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
>         at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
>         at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
>         at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
>         at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
>         ... 9 more
>     Caused by: java.lang.ClassCastException
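> For reference, a minimal sketch of the kind of job that hits this; the
> path, elements, and job name are illustrative, not my actual code:
>
>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>     import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
>
>     public class BucketingSinkExample {
>         public static void main(String[] args) throws Exception {
>             StreamExecutionEnvironment env =
>                     StreamExecutionEnvironment.getExecutionEnvironment();
>
>             // A plain local path -- no HDFS involved -- yet the sink
>             // still fails in initializeState() on 1.4.0.
>             BucketingSink<String> sink =
>                     new BucketingSink<>("file:///tmp/bucketing-out");
>
>             env.fromElements("a", "b", "c").addSink(sink);
>             env.execute("bucketing-sink-example");
>         }
>     }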
> On Wed, Jan 10, 2018 at 1:39 PM Chesnay Schepler <ches...@apache.org>
> wrote:
>
>> Your analysis looks correct; the code in question will never properly
>> detect Hadoop file systems. I'll open a JIRA.
>>
>> Your suggestion to replace it with getUnguardedFileSystem() was my
>> first instinct as well.
>>
>> Good job debugging this.
>>
>> On 10.01.2018 14:17, jelmer wrote:
>>
>> Hi, I am trying to convert some jobs from Flink 1.3.2 to Flink 1.4.0,
>> but I am running into the issue that the BucketingSink will always try
>> to connect to hdfs://localhost:12345/ instead of the HDFS URL I have
>> specified in the constructor.
>>
>> If I look at the code at
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L1125
>> it tries to create the Hadoop filesystem like this:
>>
>>     final org.apache.flink.core.fs.FileSystem flinkFs =
>>             org.apache.flink.core.fs.FileSystem.get(path.toUri());
>>     final FileSystem hadoopFs = (flinkFs instanceof HadoopFileSystem)
>>             ? ((HadoopFileSystem) flinkFs).getHadoopFileSystem()
>>             : null;
>>
>> But FileSystem.get will always return a SafetyNetWrapperFileSystem, so
>> the instanceof check will never indicate that it is a Hadoop
>> filesystem.
>>
>> Am I missing something, or is this a bug? If it is a bug, what would be
>> the correct fix? I guess replacing FileSystem.get with
>> FileSystem.getUnguardedFileSystem would fix it, but I am afraid I lack
>> the context to know whether that would be safe.
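>> For what it's worth, a sketch of the replacement I have in mind;
>> untested, and I don't know whether bypassing the safety net here is
>> safe:
>>
>>     // Fetch the filesystem without the SafetyNetWrapperFileSystem
>>     // wrapper, so the instanceof check can actually see a
>>     // HadoopFileSystem instance.
>>     final org.apache.flink.core.fs.FileSystem flinkFs =
>>             org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(path.toUri());
>>     final org.apache.hadoop.fs.FileSystem hadoopFs =
>>             (flinkFs instanceof HadoopFileSystem)
>>                     ? ((HadoopFileSystem) flinkFs).getHadoopFileSystem()
>>                     : null;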