Hi Flavio,

Flink has a configuration file called "conf/flink-conf.yaml".
Add a new line to that file with the following contents:

fs.hdfs.hadoopconf: /path/to/your/hadoop/config

This should fix the problem.
Flink cannot load the configuration file from the jar containing the user
code, because the file system is initialized independently of the job. So
there is (currently) no way of initializing the file system using the user
code classloader.

What you can do is make the configuration files available to Flink's
system classloader, for example by putting your user jar into the lib/
folder of Flink. You can also add the path to the Hadoop configuration
files to Flink's CLASSPATH (but you need to do that on all machines).

I think the easiest approach is using Flink's configuration file.
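
As a side note, the exception itself is triggered by the URI:
"hdfs:///tmp/myFile.csv" has nothing between "//" and "/tmp", so it names no
NameNode and Flink has to fall back to the default Hadoop configuration. The
following is only a minimal sketch in plain Java. The class HdfsUriCheck and
the method describesNameNode are made up for illustration and are not part of
Flink. It just shows the difference between the two URI forms using
java.net.URI:

```java
import java.net.URI;

public class HdfsUriCheck {

    // Hypothetical helper (not a Flink API): returns true if the URI carries
    // an authority component (host and optional port) that could identify
    // the NameNode.
    static boolean describesNameNode(String uri) {
        return URI.create(uri).getAuthority() != null;
    }

    public static void main(String[] args) {
        // URI from the original job: no host between "//" and "/tmp".
        System.out.println(describesNameNode("hdfs:///tmp/myFile.csv"));

        // Same path with the fs.defaultFS value from core-site.xml spelled out.
        System.out.println(describesNameNode("hdfs://myServerX:8020/tmp/myFile.csv"));
    }
}
```

If you do not want to touch the configuration, passing the fully qualified
form to writeAsCsv should, as far as I can tell, also avoid the lookup of the
default configuration, at the cost of hard-coding the NameNode address in the
job.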


On Thu, Jun 25, 2015 at 2:59 PM, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> Could you describe it better with an example, please? Why doesn't Flink
> automatically load the properties of the Hadoop conf files within the jar?
>
> On Thu, Jun 25, 2015 at 2:55 PM, Robert Metzger <rmetz...@apache.org>
> wrote:
>
>> Hi,
>>
>> Flink is not loading the Hadoop configuration from the classloader. You
>> have to specify the path to the Hadoop configuration in the Flink
>> configuration parameter "fs.hdfs.hadoopconf".
>>
>> On Thu, Jun 25, 2015 at 2:50 PM, Flavio Pompermaier <pomperma...@okkam.it>
>> wrote:
>>
>>> Hi to all,
>>> I'm having a problem writing a file as CSV to HDFS with
>>> Flink 0.9.0.
>>> The code I use is:
>>>   myDataset.writeAsCsv(new Path("hdfs:///tmp", "myFile.csv").toString());
>>>
>>> If I run the job from Eclipse everything works fine, but when I deploy
>>> the job on the cluster (Cloudera 5.1.3) I get the following exception:
>>>
>>> Caused by: java.io.IOException: The given HDFS file URI
>>> (hdfs:///tmp/myFile.csv) did not describe the HDFS NameNode. The attempt to
>>> use a default HDFS configuration, as specified in the 'fs.hdfs.hdfsdefault'
>>> or 'fs.hdfs.hdfssite' config parameter failed due to the following problem:
>>> Either no default file system was registered, or the provided configuration
>>> contains no valid authority component (fs.default.name or fs.defaultFS)
>>> describing the (hdfs namenode) host and port.
>>> at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:291)
>>> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:258)
>>> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:309)
>>> at org.apache.flink.api.common.io.FileOutputFormat.initializeGlobal(FileOutputFormat.java:273)
>>> at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:84)
>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:520)
>>> ... 25 more
>>>
>>> The core-site.xml is present in the fat jar and contains the property
>>>
>>> <property>
>>>   <name>fs.defaultFS</name>
>>>   <value>hdfs://myServerX:8020</value>
>>> </property>
>>>
>>> I compiled flink with the following command:
>>>
>>>   mvn clean install -Dhadoop.version=2.3.0-cdh5.1.3 \
>>>     -Dhbase.version=0.98.1-cdh5.1.3 -Dhadoop.core.version=2.3.0-mr1-cdh5.1.3 \
>>>     -DskipTests -Pvendor-repos
>>>
>>> How can I fix that?
>>>
>>> Best,
>>> Flavio
>>>
>>>
>>
>
