Sure, the code is very simple. I think you can understand it from the main
function.
public class Test1 {
public static double[][] createBroadcastPoints(String localPointPath,
int
row, int col) throws IOException{
BufferedReader br = RAWF.reader(localPointPath);
String line = null;
int rowIndex = 0;
double[][] pointFeatures = new double[row][col];
while((line = br.readLine())!=null){
List<String> point = Arrays.asList(line.split(","));
int colIndex = 0;
for(String pointFeature: point){
pointFeatures[rowIndex][colIndex] =
Double.valueOf(pointFeature);
colIndex++;
}
rowIndex++;
}
br.close();
return pointFeatures;
}
public static void main(String[] args) throws IOException{
/**Parameter Setting***********/
String localPointPath =
"/home/hduser/skyrock/skyrockImageFeatures.csv";
String remoteFilePath =
"hdfs://HadoopV26Master:9000/user/skyrock/skyrockImageIndexedFeatures.csv";
//this csv file is only 468MB
final int row = 133433;
final int col = 458;
/******************/
SparkConf conf = new SparkConf().
setAppName("distance").
setMaster("spark://HadoopV26Master:7077").
set("spark.executor.memory", "4g").
set("spark.eventLog.enabled", "true")
.set("spark.eventLog.dir",
"/usr/local/spark/logs/spark-events")
.set("spark.local.dir", "/tmp/spark-temp");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> textFile = sc.textFile(remoteFilePath);
//Broadcast variable
//double[][] xx =;
final Broadcast<double[][]> broadcastPoints =
sc.broadcast(createBroadcastPoints(localPointPath,row,col));
//final Broadcast<double[][]> broadcastPoints =
sc.broadcast(xx);
/**
* Compute the distance in terms of each point on each instance.
* distance list: index = n(i-1)- i*(i-1)/2 + j-i-1
*/
JavaPairRDD<Pair,Double> distance = textFile.flatMapToPair(new
PairFlatMapFunction<String, Pair, Double>(){
public Iterable<Tuple2<Pair, Double>> call(String
v1) throws
Exception{
List<String> al = Arrays.asList(v1.split(","));
double[] featureVals = new double[al.size()];
for(int j=0;j<al.size()-1;j++)
featureVals[j] =
Double.valueOf(al.get(j+1));
int jIndex = Integer.valueOf(al.get(0));
double[][] allPoints =
broadcastPoints.getValue();
double sum = 0;
List<Tuple2<Pair, Double>> list = new
ArrayList<Tuple2<Pair,
Double>>();
for(int i=0;i<row; i++){
sum = 0;
for(int j=0;j<al.size()-1;j++){
sum +=
(allPoints[i][j]-featureVals[j])*(allPoints[i][j]-featureVals[j]);
}
list.add(new Tuple2<Pair,Double>(new
Pair(i,jIndex),Math.sqrt(sum)));
}
return list;
}
});
distance.saveAsTextFile("hdfs://HadoopV26Master:9000/user/"+args[0]);
}
}
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/java-program-got-Stuck-at-broadcasting-tp22953p22973.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org