HI  Flavio

Here is the example from Marton:
You can used env.writeAsText method directly.


StreamExecutionEnvironment env = StreamExecutionEnvironment.
getExecutionEnvironment();

env.addSource(PerisitentKafkaSource(..))
      .map(/* do you operations*/)

.wirteAsText("hdfs://<namenode_name>:<namenode_port>/path/to/your/file");

Check out the relevant section of the streaming docs for more info. [1]

[1]
http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#connecting-to-the-outside-world


On Thu, Jun 25, 2015 at 6:25 AM, Stephan Ewen <se...@apache.org> wrote:

> You could also just qualify the HDFS URL, if that is simpler (put host and
> port of the namenode in there): "hdfs://myhost:40010/path/to/file"
>
> On Thu, Jun 25, 2015 at 3:20 PM, Robert Metzger <rmetz...@apache.org>
> wrote:
>
>> You have to put it into all machines
>>
>> On Thu, Jun 25, 2015 at 3:17 PM, Flavio Pompermaier <pomperma...@okkam.it
>> > wrote:
>>
>>> Do I have to put the hadoop conf file on each task manager or just on
>>> the job-manager?
>>>
>>> On Thu, Jun 25, 2015 at 3:12 PM, Chiwan Park <chiwanp...@apache.org>
>>> wrote:
>>>
>>>> It represents the folder containing the hadoop config files. :)
>>>>
>>>> Regards,
>>>> Chiwan Park
>>>>
>>>>
>>>> > On Jun 25, 2015, at 10:07 PM, Flavio Pompermaier <
>>>> pomperma...@okkam.it> wrote:
>>>> >
>>>> > fs.hdfs.hadoopconf represents the folder containing the hadoop config
>>>> files (*-site.xml) or just one specific hadoop config file (e.g.
>>>> core-site.xml or the hdfs-site.xml)?
>>>> >
>>>> > On Thu, Jun 25, 2015 at 3:04 PM, Robert Metzger <rmetz...@apache.org>
>>>> wrote:
>>>> > Hi Flavio,
>>>> >
>>>> > there is a file called "conf/flink-conf.yaml"
>>>> > Add a new line in the file with the following contents:
>>>> >
>>>> > fs.hdfs.hadoopconf: /path/to/your/hadoop/config
>>>> >
>>>> > This should fix the problem.
>>>> > Flink can not load the configuration file from the jar containing the
>>>> user code, because the file system is initialized independent of the the
>>>> job. So there is (currently) no way of initializing the file system using
>>>> the user code classloader.
>>>> >
>>>> > What you can do is making the configuration file available to Flink's
>>>> system classloader. For example by putting your user jar into the lib/
>>>> folder of Flink. You can also add the path to the Hadoop configuration
>>>> files into the CLASSPATH of Flink (but you need to do that on all 
>>>> machines).
>>>> >
>>>> > I think the easiest approach is using Flink's configuration file.
>>>> >
>>>> >
>>>> > On Thu, Jun 25, 2015 at 2:59 PM, Flavio Pompermaier <
>>>> pomperma...@okkam.it> wrote:
>>>> > Could you describe it better with an example please? Why Flink
>>>> doesn't load automatically the properties of the hadoop conf files within
>>>> the jar?
>>>> >
>>>> > On Thu, Jun 25, 2015 at 2:55 PM, Robert Metzger <rmetz...@apache.org>
>>>> wrote:
>>>> > Hi,
>>>> >
>>>> > Flink is not loading the Hadoop configuration from the classloader.
>>>> You have to specify the path to the Hadoop configuration in the flink
>>>> configuration "fs.hdfs.hadoopconf"
>>>> >
>>>> > On Thu, Jun 25, 2015 at 2:50 PM, Flavio Pompermaier <
>>>> pomperma...@okkam.it> wrote:
>>>> > Hi to all,
>>>> > I'm experiencing some problem in writing a file as csv on HDFS with
>>>> flink 0.9.0.
>>>> > The code I use is
>>>> >   myDataset.writeAsCsv(new Path("hdfs:///tmp",
>>>> "myFile.csv").toString());
>>>> >
>>>> > If I run the job from Eclipse everything works fine but when I deploy
>>>> the job on the cluster (cloudera 5.1.3) I obtain the following exception:
>>>> >
>>>> > Caused by: java.io.IOException: The given HDFS file URI
>>>> (hdfs:///tmp/myFile.csv) did not describe the HDFS NameNode. The attempt to
>>>> use a default HDFS configuration, as specified in the 'fs.hdfs.hdfsdefault'
>>>> or 'fs.hdfs.hdfssite' config parameter failed due to the following problem:
>>>> Either no default file system was registered, or the provided configuration
>>>> contains no valid authority component (fs.default.name or
>>>> fs.defaultFS) describing the (hdfs namenode) host and port.
>>>> >       at
>>>> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:291)
>>>> >       at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:258)
>>>> >       at org.apache.flink.core.fs.Path.getFileSystem(Path.java:309)
>>>> >       at
>>>> org.apache.flink.api.common.io.FileOutputFormat.initializeGlobal(FileOutputFormat.java:273)
>>>> >       at
>>>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:84)
>>>> >       at
>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:520)
>>>> >       ... 25 more
>>>> >
>>>> > The core-site.xml is present in the fat jar and contains the property
>>>> >
>>>> > <property>
>>>> >     <name>fs.defaultFS</name>
>>>> >     <value>hdfs://myServerX:8020</value>
>>>> >   </property>
>>>> >
>>>> > I compiled flink with the following command:
>>>> >
>>>> >  mvn clean  install -Dhadoop.version=2.3.0-cdh5.1.3
>>>> -Dhbase.version=0.98.1-cdh5.1.3 -Dhadoop.core.version=2.3.0-mr1-cdh5.1.3
>>>> -DskipTests -Pvendor-repos
>>>> >
>>>> > How can I fix that?
>>>> >
>>>> > Best,
>>>> > Flavio
>>>> >
>>>>
>>>>
>>>
>>>
>>
>

Reply via email to