I have finished training a MatrixFactorizationModel, and I want to load this model in
Spark Streaming.
I thought it would work, but it doesn't, and I can't see why. Can anyone help
me?

I wrote code like this:

    val ssc = new StreamingContext ...
    val bestModel = MatrixFactorizationModel.load(ssc.sparkContext, "hdfs:///recommendmodel")

    val model = ssc.sparkContext.broadcast(bestModel)

    val kafkaStream = KafkaUtils.createStream ...
    val data = kafkaStream.map ...

    data.foreachRDD(rdd => {
        val userId = rdd ...
        model.value.recommendProducts(userId, 10)    // Throws an exception on this line.
    })
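
In case the elided parts matter, here is a fuller, self-contained sketch of roughly
what the job does. The batch interval, the Kafka connection details (ZooKeeper
quorum, consumer group, topic map), and the way each message is parsed into a user
id are simplified placeholders, not my exact code:

    import org.apache.spark.SparkConf
    import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object RecommendStories {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("RecommendStories")
        val ssc = new StreamingContext(conf, Seconds(10)) // placeholder batch interval

        // Load the ALS model that was saved after training.
        val bestModel =
          MatrixFactorizationModel.load(ssc.sparkContext, "hdfs:///recommendmodel")

        // Broadcast the loaded model so the streaming job can use it.
        val model = ssc.sparkContext.broadcast(bestModel)

        // Placeholder ZooKeeper quorum, consumer group, and topic map.
        val kafkaStream =
          KafkaUtils.createStream(ssc, "zk-host:2181", "recommend-group", Map("stories" -> 1))

        // Each Kafka message value is assumed to carry one user id.
        val data = kafkaStream.map(_._2)

        data.foreachRDD(rdd => {
          rdd.collect().foreach { line =>
            val userId = line.trim.toInt
            model.value.recommendProducts(userId, 10) // exception is thrown here
          }
        })

        ssc.start()
        ssc.awaitTermination()
      }
    }

I'm not doing anything with the recommendations yet; just calling
recommendProducts is enough to trigger the exception below.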

This is the exception thrown when I run the job:

org.apache.spark.SparkException: RDD transformations and actions can only be
invoked by the driver, not inside of other transformations; for example,
rdd1.map(x => rdd2.values.count() * x) is invalid because the values
transformation and count action cannot be performed inside of the rdd1.map
transformation. For more information, see SPARK-5063.
        at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
        at org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:873)
        at org.apache.spark.mllib.recommendation.MatrixFactorizationModel.recommendProducts(MatrixFactorizationModel.scala:119)
        at com.jobs.stream.RecommendStories$$anonfun$main$2$$anonfun$apply$1.apply(RecommendStories.scala:127)
        at com.jobs.stream.RecommendStories$$anonfun$main$2$$anonfun$apply$1.apply(RecommendStories.scala:114)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at com.jobs.stream.RecommendStories$$anonfun$main$2.apply(RecommendStories.scala:114)
        at com.jobs.stream.RecommendStories$$anonfun$main$2.apply(RecommendStories.scala:105)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:193)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:193)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:193)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:192)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)