Thank you so much, guys. I used "hdfs://nameservice/path/of/your/file" and it works fine for me now.
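For reference, here is a minimal sketch of the sink with the fixed path. "nameservice" is a placeholder for the HA nameservice from our hdfs-site.xml (substitute your own); with three slashes, "hdfs:///tmp/auditlog/", it falls back to fs.defaultFS instead. "result" is the DataStream[String] from my job.

    import java.util.concurrent.TimeUnit

    import org.apache.flink.api.common.serialization.SimpleStringEncoder
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

    // Explicit nameservice in the URI; "hdfs:///..." (three slashes)
    // would use fs.defaultFS from core-site.xml instead.
    val sink: StreamingFileSink[String] = StreamingFileSink
      .forRowFormat(new Path("hdfs://nameservice/tmp/auditlog/"),
        new SimpleStringEncoder[String]("UTF-8"))
      .withRollingPolicy(DefaultRollingPolicy.create()
        .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))  // roll every 15 min
        .withInactivityInterval(TimeUnit.MINUTES.toMillis(5)) // or after 5 min idle
        .withMaxPartSize(1024 * 1024 * 1024)                  // or at 1 GiB
        .build())
      .build()

    result.addSink(sink).name("HDFSSink")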
Best,
Nick

On Fri, Mar 20, 2020 at 3:48 AM Yang Wang <danrtsey...@gmail.com> wrote:

> I think Jingsong is right. You missed a slash in your HDFS path.
>
> Usually an HDFS path looks like this: "hdfs://nameservice/path/of/your/file".
> The nameservice can be omitted if you want to use the defaultFS configured
> in core-site.xml.
>
> Best,
> Yang
>
> Jingsong Li <jingsongl...@gmail.com> wrote on Fri, Mar 20, 2020 at 10:09 AM:
>
>> Hi Nick,
>>
>> You can try new Path("hdfs:///tmp/auditlog/"). Note the additional /
>> after "hdfs://", which is the protocol prefix.
>>
>> Best,
>> Jingsong Lee
>>
>> On Fri, Mar 20, 2020 at 3:13 AM Nick Bendtner <buggi...@gmail.com> wrote:
>>
>>> Hi guys,
>>> I am using Flink version 1.7.2 and I am trying to write to an HDFS sink
>>> from my Flink job. I set up HADOOP_HOME. Here is the debug log:
>>>
>>> 2020-03-19 18:59:34,316 DEBUG org.apache.flink.runtime.util.HadoopUtils
>>>     - Cannot find hdfs-default configuration-file path in Flink config.
>>> 2020-03-19 18:59:34,317 DEBUG org.apache.flink.runtime.util.HadoopUtils
>>>     - Cannot find hdfs-site configuration-file path in Flink config.
>>> 2020-03-19 18:59:34,317 DEBUG org.apache.flink.runtime.util.HadoopUtils
>>>     - Adding /home/was/HDFSConf/conf/core-site.xml to hadoop configuration
>>> 2020-03-19 18:59:34,317 DEBUG org.apache.flink.runtime.util.HadoopUtils
>>>     - Adding /home/was/HDFSConf/conf/hdfs-site.xml to hadoop configuration
>>> 2020-03-19 18:59:34,344 INFO  org.apache.flink.runtime.security.modules.HadoopModule
>>>     - Hadoop user set to kafka (auth:KERBEROS)
>>>
>>> This is what my streaming file sink code looks like:
>>>
>>> val sink: StreamingFileSink[String] = StreamingFileSink
>>>   .forRowFormat(new Path("hdfs://tmp/auditlog/"),
>>>     new SimpleStringEncoder[String]("UTF-8"))
>>>   .withRollingPolicy(DefaultRollingPolicy.create()
>>>     .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
>>>     .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
>>>     .withMaxPartSize(1024 * 1024 * 1024)
>>>     .build())
>>>   .build()
>>>
>>> result.addSink(sink).name("HDFSSink")
>>>
>>> When I run the job I get this error stack trace:
>>>
>>> INFO org.apache.flink.runtime.executiongraph.ExecutionGraph -
>>> Sink: HDFSSink (1/1) (27b62d6294da47491041d750daf421a0) switched from
>>> RUNNING to FAILED.
>>> java.io.IOException: Cannot instantiate file system for URI: hdfs://tmp/auditlog
>>>     at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
>>>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
>>>     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
>>>     at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)
>>>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242)
>>>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
>>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.IllegalArgumentException: java.net.UnknownHostException: tmp
>>>     at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:378)
>>>     at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:320)
>>>     at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
>>>     at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:687)
>>>     at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:628)
>>>     at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
>>>
>>> Why is it trying to resolve "tmp" as a hostname? Is it not supposed to
>>> get the namenodes from core-site.xml and hdfs-site.xml?
>>>
>>> Can you please help with the correct way to configure the HDFS sink?
>>>
>>> Best,
>>> Nick.
>>
>> --
>> Best, Jingsong Lee