That IntRef problem is very strange, as it's not related to running a spark
job, but rather just interpreting the code in the repl. There are two
possibilities I can think of:
- Spark was compiled with a different version of Scala than you're running
it on. Spark is compiled on Scala 2.10 from Spark 0.9.0 onward, so make
sure that's what you're using.
- You have two different versions of Scala on the classpath of your repl.

Regarding the NotSerializableException, it's exactly what it says. I
suspect that "jc_" is either a com.wcohen.ss.Jaccard or references a
Jaccard, so this line requires that it be serialized:

    destrdd.map(x => jc_.score(str1, new BasicStringWrapper(x)))

You have a few ways around this, in ascending order of complication:
1. Instantiate your Jaccard with Serializable, like this:

    val mjc = new Jaccard() with Serializable

This may actually work because Jaccard actually has a zero-arg constructor.

2. Create a new Jaccard in each closure:

    destrdd.map(x => new Jaccard().score(str1, new BasicStringWrapper(x)))

In general, if this were an expensive operation, you could amortize it by
using mapPartitions:

    destrdd.mapPartitions { part =>
      val jc = new Jaccard()
      part.map(jc.score(str1, new BasicStringWrapper(x)))
    }

3. Use Kryo instead of Java serialization, and register a custom Serializer
for Jaccard. This is a good general solution since Kryo is faster than
Java, but may cause further complications. See
http://spark.apache.org/docs/latest/tuning.html#data-serialization.


On Tue, Jun 24, 2014 at 4:36 PM, Sameer Tilak <ssti...@live.com> wrote:

