import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating
// Load and parse the data
val data = sc.textFile("/path/CFReady.txt")
val ratings = data.map(_.split('\t') match { case Array(user, item, rate) =>
Rating(user.toInt, item.toInt, rate.toDouble)
})
// Build the recommendation model using ALS
val rank = 50
val numIterations = 100
val model = ALS.train(ratings, rank, numIterations, 0.10)
// Evaluate the model on rating data
val usersProducts = ratings.map { case Rating(user, product, rate) =>
(user, product)
}
val predictions =
model.predict(usersProducts).map { case Rating(user, product, rate) =>
((user, product), rate)
}
val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
((user, product), rate)
}.join(predictions)
val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
val err = (r1 - r2)
err * err
}.mean()
println("Mean Squared Error = " + MSE)
val pw = new PrintWriter(new File("/path/CFOutput.txt"))
ratesAndPreds.foreach(pw.println)
}
Hi,
Consider the highlighted code, I am trying to write the output of ratesAndPreds
array on to the disk. But I get error
Task not serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:758)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
at $iwC$$iwC$$iwC.<init>(<console>:45)
at $iwC$$iwC.<init>(<console>:47)
at $iwC.<init>(<console>:49)
at <init>(<console>:51)
at .<init>(<console>:55)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
at
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
at
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
at
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
at
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
at
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
at
org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
at
org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
at
org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at
org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
at
org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: java.io.PrintWriter
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 45 more
I am very new to Spark and Scala, can somebody please guide me here?
Thanks,
Saurabh
________________________________
This e-mail, including accompanying communications and attachments, is strictly
confidential and only for the intended recipient. Any retention, use or
disclosure not expressly authorised by Markit is prohibited. This email is
subject to all waivers and other terms at the following link:
http://www.markit.com/en/about/legal/email-disclaimer.page
Please visit http://www.markit.com/en/about/contact/contact-us.page? for
contact information on our offices worldwide.
MarkitSERV Limited has its registered office located at Level 4, Ropemaker
Place, 25 Ropemaker Street, London, EC2Y 9LY and is authorized and regulated by
the Financial Conduct Authority with registration number 207294