The log already tells you the cause: RDD transformations and actions cannot be
invoked inside other transformations. You did not do this explicitly, but the
implementation of MatrixFactorizationModel.recommendProducts does it for you:
as the PairRDDFunctions.lookup frame in your stack trace shows, it performs an
RDD lookup internally. See
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala#L119
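
One possible way around this, offered as a rough sketch and not as anything
taken from MLlib: score the products yourself from plain Scala collections, so
that no RDD operation is needed at prediction time. The sketch assumes both
factor matrices are small enough to hold in memory, for example after pulling
them out once on the driver with model.userFeatures.collectAsMap() and
model.productFeatures.collectAsMap(). The names LocalRecommender, Scored and
recommendTopN are hypothetical and exist only for this illustration.

```scala
// Local top-N scoring over in-memory factor maps. Nothing here touches an RDD.
object LocalRecommender {
  // Hypothetical result type for this sketch (not MLlib's Rating class).
  final case class Scored(user: Int, product: Int, score: Double)

  // Dot product of two factor vectors of equal rank.
  private def dot(a: Array[Double], b: Array[Double]): Double = {
    require(a.length == b.length, "factor vectors must have the same rank")
    var sum = 0.0
    var i = 0
    while (i < a.length) { sum += a(i) * b(i); i += 1 }
    sum
  }

  // Returns up to `num` products with the highest predicted score for `user`,
  // best first, or an empty sequence if the user has no factor vector.
  def recommendTopN(
      userFeatures: collection.Map[Int, Array[Double]],
      productFeatures: collection.Map[Int, Array[Double]],
      user: Int,
      num: Int): Seq[Scored] =
    userFeatures.get(user) match {
      case None => Seq.empty
      case Some(uf) =>
        productFeatures.toSeq
          .map { case (product, pf) => Scored(user, product, dot(uf, pf)) }
          .sortWith(_.score > _.score)
          .take(num)
    }
}

object LocalRecommenderDemo {
  def main(args: Array[String]): Unit = {
    // Made-up factors, standing in for the collected model features.
    val users = Map(1 -> Array(1.0, 0.0))
    val products = Map(
      10 -> Array(0.5, 9.0),
      20 -> Array(2.0, 0.0),
      30 -> Array(1.0, 1.0))
    LocalRecommender.recommendTopN(users, products, user = 1, num = 2)
      .foreach(println)
  }
}
```

If the factor maps are too large to collect, this approach does not apply and
the prediction would have to stay a driver-side call on the model itself.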

2015-06-17 16:26 GMT+08:00 wanbo <gewa...@163.com>:

> I have finished training a MatrixFactorizationModel, and I want to load this
> model in Spark Streaming.
> I thought it would work, but it does not, and I don't know why. Can anyone
> help me?
>
> I wrote code like this:
>
>     val ssc = new StreamingContext ...
>     val bestModel = MatrixFactorizationModel.load(ssc.sparkContext,
>       "hdfs:///recommendmodel")
>
>     val model = ssc.sparkContext.broadcast(bestModel)
>
>     val kafkaStream = KafkaUtils.createStream ...
>     val data = kafkaStream.map ...
>
>     data.foreachRDD(rdd => {
>         val userId = rdd ...
>         model.value.recommendProducts(userId, 10)    // Throws exception on this line.
>     })
>
> It throws this exception when I run the job:
>
> org.apache.spark.SparkException: RDD transformations and actions can only be
> invoked by the driver, not inside of other transformations; for example,
> rdd1.map(x => rdd2.values.count() * x) is invalid because the values
> transformation and count action cannot be performed inside of the rdd1.map
> transformation. For more information, see SPARK-5063.
>         at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
>         at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
>         at org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:873)
>         at org.apache.spark.mllib.recommendation.MatrixFactorizationModel.recommendProducts(MatrixFactorizationModel.scala:119)
>         at com.jobs.stream.RecommendStories$$anonfun$main$2$$anonfun$apply$1.apply(RecommendStories.scala:127)
>         at com.jobs.stream.RecommendStories$$anonfun$main$2$$anonfun$apply$1.apply(RecommendStories.scala:114)
>         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>         at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>         at com.jobs.stream.RecommendStories$$anonfun$main$2.apply(RecommendStories.scala:114)
>         at com.jobs.stream.RecommendStories$$anonfun$main$2.apply(RecommendStories.scala:105)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
>         at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>         at scala.util.Try$.apply(Try.scala:161)
>         at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)
>         at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:193)
>         at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:193)
>         at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:193)
>         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>         at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:192)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:724)