Hi,

I’m attempting to do a left outer join in Spark, and I’m getting a
NullPointerException (NPE) that appears to be caused by a bug in the Spark
Java API. (I’m running Spark 1.6.0 in local mode on a Mac.)

For a little background: a left outer join returns every key from the left
side of the join, regardless of whether the key is present on the right
side. To handle that uncertainty, the value from the right side is wrapped
in Guava’s Optional class. Optional has a method to check whether the value
is present (which would indicate the key appeared in both RDDs being
joined); if the key was indeed present in both RDDs, you can then retrieve
the value and move forward.
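
In code, checking the right-hand value looks roughly like this (a minimal
sketch with placeholder RDDs and types, not our actual job code):

import org.apache.spark.api.java.JavaPairRDD;
import com.google.common.base.Optional;
import scala.Tuple2;

// "left" and "right" are placeholder pair RDDs: <String, Integer> and <String, Double>
JavaPairRDD<String, Tuple2<Integer, Optional<Double>>> joined = left.leftOuterJoin(right);

joined.foreach(pair -> {
    Optional<Double> rightValue = pair._2()._2();
    if (rightValue.isPresent()) {
        // the key existed in both RDDs, so the value can be read safely
        Double value = rightValue.get();
    }
    // otherwise the key only existed in the left RDD and there is no right-side value
});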

After doing a little digging, I found that Spark is using Scala’s Option
internally. This is the same concept as the Guava Optional, only native to
Scala. It appears that during the conversion from a Scala Option back to a
Guava Optional (the method can be found here:
https://github.com/apache/spark/blob/v1.6.0/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala#L28)
the conversion method is erroneously passed a Scala Option carrying the
String value “None” instead of the actual Scala None object. That matches
the first *case* in the pattern match, so Guava’s Optional.of is called to
pull the value out, and an NPE is thrown because the value was never
actually there.
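
That lines up with the Preconditions.checkNotNull frame at the top of the
stack trace: Guava’s Optional.of rejects a missing (null) value outright.
Here’s a tiny standalone illustration of just that call (my own snippet,
not the job code):

import com.google.common.base.Optional;

public class OptionalOfNullDemo {
    public static void main(String[] args) {
        // Optional.of runs Preconditions.checkNotNull on its argument,
        // so handing it a missing (null) value throws a NullPointerException
        Optional<Object> value = Optional.of(null);
    }
}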

The code basically looks like this, where the classes involved are just
plain Java objects with a few fields:
// First RDD
JavaPairRDD<GroupItemNode, WeekItemComposite> rdd1 = ...;
// Second RDD
JavaPairRDD<GroupItemNode, Inventory> rdd2 = ...;

// Resultant RDD
JavaPairRDD<GroupItemNode, Tuple2<WeekItemComposite, Optional<Inventory>>> result =
    rdd1.leftOuterJoin(rdd2);
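
The join itself is lazy, so nothing fails until an action forces
evaluation; in our case that’s a count() further downstream (the
JavaRDDLike.count frame in the driver stack trace). Simplified, it’s the
equivalent of:

// placeholder for the downstream action that materializes the join
long resultCount = result.count();  // the NPE from optionToOptional surfaces here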

Has anyone encountered this problem before, or does anyone know why the
optionToOptional method might be getting passed this “None” value? I’ve
added some more relevant information below; let me know if I can provide
any more details.

Here’s a screenshot from the debugger showing the string value “None”
being passed into the optionToOptional method:

Here’s the stack trace (the method shown above is highlighted):

ERROR 13:17:00,743 com.tgt.allocation.needengine.NeedEngineApplication Exception while running need engine:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 31.0 failed 1 times, most recent failure: Lost task 8.0 in stage 31.0 (TID 50, localhost): java.lang.NullPointerException
at org.spark-project.guava.base.Preconditions.checkNotNull(Preconditions.java:191)
at com.google.common.base.Optional.of(Optional.java:86)
at org.apache.spark.api.java.JavaUtils$.optionToOptional(JavaUtils.scala:30)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$leftOuterJoin$2.apply(JavaPairRDD.scala:564)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$leftOuterJoin$2.apply(JavaPairRDD.scala:564)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:755)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:755)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
at org.apache.spark.rdd.RDD.count(RDD.scala:1143)
at org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:440)
at org.apache.spark.api.java.AbstractJavaRDDLike.count(JavaRDDLike.scala:46)
at com.tgt.allocation.needengine.spark.processor.NeedEngineProcessor.runProcessor(NeedEngineProcessor.java:43)
at com.tgt.allocation.needengine.spark.processor.SparkProcessor.runProcessor(SparkProcessor.java:68)
at com.tgt.allocation.needengine.service.NeedEngineService.runProcessor(NeedEngineService.java:47)
at com.tgt.allocation.needengine.NeedEngineApplication.main(NeedEngineApplication.java:29)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.lang.NullPointerException
at org.spark-project.guava.base.Preconditions.checkNotNull(Preconditions.java:191)
at com.google.common.base.Optional.of(Optional.java:86)
at org.apache.spark.api.java.JavaUtils$.optionToOptional(JavaUtils.scala:30)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$leftOuterJoin$2.apply(JavaPairRDD.scala:564)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$leftOuterJoin$2.apply(JavaPairRDD.scala:564)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:755)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:755)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
WARN 13:17:00,744 org.apache.spark.Logging$class Lost task 9.0 in stage 31.0 (TID 51, localhost): TaskKilled (killed intentionally)

Thank you for any help you may be able to provide,

Adam Westerman
