Hi, I’m attempting to do a left outer join in Spark, and I’m getting an NPE that appears to be caused by a bug in the Spark Java API. (I’m running Spark 1.6.0 in local mode on a Mac.)
For a little background: a left outer join returns all keys from the left side of the join regardless of whether or not the key is present on the right side. To handle this uncertainty, the value from the right side is wrapped in Guava’s Optional class. The Optional class has a method to check whether the value is present (which would indicate the key appeared in both RDDs being joined). If the key was indeed present in both RDDs, you can then retrieve the value and move on.

After doing a little digging, I found that Spark uses Scala’s Option internally. This is the same concept as the Guava Optional, only native to Scala. It appears that during the conversion from a Scala Option back to a Guava Optional (the method can be found here: https://github.com/apache/spark/blob/v1.6.0/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala#L28), the conversion method is erroneously passed a Scala Option holding the String value “None” instead of Scala’s None object. This is matched to the first *case*, which causes Guava’s Optional.of method to attempt to pull the value out, and an NPE is thrown since the value was never actually there.
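To make that concrete, here’s roughly what that conversion does, rewritten in Java purely for illustration (the real method is the short Scala one linked above, so treat this as a sketch rather than Spark’s actual code):

import com.google.common.base.Optional;

// Illustrative Java rendering of the conversion described above -- not Spark's actual code.
final class OptionConversionSketch {
    static <T> Optional<T> optionToOptional(scala.Option<T> option) {
        if (option.isDefined()) {
            // "Some" branch: Optional.of() is called on the wrapped value. Optional.of()
            // rejects null (via Preconditions.checkNotNull), which is the frame that
            // throws the NPE in the stack trace below.
            return Optional.of(option.get());
        }
        // "None" branch: an empty Option becomes Optional.absent().
        return Optional.absent();
    }
}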
The code basically looks like this, where the classes used are just plain Java objects with some fields:

// First RDD
JavaPairRDD<GroupItemNode, WeekItemComposite> rdd1;

// Second RDD
JavaPairRDD<GroupItemNode, Inventory> rdd2;

// Resultant RDD
JavaPairRDD<GroupItemNode, Tuple2<WeekItemComposite, Optional<Inventory>>> result = rdd1.leftOuterJoin(rdd2);
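Consuming the result then comes down to checking the Optional before touching the right-hand value. Here’s a minimal sketch of that pattern (the mapping logic is just a placeholder, not our real code):

// Sketch only: check the Optional produced by the left outer join before using it.
JavaPairRDD<GroupItemNode, WeekItemComposite> merged = result.mapValues(pair -> {
    WeekItemComposite composite = pair._1();
    Optional<Inventory> maybeInventory = pair._2();
    if (maybeInventory.isPresent()) {
        // Key was present in both RDDs, so the Inventory can safely be retrieved.
        Inventory inventory = maybeInventory.get();
        // ... combine composite with inventory here ...
    }
    // Otherwise the key only existed in the left RDD and there is no Inventory to apply.
    return composite;
});

As far as I can tell from the trace, though, the NPE is thrown inside Spark’s own conversion during the join, before any of our code ever sees the pair.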
Has anyone encountered this problem before, or does anyone know why the optionToOptional method might be getting passed this “None” value? I’ve added some more relevant information below; let me know if I can provide any more details.

Here’s a screenshot showing the string value “None” being passed into the optionToOptional method in the debugger:

Here’s the stack trace (the method shown above is highlighted):

ERROR 13:17:00,743 com.tgt.allocation.needengine.NeedEngineApplication Exception while running need engine:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 31.0 failed 1 times, most recent failure: Lost task 8.0 in stage 31.0 (TID 50, localhost): java.lang.NullPointerException
    at org.spark-project.guava.base.Preconditions.checkNotNull(Preconditions.java:191)
    at com.google.common.base.Optional.of(Optional.java:86)
    at org.apache.spark.api.java.JavaUtils$.optionToOptional(JavaUtils.scala:30)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$leftOuterJoin$2.apply(JavaPairRDD.scala:564)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$leftOuterJoin$2.apply(JavaPairRDD.scala:564)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:755)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:755)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
    at org.apache.spark.rdd.RDD.count(RDD.scala:1143)
    at org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:440)
    at org.apache.spark.api.java.AbstractJavaRDDLike.count(JavaRDDLike.scala:46)
    at com.tgt.allocation.needengine.spark.processor.NeedEngineProcessor.runProcessor(NeedEngineProcessor.java:43)
    at com.tgt.allocation.needengine.spark.processor.SparkProcessor.runProcessor(SparkProcessor.java:68)
    at com.tgt.allocation.needengine.service.NeedEngineService.runProcessor(NeedEngineService.java:47)
    at com.tgt.allocation.needengine.NeedEngineApplication.main(NeedEngineApplication.java:29)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.lang.NullPointerException
    at org.spark-project.guava.base.Preconditions.checkNotNull(Preconditions.java:191)
    at com.google.common.base.Optional.of(Optional.java:86)
    at org.apache.spark.api.java.JavaUtils$.optionToOptional(JavaUtils.scala:30)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$leftOuterJoin$2.apply(JavaPairRDD.scala:564)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$leftOuterJoin$2.apply(JavaPairRDD.scala:564)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:755)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:755)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

WARN 13:17:00,744 org.apache.spark.Logging$class Lost task 9.0 in stage 31.0 (TID 51, localhost): TaskKilled (killed intentionally)

Thank you for any help you may be able to provide,
Adam Westerman