Here is a simplified version of the code I wrote to get an understanding of how it works:

final List<NeuralNet> nns = new ArrayList<NeuralNet>();
for (int i = 0; i < numberOfNets; i++) {
    nns.add(NeuralNet.createFrom(...));
}

final JavaRDD<NeuralNet> nnRdd = sc.parallelize(nns);
JavaDStream<Float> results = rndLists.flatMap(new FlatMapFunction<Map<String, Object>, Float>() {
    @Override
    public Iterable<Float> call(Map<String, Object> input) throws Exception {

        Float f = nnRdd.map(new Function<NeuralNet, Float>() {
            @Override
            public Float call(NeuralNet nn) throws Exception {
                return 1.0f;
            }
        }).reduce(new Function2<Float, Float, Float>() {
            @Override
            public Float call(Float left, Float right) throws Exception {
                return left + right;
            }
        });

        return Arrays.asList(f);
    }
});
results.print();


This works as expected, and print() simply shows the number of neural nets I have.
If instead of print() I use:
results.foreach(new Function<JavaRDD<Float>, Void>() {
    @Override
    public Void call(JavaRDD<Float> arg0) throws Exception {
        for (Float f : arg0.collect()) {
            System.out.println(f);
        }
        return null;
    }
});

It fails with the following exception:
org.apache.spark.SparkException: Job aborted due to stage failure: Task
1.0:0 failed 1 times, most recent failure: Exception failure in TID 1 on
host localhost: java.lang.NullPointerException 
org.apache.spark.rdd.RDD.map(RDD.scala:270)

This is weird to me, since the same code executes as expected in one case and
fails in the other. Any idea what's going on here?
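For what it's worth, since the inner map/reduce only folds over a list of nets that was built on the driver, I can get the same sum with plain Java inside the flatMap closure, with no nested RDD at all. Here is a minimal, Spark-free sketch of just that inner computation; the stub NeuralNet class and its evaluate() method are placeholders of mine, not the real API:

```java
import java.util.ArrayList;
import java.util.List;

public class LocalSum {
    // Placeholder standing in for the real NeuralNet class;
    // evaluate() mirrors the inner map function that returned 1.0f per net.
    static class NeuralNet {
        float evaluate() {
            return 1.0f;
        }
    }

    // The same fold the nested nnRdd.map(...).reduce(...) performed,
    // expressed as an ordinary loop over a driver-local list.
    static float sumOver(List<NeuralNet> nets) {
        float total = 0.0f;
        for (NeuralNet nn : nets) {
            total += nn.evaluate();
        }
        return total;
    }

    public static void main(String[] args) {
        List<NeuralNet> nets = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            nets.add(new NeuralNet());
        }
        // Sums one 1.0f per net, like the map/reduce in the original code.
        System.out.println(sumOver(nets)); // prints 5.0
    }
}
```

If the list is serializable, it can be captured by the closure directly, so the flatMap could call sumOver(nns) instead of touching nnRdd at all.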




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Odd-error-when-using-a-rdd-map-within-a-stream-map-tp14551.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
