That IntRef problem is very strange, as it's not related to running a Spark job, but rather just to interpreting the code in the REPL. There are two possibilities I can think of:

- Spark was compiled with a different version of Scala than the one you're running it on. Spark is compiled against Scala 2.10 from Spark 0.9.0 onward, so make sure that's what you're using.
- You have two different versions of Scala on the classpath of your REPL.
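One quick way to check both of these from inside spark-shell is the snippet below. It is only an illustration (nothing here is Spark-specific; it uses only what ships with Scala and the JDK): it prints the Scala version the REPL is actually running and any scala-library jars it can see on the JVM classpath.

// Scala version the REPL is running on; expect something like "version 2.10.x".
println(scala.util.Properties.versionString)

// Any scala-library jars on the JVM classpath. More than one entry, or a
// version that doesn't match the one printed above, would point to a mismatch.
System.getProperty("java.class.path")
  .split(java.io.File.pathSeparator)
  .filter(_.contains("scala-library"))
  .foreach(println)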
Regarding the NotSerializableException, it's exactly what it says. I suspect that "jc_" is either a com.wcohen.ss.Jaccard or references a Jaccard, so this line requires that it be serialized:

destrdd.map(x => jc_.score(str1, new BasicStringWrapper(x)))

You have a few ways around this, in ascending order of complication:

1. Instantiate your Jaccard with Serializable, like this:

   val mjc = new Jaccard() with Serializable

   This may actually work because Jaccard has a zero-arg constructor.

2. Create a new Jaccard in each closure:

   destrdd.map(x => new Jaccard().score(str1, new BasicStringWrapper(x)))

   In general, if constructing it were an expensive operation, you could amortize the cost by using mapPartitions:

   destrdd.mapPartitions { part =>
     val jc = new Jaccard()
     part.map(x => jc.score(str1, new BasicStringWrapper(x)))
   }

3. Use Kryo instead of Java serialization, and register a custom Serializer for Jaccard. This is a good general solution since Kryo is faster than Java serialization, but it may cause further complications. See http://spark.apache.org/docs/latest/tuning.html#data-serialization (a rough registration sketch is appended at the very end of this message, after the quoted thread).

On Tue, Jun 24, 2014 at 4:36 PM, Sameer Tilak <ssti...@live.com> wrote:

> Dear Aaron,
> Thanks for your help. I am still facing a few problems.
>
> I am using a 3rd-party library (jar file) under the hood when I call
> jc_.score. Each call to jc_.score generates an array of doubles; it is
> basically the score of the current sentence against every sentence in
> destrdd, which is generated by loading a file from HDFS. My goal is to
> then save the result to a file and repeat this for every sentence/string
> from sourcerdd.
>
> def calculateScore (sourcerdd: RDD[String], destrdd: RDD[String]) {
>   val jc_ = this.mjc
>   var i : Int = 0
>
>   for (sentence <- sourcerdd.toLocalIterator)
>   {
>     val str1 = new StringWrapper (sentence)
>     var scorevector = destrdd.map(x => jc_.score(str1, new BasicStringWrapper(x)))
>     val fileName = new String("/apps/software/scala-approsstrmatch-sentence" + i)
>     scorevector.saveAsTextFile(fileName)
>     i += 1
>   }
> }
>
> I get this error message. I am not sure why var i : Int = 0 would throw an error.
>
> java.lang.NoSuchMethodError: scala.runtime.IntRef.create(I)Lscala/runtime/IntRef;
> at approxstrmatch.JaccardScore.calculateScoreSecond(JaccardScore.scala:77)
> at $iwC$$iwC$$iwC$$iwC.<init>(<console>:19)
> at $iwC$$iwC$$iwC.<init>(<console>:24)
> at $iwC$$iwC.<init>(<console>:26)
> at $iwC.<init>(<console>:28)
> at <init>(<console>:30)
> at .<init>(<console>:34)
> at .<clinit>(<console>)
> at .<init>(<console>:7)
> at .<clinit>(<console>)
> at $print(<console>)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:601)
> at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788)
> at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1056)
> at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614)
> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645)
> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609)
> at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:796)
> at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:841)
> at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:753)
> at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:601)
> at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608)
> at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611)
> at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:936)
> at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
> at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
> at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
> at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:884)
> at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:982)
> at org.apache.spark.repl.Main$.main(Main.scala:31)
> at org.apache.spark.repl.Main.main(Main.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:601)
> at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> If I get rid of i from the above code, I still get the following error. I
> incorporated your feedback and there is no call to any method as a part of
> the closure now.
>
> Error message:
>
> 14/06/24 11:24:57 INFO DAGScheduler: Submitting Stage 1 (MappedRDD[5] at saveAsTextFile at JaccardScore.scala:64), which has no missing parents
> 14/06/24 11:24:57 INFO DAGScheduler: Failed to run saveAsTextFile at JaccardScore.scala:64
> org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: com.wcohen.ss.Jaccard
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
> at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> ------------------------------
> From: ilike...@gmail.com
> Date: Mon, 23 Jun 2014 18:00:27 -0700
> Subject: Re: DAGScheduler: Failed to run foreach
> To: user@spark.apache.org
> CC: u...@spark.incubator.apache.org
>
> Please note that this:
>
> for (sentence <- sourcerdd) {
>   ...
> }
>
> is actually Scala syntactic sugar which is converted into
>
> sourcerdd.foreach { sentence => ... }
>
> What this means is that this will actually run on the cluster, which is
> probably not what you want if you're trying to print them.
>
> Try this instead:
>
> for (sentence <- sourcerdd.toLocalIterator) {
>   ...
> }
>
> (By the way, the reason this was throwing a NotSerializableException was
> because you were trying to pass "printScoreCanndedString" as part of the
> job's closure. In Java, class methods have an implicit reference to "this",
> so it tried to serialize the class CalculateScore, which is presumably not
> marked as Serializable.)
>
> On Mon, Jun 23, 2014 at 5:45 PM, Sameer Tilak <ssti...@live.com> wrote:
>
> The subject should be: org.apache.spark.SparkException: Job aborted due
> to stage failure: Task not serializable: java.io.NotSerializableException:
> and not DAGScheduler: Failed to run foreach
>
> If I call printScoreCanndedString with a hard-coded string and identical
> 2nd parameter, it works fine. However for my application that is not
> sufficient.
> ------------------------------
> From: ssti...@live.com
> To: u...@spark.incubator.apache.org
> Subject: DAGScheduler: Failed to run foreach
> Date: Mon, 23 Jun 2014 17:05:03 -0700
>
> Hi All,
>
> I am using Spark for text analysis. I have a source file that has a few
> thousand sentences and a dataset of tens of millions of statements. I want
> to compare each statement from the source file with each statement from the
> dataset and generate a score. I am having the following problem. I would
> really appreciate help.
>
> Here is what I do within spark-shell:
>
> // Source file with a few thousand sentences
> val srcFile = sc.textFile("hdfs://serverip/data/dummy/src.txt");
>
> // Dataset with tens of millions of statements.
> val destFile = sc.textFile("hdfs://serverip/data/dummy/sample.txt");
>
> // Initialize the score variable.
> val score = new mypackage.Score()
>
> // Generate score.
> score.calculateScore(srcFile, destFile);
>
> Here is the relevant snippet from my Scala class (Score.scala):
>
> def calculateScore (sourcerdd: RDD[String], destrdd: RDD[String]) {
>   for (sentence <- sourcerdd)
>   {
>     println("Source String is: " + sentence + "Data Type is: " +
>       sentence.getClass.getSimpleName)
>
>     printScoreCanndedString(sentence, destrdd);
>   }
> }
>
> def printScoreCanndedString(sourceStr: String, rdd: RDD[String]) : RDD[Double] = {
>   // Do the analysis here.
> }
>
> The print statement displays the data correctly, along with the data type
> (String) as expected. However, I get the following error message when it
> tries to execute the printScoreCanndedString method. Any help with this
> will be great.
>
> 14/06/23 16:45:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> 14/06/23 16:45:04 WARN LoadSnappy: Snappy native library not loaded
> 14/06/23 16:45:04 INFO FileInputFormat: Total input paths to process : 1
> 14/06/23 16:45:04 INFO SparkContext: Starting job: foreach at calculateScore.scala:51
> 14/06/23 16:45:04 INFO DAGScheduler: Got job 0 (foreach at CalculateScore.scala:51) with 2 output partitions (allowLocal=false)
> 14/06/23 16:45:04 INFO DAGScheduler: Final stage: Stage 0(foreach at CalculateScore.scala:51)
> 14/06/23 16:45:04 INFO DAGScheduler: Parents of final stage: List()
> 14/06/23 16:45:04 INFO DAGScheduler: Missing parents: List()
> 14/06/23 16:45:04 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[1] at textFile at <console>:12), which has no missing parents
> 14/06/23 16:45:04 INFO DAGScheduler: Failed to run foreach at CalculateScore.scala:51
> org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: approxstrmatch.CalculateScore
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
> at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
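For option 3 above, here is a rough sketch of what the Kryo registration could look like on Spark 0.9/1.0. The class names (JaccardKryoSerializer, MyKryoRegistrator) and the assumption that they live in your approxstrmatch jar are illustrative only; the trivial serializer just rebuilds Jaccard from its zero-arg constructor, so treat this as a starting point rather than a drop-in solution.

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import com.wcohen.ss.Jaccard
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Illustrative serializer: Jaccard carries no state we need to ship, so we
// write nothing and recreate it with its zero-arg constructor on read.
class JaccardKryoSerializer extends Serializer[Jaccard] {
  override def write(kryo: Kryo, output: Output, jc: Jaccard): Unit = ()
  override def read(kryo: Kryo, input: Input, clazz: Class[Jaccard]): Jaccard = new Jaccard()
}

// Illustrative registrator, referenced by name from the Spark config below;
// both classes must be compiled into the application jar, not typed into the REPL.
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Jaccard], new JaccardKryoSerializer)
  }
}

// Enable Kryo before the SparkContext is created (or pass these settings to
// spark-shell as system properties / configuration).
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "approxstrmatch.MyKryoRegistrator")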