hi community,
i want append results to one file. if i work local my function build all
right,
if i run this on a yarn cluster, i lost same rows.
here my function to write:
points.foreach(
new VoidFunction<Tuple2<Integer, GeoTimeDataTupel>>() {
private static final long serialVersionUID =
2459995649387229261L;
public void call(Tuple2<Integer, GeoTimeDataTupel>
entry)throws Exception {
try {
FileSystem fs = FileSystem.get(new
URI(pro.getProperty("hdfs.namenode")),new Configuration());
Path pt=new
Path(fs.getHomeDirectory()+pro.getProperty("spark.output")+"/results");
if(fs.exists(pt)) {
FSDataInputStream in = fs.open(pt);
Path pt_temp = new
Path(fs.getHomeDirectory()+pro.getProperty("spark.output")+"/results_temp");
backup(fs.getConf(), fs, in, pt_temp);
in.close();
FSDataOutputStream out = fs.create((pt), true);
FSDataInputStream backup = fs.open(pt_temp);
int offset = 0;
int bufferSize = 4096;
int result = 0;
byte[] buffer = new byte[bufferSize];
// pre read a part of content from input stream
result = backup.read(offset, buffer, 0,
bufferSize);
// loop read input stream until it does not
fill whole size of buffer
while (result == bufferSize) {
out.write(buffer);
// read next segment from input stream by
moving the offset pointer
offset += bufferSize;
result = backup.read(offset, buffer, 0,
bufferSize);
}
if (result > 0 && result < bufferSize) {
for (int i = 0; i < result; i++) {
out.write(buffer[i]);
}
}
out.writeBytes("Cluster: "+entry._1+", Point:
"+entry._2.toString()+"\n");
out.close();
}
else {
BufferedWriter bw =new BufferedWriter(new
OutputStreamWriter(fs.create(pt)));
bw.write("Cluster: "+entry._1+", Point:
"+entry._2.toString()+"\n");
bw.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
public void backup(Configuration conf, FileSystem
fs,FSDataInputStream sourceContent, Path pt_temp) throws Exception {
FSDataOutputStream out = fs.create(pt_temp, true);
IOUtils.copyBytes(sourceContent, out, 4096, false);
out.close();
}
where is my fault?? or give it a function to write(append) to the hadoop
hdfs?
best regards,
paul