Hi, I am observing some weird behavior with Spark. It might be my misinterpretation of some fundamental concepts, but I have looked at it for three days and have not been able to solve it.
The source code is fairly long and complex, so instead of posting it I will try to articulate the problem. I am building a "Sentiment Analyser" using the Naive Bayes model in Spark.

1) I have taken text files in raw format and created an RDD of word -> Array(files the word is found in).

2) From this I have derived the "features" array for each file, an Array[Double] holding 0.0 if the file does not contain the word and 1.0 if it does.

3) I have then created an RDD[LabeledPoint], and from it trained the Naive Bayes model with the following code:

    val splits = uberLbRDD.randomSplit(Array(0.6, 0.4), seed = 11L)
    val training = splits(0)
    // training.persist(StorageLevel.MEMORY_AND_DISK_SER_2)
    val test = splits(1)
    Logger.info("Training count: " + training.count() + " Testing count: " + test.count())

    model = NaiveBayes.train(training, lambda = 1.0)

    val predictionAndLabel = test.map(p => (model.predict(p.features), p.label))
    val accuracy = 1.0 * predictionAndLabel.filter(x => x._1 == x._2).count() / test.count()
    Logger.info("Fold: [" + fold + "] accuracy: [" + accuracy + "]")

4) The model seems to be fine; the accuracy is about 75% to 82%, depending on which set of input files I provide.
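Spark aside, steps 1) and 2) above amount to a small pure function: given a fixed word ordering and the word -> files index, emit a 0/1 vector for one file. A minimal sketch (the names featureWords and wordToFiles are illustrative, not from the original code):

```scala
// Sketch of steps 1-2: turn a word -> files index into a 0/1 feature
// array for one file. featureWords fixes the position of each word.
object FeatureSketch {
  def featuresFor(file: String,
                  featureWords: Seq[String],
                  wordToFiles: Map[String, Seq[String]]): Array[Double] =
    featureWords.map { w =>
      // 1.0 if this file appears in the word's file list, else 0.0
      if (wordToFiles.getOrElse(w, Seq.empty).contains(file)) 1.0 else 0.0
    }.toArray

  def main(args: Array[String]): Unit = {
    val index = Map("good" -> Seq("a.txt"), "bad" -> Seq("b.txt"))
    val words = Seq("good", "bad")
    println(featuresFor("a.txt", words, index).mkString(","))  // 1.0,0.0
  }
}
```

Keeping featureWords in a fixed order matters, because each index of the Array[Double] must mean the same word for every file fed to the model.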
5) Now I am using this model to predict(). Here I am creating the same feature array from the input text file, with code as follows:

    /*
     * Print all the features (words) in the feature array
     */
    allFeaturesRDD.foreach((x) => print(x + ", "))

    /*
     * Build the feature array
     */
    val features = buildFeatureArray(reviewID, wordSeqRdd)  // <-- fails here; this code is shown below
    logFeatureArray(features)

    val prediction = model.predict(Vectors.dense(features))
    Logger.info("Prediction: " + prediction)

where:

    reviewID         -> filename
    wordReviewSeqRDD -> RDD[(word, Array(filename))]

    def buildFeatureArray(reviewID: String,
                          wordReviewSeqRDD: RDD[(String, Seq[String])]): Array[Double] = {
      val numWords = allFeaturesRDD.count           // <-- number of all words in the feature set
      val wordReviewSeqMap = wordReviewSeqRDD.collectAsMap()
      var featArray: Array[Double] = new Array(numWords.toInt)  // <-- create an empty features array
      var idx = 0
      if (trainingDone) Logger.info("Feature len: " + numWords)
      allFeaturesRDD.map {            // <-- this is where it is failing:
        case (eachword) => {          //     for some reason the code does not enter here ????
          val reviewList = wordReviewSeqMap.get(eachword).get
          if (trainingDone == true) {
            println("1. eachword: " + eachword + " reviewList: " + reviewList)
            println("2. reviewList.size: " + reviewList.length)
            println("3. reviewList(0): " + reviewList(0))
          }
          featArray(idx) = if (reviewList.contains(reviewID)) 1.0 else 0.0
          idx += 1
        }
      }
      featArray
    }

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-does-not-loop-through-a-RDD-map-tp21102.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
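For what it's worth, the symptom in buildFeatureArray is consistent with two documented Spark behaviors: map is a lazy transformation, so its closure only runs when an action (count, collect, foreach, ...) forces it, and mutating driver-side variables such as featArray and idx inside an RDD closure does not work on a cluster, because the closure runs on executors against serialized copies of those variables. A Spark-free sketch of the laziness half, using a lazy Scala view as a stand-in for an RDD:

```scala
// Stand-in for rdd.map { ... }: a lazy Scala view. The closure body
// does not run until something forces evaluation (the analogue of a
// Spark action such as count or collect).
object LazySketch {
  def main(args: Array[String]): Unit = {
    val ran = scala.collection.mutable.ArrayBuffer[Int]()
    val mapped = List(1, 2, 3).view.map { x => ran += x; x * 2 }
    println(ran.isEmpty)        // true: the map body has not executed yet
    val forced = mapped.toList  // "action": forces the closure to run
    println(ran.toList)         // List(1, 2, 3)
    println(forced)             // List(2, 4, 6)
  }
}
```

By the same token, even once an action forces the map in buildFeatureArray to run, the featArray updates happen to executor-side copies; building the array from values the closure returns (and collecting those) avoids both pitfalls.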