Dear Aaron,
Thanks for your help. I am still facing a few problems.
I am using a third-party library (jar file) under the hood when I call
jc_.score. For each source sentence, the calls to jc_.score produce an array of
doubles: the score of the current sentence against every sentence in destrdd,
which is generated by loading a file from HDFS. My goal is to save that array
to a file and repeat this for every sentence (string) in sourcerdd.
def calculateScore(sourcerdd: RDD[String], destrdd: RDD[String]) {
  val jc_ = this.mjc
  var i: Int = 0
  for (sentence <- sourcerdd.toLocalIterator) {
    val str1 = new StringWrapper(sentence)
    var scorevector = destrdd.map(x => jc_.score(str1, new BasicStringWrapper(x)))
    val fileName = new String("/apps/software/scala-approsstrmatch-sentence" + i)
    scorevector.saveAsTextFile(fileName)
    i += 1
  }
}
I get the following error message. I am not sure why var i: Int = 0 would
throw an error.
java.lang.NoSuchMethodError: scala.runtime.IntRef.create(I)Lscala/runtime/IntRef;
    at approxstrmatch.JaccardScore.calculateScoreSecond(JaccardScore.scala:77)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:19)
    at $iwC$$iwC$$iwC.<init>(<console>:24)
    at $iwC$$iwC.<init>(<console>:26)
    at $iwC.<init>(<console>:28)
    at <init>(<console>:30)
    at .<init>(<console>:34)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1056)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:796)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:841)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:753)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:601)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608)
    at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:936)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:884)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:982)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
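A likely explanation for this NoSuchMethodError: a local var that is modified inside a closure (here, the body of the for loop, which becomes a function passed to Iterator.foreach) is boxed in a scala.runtime.IntRef, and the static IntRef.create factory exists in Scala 2.11 but not in Scala 2.10, which Spark 1.0 was built against. So the error suggests the class was compiled with a newer Scala version than the one the Spark shell runs on; matching the versions is the real fix. Independently of that, the mutable counter can be avoided. The sketch below is plain Scala and assumes an Iterator in place of sourcerdd.toLocalIterator; OutputNaming and outputPaths are hypothetical names.

```scala
// Sketch (plain Scala): number the per-sentence output paths with
// zipWithIndex instead of a mutable `var i` captured by the loop body.
// A plain Iterator stands in for sourcerdd.toLocalIterator (assumption).
object OutputNaming {
  // Base path copied from the original snippet.
  val basePath = "/apps/software/scala-approsstrmatch-sentence"

  // Pairs each sentence with the output path it would be saved under.
  def outputPaths(sentences: Iterator[String]): List[(String, String)] =
    sentences.zipWithIndex.map { case (sentence, i) =>
      // In the Spark version, the score vector for `sentence` would be
      // computed and saved to this path here.
      (sentence, basePath + i)
    }.toList
}
```

In the Spark code, the loop header would become for ((sentence, i) <- sourcerdd.toLocalIterator.zipWithIndex) with the rest of the body unchanged apart from dropping i += 1.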
If I remove i from the above code, I still get an error, this time the
following one. I incorporated your feedback, and there is no longer a method
call as part of the closure.
Error message:
14/06/24 11:24:57 INFO DAGScheduler: Submitting Stage 1 (MappedRDD[5] at saveAsTextFile at JaccardScore.scala:64), which has no missing parents
14/06/24 11:24:57 INFO DAGScheduler: Failed to run saveAsTextFile at JaccardScore.scala:64
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: com.wcohen.ss.Jaccard
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
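This second error names the class that cannot be serialized: com.wcohen.ss.Jaccard. Copying this.mjc into the local val jc_ keeps the closure from capturing the enclosing class, but the function passed to destrdd.map still captures jc_ itself, and that object is not Serializable. One common workaround is to keep the scorer in a small Serializable wrapper whose scorer field is @transient and rebuilt on first use after deserialization, so each executor constructs its own instance. The sketch below shows the pattern with plain Java serialization standing in for Spark's closure serializer; NonSerializableScorer (with a simple token-set Jaccard score), ScorerHolder and SerDe are hypothetical names, not part of the SecondString library.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a third-party scorer that does not implement
// java.io.Serializable, like com.wcohen.ss.Jaccard in the error above.
class NonSerializableScorer {
  // Token-set Jaccard similarity: |A intersect B| / |A union B|.
  def score(a: String, b: String): Double = {
    val sa = a.split("\\s+").toSet
    val sb = b.split("\\s+").toSet
    val union = sa union sb
    if (union.isEmpty) 0.0 else (sa intersect sb).size.toDouble / union.size
  }
}

// Serializable wrapper: the scorer lives in a @transient field, so it is
// skipped during serialization and recreated on first use afterwards.
class ScorerHolder extends Serializable {
  @transient private var cached: NonSerializableScorer = null

  private def scorer: NonSerializableScorer = {
    if (cached == null) cached = new NonSerializableScorer
    cached
  }

  def score(a: String, b: String): Double = scorer.score(a, b)
}

object SerDe {
  // Serializes and deserializes `obj` with Java serialization, roughly what
  // happens to a Spark closure on its way to an executor.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

In the Spark code, the holder would be created on the driver and used inside the map (destrdd.map(x => holder.score(...))), with the wrapper delegating to the real library's score method. Another option is to construct the scorer inside mapPartitions, once per partition.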
From: [email protected]
Date: Mon, 23 Jun 2014 18:00:27 -0700
Subject: Re: DAGScheduler: Failed to run foreach
To: [email protected]
CC: [email protected]
Please note that this:
for (sentence <- sourcerdd) { ... }
is actually Scala syntactic sugar which is converted into
sourcerdd.foreach { sentence => ... }
This means the loop body actually runs on the cluster, which is probably not
what you want if you're trying to print the sentences.
Try this instead:
for (sentence <- sourcerdd.toLocalIterator) { ...}
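The desugaring can be seen with a toy class. In this plain-Scala sketch, FakeRdd is a hypothetical stand-in that records which of its methods a loop calls: a for loop without yield over the collection itself calls its foreach, whereas looping over toLocalIterator calls toLocalIterator once and then iterates the returned Iterator in the calling code. On a real RDD, foreach runs the body on the executors, while toLocalIterator brings the elements back to the driver, one partition at a time.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for an RDD that logs which methods are called.
final class FakeRdd(items: Seq[String]) {
  val calls: ListBuffer[String] = ListBuffer.empty

  // `for (x <- rdd) body` is rewritten by the compiler to rdd.foreach(x => body).
  def foreach(f: String => Unit): Unit = {
    calls += "foreach"
    items.foreach(f)
  }

  // Mimics RDD.toLocalIterator: hands the elements back to the caller.
  def toLocalIterator: Iterator[String] = {
    calls += "toLocalIterator"
    items.iterator
  }
}
```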
(By the way, the reason this was throwing a NotSerializableException was that
you were passing "printScoreCanndedString" as part of the job's closure. On the
JVM, instance methods carry an implicit reference to "this", so Spark tried to
serialize an instance of the class CalculateScore, which is presumably not
marked as Serializable.)
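The effect of that implicit "this" can be reproduced without Spark. This plain-Scala sketch uses Java serialization, roughly what Spark's default closure serializer does, to check two closures that each call an instance method: the one defined in a non-Serializable class fails with NotSerializableException because it captures "this", and the one defined in a class that extends Serializable succeeds. ClosureCheck, PlainScorer and SerializableScorer are hypothetical names; marking a class Serializable only helps if all of its fields are serializable as well.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCheck {
  // Writes `obj` with plain Java serialization and reports whether that worked.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}

// Hypothetical stand-in for CalculateScore: NOT Serializable.
class PlainScorer {
  def scoreOne(s: String): Int = s.length
  // Calling an instance method inside the lambda captures `this`.
  def scoreFn: String => Int = s => scoreOne(s)
}

// Same code, but the enclosing class is Serializable, so the captured
// `this` can be written out too.
class SerializableScorer extends Serializable {
  def scoreOne(s: String): Int = s.length
  def scoreFn: String => Int = s => scoreOne(s)
}
```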
On Mon, Jun 23, 2014 at 5:45 PM, Sameer Tilak <[email protected]> wrote:
The subject should be "org.apache.spark.SparkException: Job aborted due to
stage failure: Task not serializable: java.io.NotSerializableException" rather
than "DAGScheduler: Failed to run foreach".
If I call printScoreCanndedString with a hard-coded string and the same second
parameter, it works fine. However, that is not sufficient for my application.
From: [email protected]
To: [email protected]
Subject: DAGScheduler: Failed to run foreach
Date: Mon, 23 Jun 2014 17:05:03 -0700
Hi All,
I am using Spark for text analysis. I have a source file with a few thousand
sentences and a dataset of tens of millions of statements. I want to compare
each statement in the source file with each statement in the dataset and
generate a score. I am having the following problem and would really appreciate help.
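The computation described here yields one score vector per source sentence, with one entry per dataset statement; a few thousand source sentences times tens of millions of statements comes to tens of billions of scores. The shape of the result can be sketched in plain Scala with the similarity function left as a parameter; AllPairs and scoreAll are hypothetical names, and the function passed in stands in for the real scorer.

```scala
// Sketch of the all-pairs scoring shape: for each source sentence, a vector
// with one score per dataset statement. `score` is a placeholder for the
// real similarity function (an assumption, not the library's API).
object AllPairs {
  def scoreAll(sources: Seq[String], dataset: Seq[String])(
      score: (String, String) => Double): Seq[(String, Seq[Double])] =
    sources.map(src => src -> dataset.map(d => score(src, d)))
}
```

On RDDs, RDD.cartesian produces the same (source, statement) pairs, though at this scale its output is very large.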
Here is what I do within spark-shell:
// Source file with a few thousand sentences.
val srcFile = sc.textFile("hdfs://serverip/data/dummy/src.txt")
// Dataset with tens of millions of statements.
val destFile = sc.textFile("hdfs://serverip/data/dummy/sample.txt")
// Initialize the score variable.
val score = new mypackage.Score()
// Generate score.
score.calculateScore(srcFile, destFile)
Here is a snippet from my Scala class (Score.scala):
def calculateScore(sourcerdd: RDD[String], destrdd: RDD[String]) {
  for (sentence <- sourcerdd) {
    println("Source String is: " + sentence + "Data Type is: " + sentence.getClass.getSimpleName)
    printScoreCanndedString(sentence, destrdd)
  }
}
def printScoreCanndedString(sourceStr: String, rdd: RDD[String]): RDD[Double] = {
  // Do the analysis here.
  ???
}
The print statement displays the data correctly, with the data type String as
expected. However, I get the following error message when it tries to execute
the printScoreCanndedString method. Any help with this would be great.
14/06/23 16:45:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/06/23 16:45:04 WARN LoadSnappy: Snappy native library not loaded
14/06/23 16:45:04 INFO FileInputFormat: Total input paths to process : 1
14/06/23 16:45:04 INFO SparkContext: Starting job: foreach at calculateScore.scala:51
14/06/23 16:45:04 INFO DAGScheduler: Got job 0 (foreach at CalculateScore.scala:51) with 2 output partitions (allowLocal=false)
14/06/23 16:45:04 INFO DAGScheduler: Final stage: Stage 0(foreach at CalculateScore.scala:51)
14/06/23 16:45:04 INFO DAGScheduler: Parents of final stage: List()
14/06/23 16:45:04 INFO DAGScheduler: Missing parents: List()
14/06/23 16:45:04 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[1] at textFile at <console>:12), which has no missing parents
14/06/23 16:45:04 INFO DAGScheduler: Failed to run foreach at CalculateScore.scala:51
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: approxstrmatch.CalculateScore
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)