> Dear Aaron,
> Thanks for your help. I am still facing few problems.
>
> I am using a 3rd party library (jar file) under the hood when I call
> jc_->score. Each call to jc_->score will generate a array of doubles. It
> is basically score of the current sentence with every sentence in the
> destrdd generated by loading a file from hdfs. My goal is to then save it
> in a file and repeat it for every sentence/string from sourcerdd.
>
>
>   def calculateScore (sourcerdd: RDD[String], destrdd: RDD[String])  {
>   val jc_ = this.mjc
>   var i : Int = 0
>
>   for (sentence <- sourcerdd.toLocalIterator)
>    {
>         val str1 = new StringWrapper (sentence)
> var scorevector = destrdd.map(x => jc_.score(str1, new
> BasicStringWrapper(x)))
>         val fileName = new
> String("/apps/software/scala-approsstrmatch-sentence" + i)
>         scorevector.saveAsTextFile(fileName)
>        i += 1
>    }
>
>   }
> I get this error message. I am not sure why var i : Int = 0 will throw an
> error?
>
> java.lang.NoSuchMethodError:
> scala.runtime.IntRef.create(I)Lscala/runtime/IntRef;
>  at
> approxstrmatch.JaccardScore.calculateScoreSecond(JaccardScore.scala:77)
> at $iwC$$iwC$$iwC$$iwC.<init>(<console>:19)
>  at $iwC$$iwC$$iwC.<init>(<console>:24)
> at $iwC$$iwC.<init>(<console>:26)
> at $iwC.<init>(<console>:28)
>  at <init>(<console>:30)
> at .<init>(<console>:34)
> at .<clinit>(<console>)
>  at .<init>(<console>:7)
> at .<clinit>(<console>)
> at $print(<console>)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>  at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:601)
>  at
> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788)
> at
> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1056)
>  at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614)
> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645)
>  at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609)
> at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:796)
>  at
> org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:841)
> at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:753)
>  at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:601)
> at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608)
>  at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611)
> at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:936)
>  at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
> at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
>  at
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
> at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:884)
>  at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:982)
> at org.apache.spark.repl.Main$.main(Main.scala:31)
>  at org.apache.spark.repl.Main.main(Main.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:601)
> at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
>  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> If I get rid of i from the above code, I still get the following error. I
> incorporated your feedback and there is no call to any method as a part of
> closure now.
>
> Error message:
>
> 14/06/24 11:24:57 INFO DAGScheduler: Submitting Stage 1 (MappedRDD[5] at
> saveAsTextFile at JaccardScore.scala:64), which has no missing parents
> 14/06/24 11:24:57 INFO DAGScheduler: Failed to run saveAsTextFile at
> JaccardScore.scala:64
> org.apache.spark.SparkException: Job aborted due to stage failure: Task
> not serializable: java.io.NotSerializableException: com.wcohen.ss.Jaccard
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>  at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>  at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>  at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
>  at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
>  at
> org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
>  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>  at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> ------------------------------
> From: ilike...@gmail.com
> Date: Mon, 23 Jun 2014 18:00:27 -0700
> Subject: Re: DAGScheduler: Failed to run foreach
> To: user@spark.apache.org
> CC: u...@spark.incubator.apache.org
>
>
> Please note that this:
>
> for (sentence <- sourcerdd) {
>    ...
> }
>
> is actually Scala syntactic sugar which is converted into
>
> sourcerdd.foreach { sentence => ... }
>
> What this means is that this will actually run on the cluster, which is
> probably not what you want if you're trying to print them.
>
> Try this instead:
>
> for (sentence <- sourcerdd.toLocalIterator) {
>   ...
> }
>
> (By the way, the reason this was throwing a NotSerializableException was
> because you were trying to pass "printScoreCanndedString" as part of the
> job's closure. In Java, class methods have an implicit reference to "this",
> so it tried to serialize the class CalculateScore, which is presumably not
> marked as Serializable.)
>
>
> On Mon, Jun 23, 2014 at 5:45 PM, Sameer Tilak <ssti...@live.com> wrote:
>
> The subject should be: org.apache.spark.SparkException: Job aborted due
> to stage failure: Task not serializable: java.io.NotSerializableException:
>  and not DAGScheduler: Failed to run foreach
>
> If I call printScoreCanndedString with a hard-coded string and identical
> 2nd parameter, it works fine. However for my application that is not
> sufficient.
> ------------------------------
> From: ssti...@live.com
> To: u...@spark.incubator.apache.org
> Subject: DAGScheduler: Failed to run foreach
> Date: Mon, 23 Jun 2014 17:05:03 -0700
>
>
> Hi All,
>
> I am using spark for text analysis. I have a source file that has few
> thousand sentences and a dataset of tens of millions of statements. I want
> to compare each statement from the sourceFile with each statement from the
> dataset and generate a score. I am having following problem. I would really
> appreciate help.
>
> Here is what I do within spark-shell
>
> // Source file with few thousand sentences
>  val srcFile = sc.textFile("hdfs://serverip/data/dummy/src.txt");
>
> // Dataset with tens of millions of statements.
>
>  val destFile = sc.textFile("hdfs://serverip/data/dummy/sample.txt");
>
> // Initialize the score variable.
>
>  val score = new mypackage.Score()
>
> // Generate score.
>
> score.calculateScore(srcFile, destFile);
>
> Here is my snippet from my scala class (Score.scala)
>
>
>   def calculateScore (sourcerdd: RDD[String], destrdd: RDD[String])  {
>     for (sentence <- sourcerdd)
>    {
>       println("Source String is: " + sentence + "Data Type is: " +
> sentence.getClass.getSimpleName)
>
>       printScoreCanndedString(sentence, destrdd);
>    }
>
>   }
>     def printScoreCanndedString(sourceStr: String,rdd: RDD[String]) :
> RDD[Double] = {
>     // Do the analysis here.
>     }
>
> The print statement displays the data correctly along with data type as
> String as expected. However, I get the following error message when it
> tries to execute  printScoreCanndedString method. Any help with this will
> be great.
>
>
> 14/06/23 16:45:04 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 14/06/23 16:45:04 WARN LoadSnappy: Snappy native library not loaded
> 14/06/23 16:45:04 INFO FileInputFormat: Total input paths to process : 1
> 14/06/23 16:45:04 INFO SparkContext: Starting job: foreach at
> calculateScore.scala:51
> 14/06/23 16:45:04 INFO DAGScheduler: Got job 0 (foreach at
> CalculateScore.scala:51) with 2 output partitions (allowLocal=false)
> 14/06/23 16:45:04 INFO DAGScheduler: Final stage: Stage 0(foreach
> at CalculateScore.scala:51)
> 14/06/23 16:45:04 INFO DAGScheduler: Parents of final stage: List()
> 14/06/23 16:45:04 INFO DAGScheduler: Missing parents: List()
> 14/06/23 16:45:04 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[1] at
> textFile at <console>:12), which has no missing parents
> 14/06/23 16:45:04 INFO DAGScheduler: Failed to run foreach
> at CalculateScore.scala:51
> org.apache.spark.SparkException: Job aborted due to stage failure: Task
> not serializable: java.io.NotSerializableException: approxstrmatch.
> CalculateScore
>  at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>  at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>  at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>  at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
>  at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
>  at
> org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
>  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>  at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>

Reply via email to