Hi,
I have an idea to solve my problem: I want to write one file for each Spark
partition, but I don't know how to get the current partition suffix/ID inside
my call function.
points.foreachPartition(
        new VoidFunction<Iterator<Tuple2<Integer, GeoTimeDataTupel>>>() {
    private static final long serialVersionUID = -7210897529331503565L;

    public void call(Iterator<Tuple2<Integer, GeoTimeDataTupel>> entry) throws Exception {
        while (entry.hasNext()) {
            Tuple2<Integer, GeoTimeDataTupel> temp = entry.next();
            try {
                FileSystem fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")), new Configuration());
                // the current partition ID is needed here to give each partition its own file name
                Path pt = new Path(fs.getHomeDirectory() + pro.getProperty("spark.output") + "/results");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
});
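Inside a running Spark task, the index of the current partition should be available from TaskContext.get().partitionId() (Spark 1.2 and later, as far as I know); alternatively, mapPartitionsWithIndex passes the index to your function as its first argument. The sketch below does not use Spark at all: it simulates partitions as plain Java lists so the per-partition naming scheme can run on its own. PerPartitionWriter, partFileName and writePartition are hypothetical names, and the part-NNNNN pattern is borrowed from the files Spark's saveAsTextFile produces.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class PerPartitionWriter {

    // Hypothetical helper: "part-00003" for partition 3, the same zero-padded
    // pattern Spark's saveAsTextFile uses for its output files.
    public static String partFileName(int partitionId) {
        return String.format("part-%05d", partitionId);
    }

    // Stand-in for the body of the foreachPartition call. In Spark the ID would
    // come from TaskContext.get().partitionId() instead of a parameter.
    public static Path writePartition(Path outputDir, int partitionId, Iterator<String> rows)
            throws IOException {
        StringBuilder sb = new StringBuilder();
        while (rows.hasNext()) {
            sb.append(rows.next()).append('\n');
        }
        Path file = outputDir.resolve(partFileName(partitionId));
        // One file per partition: no other task ever opens this path.
        Files.write(file, sb.toString().getBytes(StandardCharsets.UTF_8));
        return file;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("results");
        // Two simulated partitions of already-formatted result lines.
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("Cluster: 0, Point: a", "Cluster: 1, Point: b"),
                Arrays.asList("Cluster: 0, Point: c"));
        for (int id = 0; id < partitions.size(); id++) {
            Path f = writePartition(dir, id, partitions.get(id).iterator());
            System.out.println(f.getFileName() + ": " + Files.readAllLines(f, StandardCharsets.UTF_8));
        }
    }
}
```

Because every task writes only to the file named after its own partition, no two tasks ever read or rewrite the same file, which a single shared results file cannot guarantee.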
2015-06-09 15:34 GMT+02:00 Pa Rö <[email protected]>:
> Hi community,
>
> I want to append results to one file. When I run my function locally, it
> works fine, but when I run it on a YARN cluster, I lose some rows.
>
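A likely reason for the lost rows: on YARN, foreach runs as many tasks in parallel on different executors, and each call of the function below reads the whole results file, copies it to results_temp, and recreates results with one extra line. If two tasks read the file before either has rewritten it, the second rewrite replaces the first and a row is lost (a classic lost update); the tasks also share the same results_temp file. Locally, with fewer tasks running at the same time, the overlap may simply never have occurred. The sketch below replays that interleaving deterministically, with an in-memory string standing in for the HDFS file; LostUpdateDemo and its methods are hypothetical.

```java
public class LostUpdateDemo {

    // Stand-in for the shared results file on HDFS (hypothetical, in-memory only).
    private String file = "";

    // Step 1 of each foreach call: read the current contents.
    public String read() {
        return file;
    }

    // Step 2: recreate the file from the contents read earlier plus one new line.
    public void writeBack(String snapshot, String line) {
        file = snapshot + line + "\n";
    }

    public String contents() {
        return file;
    }

    // Two tasks whose read-copy-rewrite cycles overlap.
    public static String interleaved() {
        LostUpdateDemo demo = new LostUpdateDemo();
        String seenByA = demo.read();                     // task A reads ""
        String seenByB = demo.read();                     // task B reads "" before A writes
        demo.writeBack(seenByA, "Cluster: 0, Point: a");
        demo.writeBack(seenByB, "Cluster: 1, Point: b");  // overwrites A's row
        return demo.contents();
    }

    // The same two tasks, but one finishes before the other starts.
    public static String sequential() {
        LostUpdateDemo demo = new LostUpdateDemo();
        demo.writeBack(demo.read(), "Cluster: 0, Point: a");
        demo.writeBack(demo.read(), "Cluster: 1, Point: b");
        return demo.contents();
    }

    public static void main(String[] args) {
        System.out.print("interleaved:\n" + interleaved());
        System.out.print("sequential:\n" + sequential());
    }
}
```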
> Here is my function that writes the results:
>
> points.foreach(
>         new VoidFunction<Tuple2<Integer, GeoTimeDataTupel>>() {
>
>     private static final long serialVersionUID = 2459995649387229261L;
>
>     public void call(Tuple2<Integer, GeoTimeDataTupel> entry) throws Exception {
>         try {
>             FileSystem fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")), new Configuration());
>             Path pt = new Path(fs.getHomeDirectory() + pro.getProperty("spark.output") + "/results");
>
>             if (fs.exists(pt)) {
>                 FSDataInputStream in = fs.open(pt);
>                 Path pt_temp = new Path(fs.getHomeDirectory() + pro.getProperty("spark.output") + "/results_temp");
>                 backup(fs.getConf(), fs, in, pt_temp);
>                 in.close();
>
>                 FSDataOutputStream out = fs.create(pt, true);
>                 FSDataInputStream backup = fs.open(pt_temp);
>
>                 int offset = 0;
>                 int bufferSize = 4096;
>
>                 int result = 0;
>
>                 byte[] buffer = new byte[bufferSize];
>                 // pre-read a part of the content from the input stream
>                 result = backup.read(offset, buffer, 0, bufferSize);
>                 // keep reading the input stream until a read does not fill the whole buffer
>                 while (result == bufferSize) {
>                     out.write(buffer);
>                     // read the next segment from the input stream by moving the offset pointer
>                     offset += bufferSize;
>                     result = backup.read(offset, buffer, 0, bufferSize);
>                 }
>
>                 if (result > 0 && result < bufferSize) {
>                     for (int i = 0; i < result; i++) {
>                         out.write(buffer[i]);
>                     }
>                 }
>                 out.writeBytes("Cluster: " + entry._1 + ", Point: " + entry._2.toString() + "\n");
>                 out.close();
>             } else {
>                 BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(pt)));
>                 bw.write("Cluster: " + entry._1 + ", Point: " + entry._2.toString() + "\n");
>                 bw.close();
>             }
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>     }
>
>     public void backup(Configuration conf, FileSystem fs, FSDataInputStream sourceContent, Path pt_temp)
>             throws Exception {
>         FSDataOutputStream out = fs.create(pt_temp, true);
>         IOUtils.copyBytes(sourceContent, out, 4096, false);
>         out.close();
>     }
> });
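Separately from the concurrency problem, the copy loop can drop data. The positional read(position, buffer, offset, length) on FSDataInputStream is only guaranteed to read up to length bytes, so a read may return fewer than 4096 bytes before the end of the file; the while (result == bufferSize) loop then stops and the rest of the backup is never copied. Since backup() already uses IOUtils.copyBytes, the same call could copy results_temp back into results. The backup stream is also never closed. The plain-Java sketch below contrasts a loop with the original stopping rule against one that reads until read returns -1; CopyLoopDemo and ShortReadStream (a stream that deliberately returns short reads) are hypothetical, and the sketch uses sequential InputStream reads rather than positional ones.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

public class CopyLoopDemo {

    // Hypothetical stream that returns at most maxChunk bytes per read call,
    // which the InputStream contract allows even when more data remains.
    static class ShortReadStream extends FilterInputStream {
        private final int maxChunk;

        ShortReadStream(InputStream in, int maxChunk) {
            super(in);
            this.maxChunk = maxChunk;
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            return super.read(b, off, Math.min(len, maxChunk));
        }
    }

    // Same stopping rule as the original loop: quits as soon as one read comes back short.
    public static int copyUntilShortRead(InputStream in, OutputStream out, int bufferSize)
            throws IOException {
        byte[] buffer = new byte[bufferSize];
        int total = 0;
        int result = in.read(buffer, 0, bufferSize);
        while (result == bufferSize) {
            out.write(buffer, 0, result);
            total += result;
            result = in.read(buffer, 0, bufferSize);
        }
        if (result > 0) {
            out.write(buffer, 0, result);
            total += result;
        }
        return total;
    }

    // Keeps reading until end of stream (-1), however many bytes each read returns.
    public static int copyAll(InputStream in, OutputStream out, int bufferSize)
            throws IOException {
        byte[] buffer = new byte[bufferSize];
        int total = 0;
        int n;
        while ((n = in.read(buffer, 0, bufferSize)) != -1) {
            out.write(buffer, 0, n);
            total += n;
        }
        return total;
    }

    // Copies 10000 bytes through a stream that returns at most 1000 bytes per read.
    public static int[] demo() {
        try {
            byte[] data = new byte[10000];
            int broken = copyUntilShortRead(
                    new ShortReadStream(new ByteArrayInputStream(data), 1000),
                    new ByteArrayOutputStream(), 4096);
            int fixed = copyAll(
                    new ShortReadStream(new ByteArrayInputStream(data), 1000),
                    new ByteArrayOutputStream(), 4096);
            return new int[] {broken, fixed};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("original-style loop copied " + r[0] + " of 10000 bytes");
        System.out.println("read-until-EOF loop copied " + r[1] + " of 10000 bytes");
    }
}
```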
>
> Where is my mistake? Or is there a function to write (append) to a file in
> Hadoop HDFS?
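Hadoop's FileSystem does have an append(Path) method that returns an FSDataOutputStream, but it does not solve this case: HDFS allows only one writer per file at a time, so parallel tasks appending to the same results file would still collide (a second writer should be rejected while the first holds the file's lease). The more usual Spark approach is to format each record as a line and call saveAsTextFile on the RDD, which writes one part-NNNNN file per partition into an output directory. If a single file is really needed, the parts can be merged afterwards, for example with hdfs dfs -getmerge or, on Hadoop 2.x, FileUtil.copyMerge; coalesce(1) before saving also yields one file, at the cost of a single task writing everything. The sketch below merges part files from a local directory in name order, roughly what -getmerge does for an HDFS directory; MergeParts and mergeParts are hypothetical names, and it uses java.nio on the local file system rather than the Hadoop API.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class MergeParts {

    // Concatenates every "part-*" file in dir, in name order, into target.
    // Hypothetical local analogue of `hdfs dfs -getmerge`.
    public static void mergeParts(Path dir, Path target) throws IOException {
        List<Path> parts = new ArrayList<>();
        try (DirectoryStream<Path> ds = Files.newDirectoryStream(dir, "part-*")) {
            for (Path p : ds) {
                parts.add(p);
            }
        }
        // Zero-padded names, so lexical order equals partition order.
        Collections.sort(parts);
        try (OutputStream out = Files.newOutputStream(target)) {
            for (Path p : parts) {
                Files.copy(p, out);
            }
        }
    }

    // Writes two part files out of order, merges them, and returns the merged text.
    public static String demo() {
        try {
            Path dir = Files.createTempDirectory("results");
            Files.write(dir.resolve("part-00001"), "Cluster: 1, Point: b\n".getBytes(StandardCharsets.UTF_8));
            Files.write(dir.resolve("part-00000"), "Cluster: 0, Point: a\n".getBytes(StandardCharsets.UTF_8));
            Path merged = Files.createTempFile("merged", ".txt");
            mergeParts(dir, merged);
            return new String(Files.readAllBytes(merged), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.print(demo());
    }
}
```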
>
> best regards,
> paul
>