I have finished training a MatrixFactorizationModel and want to load it in Spark Streaming. I expected this to work, but it does not, and I don't know why. Can anyone help?

My code looks like this:

    val ssc = new StreamingContext ...
    val bestModel = MatrixFactorizationModel.load(ssc.sparkContext, "hdfs:///recommendmodel")
    val model = ssc.sparkContext.broadcast(bestModel)
    val kafkaStream = KafkaUtils.createStream ...
    val data = kafkaStream.map ...
    data.foreachRDD(rdd => {
      val userId = rdd ...
      model.value.recommendProducts(userId, 10) // The exception is thrown on this line.
    })

When I run the job, it throws this exception:

    org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
        at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
        at org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:873)
        at org.apache.spark.mllib.recommendation.MatrixFactorizationModel.recommendProducts(MatrixFactorizationModel.scala:119)
        at com.jobs.stream.RecommendStories$$anonfun$main$2$$anonfun$apply$1.apply(RecommendStories.scala:127)
        at com.jobs.stream.RecommendStories$$anonfun$main$2$$anonfun$apply$1.apply(RecommendStories.scala:114)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at com.jobs.stream.RecommendStories$$anonfun$main$2.apply(RecommendStories.scala:114)
        at com.jobs.stream.RecommendStories$$anonfun$main$2.apply(RecommendStories.scala:105)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:193)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:193)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:193)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:192)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Can-it-works-in-load-the-MatrixFactorizationModel-and-predict-product-with-Spark-Streaming-tp23362.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
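
A possible direction, offered as a sketch and not confirmed anywhere in this thread: the trace shows recommendProducts calling PairRDDFunctions.lookup, which means the model keeps its factors as RDDs (userFeatures and productFeatures), and the model therefore cannot be used wherever RDD operations are disallowed. Assuming both factor sets are small enough to fit in memory, they could be collected on the driver (for example with collectAsMap()), broadcast as plain maps, and scored locally with a dot product. The Scala below shows only that local scoring step, without Spark; the object name LocalRecommend, its helper names, and the sample factor values are hypothetical and chosen purely for illustration.

```scala
// Sketch only: local top-N scoring over in-memory factor maps.
// The maps stand in for what model.userFeatures / model.productFeatures
// might look like after being collected on the driver (an assumption).
object LocalRecommend {
  type Factors = Map[Int, Array[Double]]

  // Dot product of two latent-factor vectors of equal rank.
  def dot(a: Array[Double], b: Array[Double]): Double = {
    require(a.length == b.length, "factor vectors must have the same rank")
    var sum = 0.0
    var i = 0
    while (i < a.length) {
      sum += a(i) * b(i)
      i += 1
    }
    sum
  }

  // Returns up to `num` (productId, score) pairs, highest score first.
  // An unknown user yields an empty result instead of an exception.
  def recommendProducts(userId: Int, num: Int,
                        users: Factors, products: Factors): Seq[(Int, Double)] =
    users.get(userId) match {
      case None => Seq.empty
      case Some(userVector) =>
        products.toSeq
          .map { case (productId, productVector) => (productId, dot(userVector, productVector)) }
          .sortBy { case (_, score) => -score }
          .take(num)
    }

  def main(args: Array[String]): Unit = {
    // Made-up factors of rank 2, for illustration only.
    val users: Factors = Map(1 -> Array(1.0, 0.0), 2 -> Array(0.0, 1.0))
    val products: Factors = Map(
      10 -> Array(0.9, 0.1),
      20 -> Array(0.2, 0.8),
      30 -> Array(0.5, 0.5))
    println(recommendProducts(1, 2, users, products))
  }
}
```

In a streaming job, the two maps would be the values of broadcast variables, so this function uses only ordinary Scala collections and never touches an RDD. Whether this is practical depends on the number of users and products; with very large factor sets, collecting them to the driver may not be feasible.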