I am working on a scala code which performs Linear Regression on certain
datasets. Right now I am using 20 cores and 25 executors and everytime I run
a Spark job I get a different result.
The input size of the files are 2GB and 400 MB.However, when I run the job
with 20 cores and 1 executor, I get consistent results.
Has anyone experienced such a thing so far?
Please find the code below:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SchemaRDD
import org.apache.spark.Partitioner
import org.apache.spark.storage.StorageLevel
object TextProcess{
def main(args: Array[String]){
val conf = new SparkConf().set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val numExecutors=(conf.get("spark.executor.instances").toInt)
// Read the 2 input files
// First file is either cases / controls
val input1 = sc.textFile(args(0))
// Second file is Gene Expression
val input2 = sc.textFile(args(1))
//collecting header information
val header1=sc.parallelize(input1.take(1))
val header2=sc.parallelize(input2.take(1))
//mapping data without the header information
val map1 = input1.subtract(header1).map(x => (x.split("
")(0)+x.split(" ")(1), x))
val map2 = input2.subtract(header2).map(x => (x.split("
")(0)+x.split(" ")(1), x))
//joining data. here is where the order was getting affected.
val joinedMap = map1.join(map2)
//adding the header back to the top of RDD
val x = header1.union(joinedMap.map{case(x,(y,z))=>y})
val y = header2.union(joinedMap.map{case(x,(y,z))=>z})
//removing irrelevant columns
val rddX = x.map(x=>x.split("
").drop(3)).zipWithIndex.map{case(a,b)=> a.map(x=>b.toString+"
"+x.toString)}
val rddY = y.map(x=>x.split("
").drop(2)).zipWithIndex.map{case(a,b)=> a.map(x=>b.toString+"
"+x.toString)}
//transposing and cross joining data. This keeps the identifier
at the start
val transposedX = rddX.flatMap(x =>
x.zipWithIndex.map(x=>x.swap)).reduceByKey((a,b)=>
a+":"+b).map{case(a,b)=>b.split(":").sorted}
val transposedY = rddY.flatMap(x =>
x.zipWithIndex.map(x=>x.swap)).reduceByKey((a,b)=>
a+":"+b).map{case(a,b)=>b.split(":").sorted}.persist(StorageLevel.apply(false,
true, false, false, numExecutors))
val cleanedX =
transposedX.map(x=>x.map(x=>x.slice(x.indexOfSlice(" ")+1,x.length)))
val cleanedY =
transposedY.map(x=>x.map(x=>x.slice(x.indexOfSlice("
")+1,x.length))).persist(StorageLevel.apply(false, true, false, false,
numExecutors))
val cartXY = cleanedX.cartesian(cleanedY)
val finalDataSet= cartXY.map{case(a,b)=>a zip b}
//convert to key value pair
val regressiondataset =
finalDataSet.map(x=>(x(0),x.drop(1).filter{case(a,b)=> a!="NA" && b!="NA" &&
a!="null" && b!="null"}.map{case(a,b)=> (a.toDouble, b.toDouble)}))
val linearOutput = regressiondataset.map(s => new
LinearRegression(s._1 ,s._2).outputVal)
linearOutput.saveAsTextFile(args(2))
cleanedY.unpersist()
transposedY.unpersist()
}
}
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-returns-a-different-result-on-each-run-tp23861.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]