Hi Flavio,

Here is the example from Marton: you can use the writeAsText method directly.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.addSource(new PersistentKafkaSource(..))
   .map(/* do your operations */)
   .writeAsText("hdfs://<namenode_name>:<namenode_port>/path/to/your/file");

Check out the relevant section of the streaming docs for more info. [1]

[1] http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#connecting-to-the-outside-world
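If it helps, here is a fuller, self-contained sketch of the same idea (untested, written against the 0.9 streaming API as I recall it). I have swapped the Kafka source for a simple in-memory source so the sketch runs on its own, and the namenode host and port in the output path are placeholders; adjust both to your setup.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WriteToHdfsSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // In-memory source used only to keep the sketch self-contained;
        // in the real job this would be the Kafka source from the snippet above.
        DataStream<String> stream = env.fromElements("foo", "bar", "baz");

        stream
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String value) {
                    // your operations go here
                    return value.toUpperCase();
                }
            })
            // Fully qualified HDFS URI; replace host and port with your namenode.
            .writeAsText("hdfs://namenode-host:8020/path/to/your/file");

        env.execute("write stream to HDFS");
    }
}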
On Thu, Jun 25, 2015 at 6:25 AM, Stephan Ewen <se...@apache.org> wrote:

You could also just qualify the HDFS URL, if that is simpler (put the host and port of the namenode in there): "hdfs://myhost:40010/path/to/file"

On Thu, Jun 25, 2015 at 3:20 PM, Robert Metzger <rmetz...@apache.org> wrote:

You have to put it on all machines.

On Thu, Jun 25, 2015 at 3:17 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

Do I have to put the hadoop conf file on each task manager or just on the job manager?

On Thu, Jun 25, 2015 at 3:12 PM, Chiwan Park <chiwanp...@apache.org> wrote:

It represents the folder containing the hadoop config files. :)

Regards,
Chiwan Park

On Jun 25, 2015, at 10:07 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

Does fs.hdfs.hadoopconf represent the folder containing the hadoop config files (*-site.xml) or just one specific hadoop config file (e.g. core-site.xml or hdfs-site.xml)?

On Thu, Jun 25, 2015 at 3:04 PM, Robert Metzger <rmetz...@apache.org> wrote:

Hi Flavio,

There is a file called "conf/flink-conf.yaml". Add a new line to the file with the following contents:

fs.hdfs.hadoopconf: /path/to/your/hadoop/config

This should fix the problem. Flink cannot load the configuration file from the jar containing the user code, because the file system is initialized independently of the job. So there is (currently) no way of initializing the file system using the user code classloader.

What you can do is make the configuration file available to Flink's system classloader, for example by putting your user jar into the lib/ folder of Flink. You can also add the path to the Hadoop configuration files to the CLASSPATH of Flink (but you need to do that on all machines).

I think the easiest approach is using Flink's configuration file.

On Thu, Jun 25, 2015 at 2:59 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

Could you describe it better with an example, please? Why doesn't Flink automatically load the properties of the hadoop conf files within the jar?

On Thu, Jun 25, 2015 at 2:55 PM, Robert Metzger <rmetz...@apache.org> wrote:

Hi,

Flink is not loading the Hadoop configuration from the classloader. You have to specify the path to the Hadoop configuration in the Flink configuration key "fs.hdfs.hadoopconf".

On Thu, Jun 25, 2015 at 2:50 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

Hi to all,

I'm experiencing a problem writing a file as CSV on HDFS with Flink 0.9.0. The code I use is

myDataset.writeAsCsv(new Path("hdfs:///tmp", "myFile.csv").toString());

If I run the job from Eclipse everything works fine, but when I deploy the job on the cluster (Cloudera 5.1.3) I obtain the following exception:

Caused by: java.io.IOException: The given HDFS file URI (hdfs:///tmp/myFile.csv) did not describe the HDFS NameNode. The attempt to use a default HDFS configuration, as specified in the 'fs.hdfs.hdfsdefault' or 'fs.hdfs.hdfssite' config parameter failed due to the following problem: Either no default file system was registered, or the provided configuration contains no valid authority component (fs.default.name or fs.defaultFS) describing the (hdfs namenode) host and port.
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:291)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:258)
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:309)
    at org.apache.flink.api.common.io.FileOutputFormat.initializeGlobal(FileOutputFormat.java:273)
    at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:84)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:520)
    ... 25 more

The core-site.xml is present in the fat jar and contains the property

<property>
  <name>fs.defaultFS</name>
  <value>hdfs://myServerX:8020</value>
</property>

I compiled Flink with the following command:

mvn clean install -Dhadoop.version=2.3.0-cdh5.1.3 -Dhbase.version=0.98.1-cdh5.1.3 -Dhadoop.core.version=2.3.0-mr1-cdh5.1.3 -DskipTests -Pvendor-repos

How can I fix that?

Best,
Flavio
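(A side note on Stephan's suggestion to qualify the URL: applied to the original writeAsCsv call, it would look roughly like the untested sketch below. This is only an illustration; the myServerX:8020 authority is taken from the core-site.xml above, and the sample data is made up.)

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class WriteCsvToHdfsSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Made-up sample data standing in for the real myDataset.
        DataSet<Tuple2<String, Integer>> myDataset = env.fromElements(
                new Tuple2<String, Integer>("a", 1),
                new Tuple2<String, Integer>("b", 2));

        // Fully qualified URI: the authority (host:port of the namenode)
        // matches the fs.defaultFS value shown in the core-site.xml above.
        myDataset.writeAsCsv("hdfs://myServerX:8020/tmp/myFile.csv");

        env.execute("write csv to HDFS");
    }
}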