Hi community,

I want to append results to one file. If I run locally, my function works fine,
but if I run it on a YARN cluster, I lose some rows.

Here is my function that writes the results:

points.foreach(new VoidFunction<Tuple2<Integer, GeoTimeDataTupel>>() {

    private static final long serialVersionUID = 2459995649387229261L;

    public void call(Tuple2<Integer, GeoTimeDataTupel> entry) throws Exception {
        try {
            FileSystem fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")), new Configuration());
            Path pt = new Path(fs.getHomeDirectory() + pro.getProperty("spark.output") + "/results");

            if (fs.exists(pt)) {
                // back up the existing file, then rewrite it with the new line appended
                FSDataInputStream in = fs.open(pt);
                Path pt_temp = new Path(fs.getHomeDirectory() + pro.getProperty("spark.output") + "/results_temp");
                backup(fs.getConf(), fs, in, pt_temp);
                in.close();

                FSDataOutputStream out = fs.create(pt, true);
                FSDataInputStream backup = fs.open(pt_temp);

                int offset = 0;
                int bufferSize = 4096;
                int result = 0;
                byte[] buffer = new byte[bufferSize];

                // pre-read a part of the content from the backup stream
                result = backup.read(offset, buffer, 0, bufferSize);
                // keep reading until the input stream no longer fills the whole buffer
                while (result == bufferSize) {
                    out.write(buffer);
                    // read the next segment by moving the offset pointer
                    offset += bufferSize;
                    result = backup.read(offset, buffer, 0, bufferSize);
                }

                if (result > 0 && result < bufferSize) {
                    for (int i = 0; i < result; i++) {
                        out.write(buffer[i]);
                    }
                }
                out.writeBytes("Cluster: " + entry._1 + ", Point: " + entry._2.toString() + "\n");
                out.close();
            } else {
                // first write: create the file and write the first line
                BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(pt)));
                bw.write("Cluster: " + entry._1 + ", Point: " + entry._2.toString() + "\n");
                bw.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void backup(Configuration conf, FileSystem fs, FSDataInputStream sourceContent, Path pt_temp) throws Exception {
        FSDataOutputStream out = fs.create(pt_temp, true);
        IOUtils.copyBytes(sourceContent, out, 4096, false);
        out.close();
    }
});

Where is my mistake? Or is there a function to write (append) to HDFS?
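For example, would something like the following be the right direction? This is
only a rough sketch of what I mean, assuming append support is enabled on the
cluster (dfs.support.append); the method name appendLine and the way I pass the
URI and path are just for illustration:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsAppendSketch {

    // Append one line to a file in HDFS, creating the file if it does not exist yet.
    // hdfsUri and file stand in for my hdfs.namenode and spark.output properties.
    public static void appendLine(String hdfsUri, String file, String line) throws Exception {
        FileSystem fs = FileSystem.get(new URI(hdfsUri), new Configuration());
        Path pt = new Path(file);
        FSDataOutputStream out = fs.exists(pt) ? fs.append(pt) : fs.create(pt);
        out.writeBytes(line + "\n");
        out.close();
    }
}

I am not sure whether this is safe when several executors write to the same file
in parallel, though.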

Best regards,
Paul
