Can you try commenting out the saveAsTextFile call and doing a simple count() instead? If it's a
broadcast issue, it would throw the same error.
On 21 May 2015 14:21, "allanjie" <[email protected]> wrote:
> Sure, the code is very simple. I think you can understand it from the main
> function.
>
> public class Test1 {
>
> public static double[][] createBroadcastPoints(String
> localPointPath, int
> row, int col) throws IOException{
> BufferedReader br = RAWF.reader(localPointPath);
> String line = null;
> int rowIndex = 0;
> double[][] pointFeatures = new double[row][col];
> while((line = br.readLine())!=null){
> List<String> point =
> Arrays.asList(line.split(","));
> int colIndex = 0;
> for(String pointFeature: point){
> pointFeatures[rowIndex][colIndex] =
> Double.valueOf(pointFeature);
> colIndex++;
> }
> rowIndex++;
> }
> br.close();
> return pointFeatures;
> }
>
>
>
> public static void main(String[] args) throws IOException{
> /**Parameter Setting***********/
> String localPointPath =
> "/home/hduser/skyrock/skyrockImageFeatures.csv";
> String remoteFilePath =
> "hdfs://HadoopV26Master:9000/user/skyrock/skyrockImageIndexedFeatures.csv";
> //this csv file is only 468MB
> final int row = 133433;
> final int col = 458;
> /******************/
>
> SparkConf conf = new SparkConf().
> setAppName("distance").
> setMaster("spark://HadoopV26Master:7077").
> set("spark.executor.memory", "4g").
> set("spark.eventLog.enabled", "true")
> .set("spark.eventLog.dir",
> "/usr/local/spark/logs/spark-events")
> .set("spark.local.dir", "/tmp/spark-temp");
> JavaSparkContext sc = new JavaSparkContext(conf);
>
> JavaRDD<String> textFile = sc.textFile(remoteFilePath);
> //Broadcast variable
> //double[][] xx =;
>
> final Broadcast<double[][]> broadcastPoints =
> sc.broadcast(createBroadcastPoints(localPointPath,row,col));
> //final Broadcast<double[][]> broadcastPoints =
> sc.broadcast(xx);
>
> /**
> * Compute the distance in terms of each point on each
> instance.
> * distance list: index = n(i-1)- i*(i-1)/2 + j-i-1
> */
> JavaPairRDD<Pair,Double> distance =
> textFile.flatMapToPair(new
> PairFlatMapFunction<String, Pair, Double>(){
> public Iterable<Tuple2<Pair, Double>>
> call(String v1) throws
> Exception{
> List<String> al =
> Arrays.asList(v1.split(","));
> double[] featureVals = new
> double[al.size()];
> for(int j=0;j<al.size()-1;j++)
> featureVals[j] =
> Double.valueOf(al.get(j+1));
> int jIndex = Integer.valueOf(al.get(0));
> double[][] allPoints =
> broadcastPoints.getValue();
> double sum = 0;
> List<Tuple2<Pair, Double>> list =
> new ArrayList<Tuple2<Pair,
> Double>>();
> for(int i=0;i<row; i++){
> sum = 0;
> for(int j=0;j<al.size()-1;j++){
> sum +=
> (allPoints[i][j]-featureVals[j])*(allPoints[i][j]-featureVals[j]);
> }
> list.add(new
> Tuple2<Pair,Double>(new
> Pair(i,jIndex),Math.sqrt(sum)));
> }
> return list;
> }
> });
>
>
>
> distance.saveAsTextFile("hdfs://HadoopV26Master:9000/user/"+args[0]);
> }
> }
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/java-program-got-Stuck-at-broadcasting-tp22953p22973.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [email protected]
> For additional commands, e-mail: [email protected]
>
>