I'm having similar issues after moving from 1.3.2 to 1.4.0.

*My mailing list thread:* BucketingSink doesn't work anymore moving from 1.3.2 to 1.4.0 <http://mail-archives.apache.org/mod_mbox/flink-user/201801.mbox/%3CCADAFrT8vonuRAhrrFwCen42wjojZ99LVGkhpyP2Qy0=7blr...@mail.gmail.com%3E>
I'm not actually using HDFS as my sink. I'll be using S3 as my final sink, but I get the following error even when I've given a local file path to the BucketingSink:

java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Cannot instantiate file system for URI: hdfs://localhost:12345/
	at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
	... 9 more
Caused by: java.lang.ClassCastException

On Wed, Jan 10, 2018 at 1:39 PM Chesnay Schepler <ches...@apache.org> wrote:
> Your analysis looks correct, the code in question will never properly
> detect Hadoop file systems. I'll open a JIRA.
>
> Your suggestion to replace it with getUnguardedFileSystem() was my first
> instinct as well.
>
> Good job debugging this.
>
>
> On 10.01.2018 14:17, jelmer wrote:
> > Hi, I am trying to convert some jobs from Flink 1.3.2 to Flink 1.4.0,
> > but I am running into the issue that the bucketing sink will always try
> > to connect to hdfs://localhost:12345/ instead of the HDFS URL I have
> > specified in the constructor.
> >
> > If I look at the code at
> >
> > https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L1125
> >
> > it tries to create the Hadoop file system like this:
> >
> > final org.apache.flink.core.fs.FileSystem flinkFs =
> >     org.apache.flink.core.fs.FileSystem.get(path.toUri());
> > final FileSystem hadoopFs = (flinkFs instanceof HadoopFileSystem) ?
> >     ((HadoopFileSystem) flinkFs).getHadoopFileSystem() : null;
> >
> > But FileSystem.get will always return a SafetyNetWrapperFileSystem, so the
> > instanceof check will never indicate that it's a Hadoop file system.
> >
> > Am I missing something, or is this a bug? If so, what would be the
> > correct fix? I guess replacing FileSystem.get with
> > FileSystem.getUnguardedFileSystem would fix it, but I am afraid I lack the
> > context to know if that would be safe.
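To make the failure mode in the quoted analysis concrete, here is a minimal, self-contained Java model of it. The types below (FlinkFileSystem, HadoopFs, and the simplified HadoopFileSystem and SafetyNetWrapperFileSystem) are hypothetical stand-ins named after the Flink classes in the thread; they are not Flink's real API. The sketch only assumes what jelmer describes: get() always hands back a wrapper, while getUnguardedFileSystem() returns the concrete file system, so the same instanceof check succeeds or fails depending on which lookup is used.

```java
import java.net.URI;

/**
 * Hypothetical, simplified model of the BucketingSink instanceof problem.
 * The nested types only mimic the names of Flink's classes; they are NOT Flink's API.
 */
public class SafetyNetDemo {

    /** Stand-in for org.apache.flink.core.fs.FileSystem. */
    interface FlinkFileSystem {
        URI getUri();
    }

    /** Stand-in for a raw Hadoop file system handle. */
    static final class HadoopFs {
        final URI uri;
        HadoopFs(URI uri) { this.uri = uri; }
    }

    /** Stand-in for Flink's HadoopFileSystem, which exposes the underlying Hadoop handle. */
    static final class HadoopFileSystem implements FlinkFileSystem {
        private final HadoopFs hadoopFs;
        HadoopFileSystem(URI uri) { this.hadoopFs = new HadoopFs(uri); }
        public URI getUri() { return hadoopFs.uri; }
        HadoopFs getHadoopFileSystem() { return hadoopFs; }
    }

    /** Stand-in for SafetyNetWrapperFileSystem: delegates calls but hides the concrete type. */
    static final class SafetyNetWrapperFileSystem implements FlinkFileSystem {
        private final FlinkFileSystem delegate;
        SafetyNetWrapperFileSystem(FlinkFileSystem delegate) { this.delegate = delegate; }
        public URI getUri() { return delegate.getUri(); }
    }

    /** Models getUnguardedFileSystem(): returns the concrete file system directly. */
    static FlinkFileSystem getUnguardedFileSystem(URI uri) {
        return new HadoopFileSystem(uri);
    }

    /** Models get() as described in the thread: the result is always wrapped. */
    static FlinkFileSystem get(URI uri) {
        return new SafetyNetWrapperFileSystem(getUnguardedFileSystem(uri));
    }

    /** Same shape as the instanceof check quoted from BucketingSink. */
    static HadoopFs toHadoopFs(FlinkFileSystem flinkFs) {
        return (flinkFs instanceof HadoopFileSystem)
                ? ((HadoopFileSystem) flinkFs).getHadoopFileSystem()
                : null;
    }

    public static void main(String[] args) {
        URI uri = URI.create("hdfs://namenode:8020/output");
        // Reported bug: the wrapper hides HadoopFileSystem, so the check yields null.
        System.out.println("via get(): " + toHadoopFs(get(uri)));
        // Proposed fix: the unguarded lookup keeps the concrete type visible.
        System.out.println("via getUnguardedFileSystem(): "
                + toHadoopFs(getUnguardedFileSystem(uri)).uri);
    }
}
```

Running main prints `via get(): null` followed by `via getUnguardedFileSystem(): hdfs://namenode:8020/output`. This is the blind spot Chesnay confirmed: any code that type-checks a file system obtained through a wrapping lookup sees only the wrapper, so either the lookup has to bypass the wrapper or the check has to look through it.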