at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
The above is also part of the error.

On Mon, Aug 3, 2015 at 6:40 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Putting your code in a file, I find the following on line 17:
>     stepAcc = new StepAccumulator();
> However I don't think that was where the NPE was thrown.
>
> Another thing I don't understand was that there were two addAccumulator()
> calls at the top of the stack trace, while in your code I don't
> see addAccumulator() calling itself.
>
> FYI
>
> On Mon, Aug 3, 2015 at 3:22 PM, Anubhav Agarwal <anubha...@gmail.com> wrote:
>
>> The code was written in 1.4 but I am compiling it and running it with 1.3.
>>
>> import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
>> import org.apache.spark.AccumulableParam;
>> import scala.Tuple4;
>> import thomsonreuters.trailblazer.operation.DriverCalc;
>> import thomsonreuters.trailblazer.operation.StepAccumulator;
>>
>> // Tuple4<Allocation StepIndex.IF_Position, DenKey, NumKey, Value> - Allocation Step Add
>>
>> class DriverAccumulator implements
>>         AccumulableParam<Object2ObjectOpenHashMap<String, StepAccumulator>,
>>                          Tuple4<String, String, String, Double>> {
>>
>>     private static final Object _lockObj = new Object();
>>
>>     public Object2ObjectOpenHashMap<String, StepAccumulator> addAccumulator(
>>             Object2ObjectOpenHashMap<String, StepAccumulator> stepAccumulatorMap,
>>             Tuple4<String, String, String, Double> value) {
>>         if (value == null) return stepAccumulatorMap;
>>         synchronized (_lockObj) {
>>             StepAccumulator stepAcc = stepAccumulatorMap.get(value._1());
>>             if (stepAcc == null) {
>>                 stepAcc = new StepAccumulator();
>>                 stepAccumulatorMap.put(value._1(), stepAcc);
>>             }
>>             DriverCalc dc = stepAcc.stepRows.get(value._2());
>>             if (dc == null) {
>>                 dc = new DriverCalc();
>>                 dc._denominator = value._4();
>>                 if (value._3() != null) dc._numerator.put(value._3(), value._4());
>>                 stepAcc.stepRows.put(value._2(), dc);
>>             } else {
>>                 dc._denominator = dc._denominator + value._4();
>>                 if (value._3() != null) {
>>                     Double val = dc._numerator.get(value._3());
>>                     dc._numerator.put(value._3(),
>>                             new Double(val != null ? val + value._4() : value._4()));
>>                 }
>>             }
>>         }
>>         return stepAccumulatorMap;
>>     }
>>
>>     public Object2ObjectOpenHashMap<String, StepAccumulator> addInPlace(
>>             Object2ObjectOpenHashMap<String, StepAccumulator> r1,
>>             Object2ObjectOpenHashMap<String, StepAccumulator> r2) {
>>         r2.forEach((k, v) -> r1.merge(k, v, this::mergeAcc));
>>         return r1;
>>     }
>>
>>     private StepAccumulator mergeAcc(StepAccumulator source1, StepAccumulator source2) {
>>         source2.stepRows.forEach((k, v) -> source1.stepRows.merge(k, v, this::denominatorMerge));
>>         return source1;
>>     }
>>
>>     private DriverCalc denominatorMerge(DriverCalc driverCalc1, DriverCalc driverCalc2) {
>>         driverCalc1._denominator = driverCalc1._denominator + driverCalc2._denominator;
>>         driverCalc2._numerator.forEach((k, v) -> driverCalc1._numerator.merge(k, v, this::numeratorMerge));
>>         return driverCalc1;
>>     }
>>
>>     private Double numeratorMerge(Double d1, Double d2) {
>>         return d1 + d2;
>>     }
>>
>>     public Object2ObjectOpenHashMap<String, StepAccumulator> zero(
>>             Object2ObjectOpenHashMap<String, StepAccumulator> initialValue) {
>>         return null;
>>     }
>> }
>>
>> On Mon, Aug 3, 2015 at 6:20 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Can you show related code in DriverAccumulator.java ?
>>>
>>> Which Spark release do you use ?
>>>
>>> Cheers
>>>
>>> On Mon, Aug 3, 2015 at 3:13 PM, Anubhav Agarwal <anubha...@gmail.com> wrote:
>>>
>>>> Hi,
>>>> I am trying to modify my code to use HDFS and multiple nodes. The code
>>>> works fine when I run it locally on a single machine with a single worker.
>>>> I have been trying to modify it and I get the following error. Any hint
>>>> would be helpful.
>>>>
>>>> java.lang.NullPointerException
>>>>     at thomsonreuters.trailblazer.main.DriverAccumulator.addAccumulator(DriverAccumulator.java:17)
>>>>     at thomsonreuters.trailblazer.main.DriverAccumulator.addAccumulator(DriverAccumulator.java:11)
>>>>     at org.apache.spark.Accumulable.add(Accumulators.scala:73)
>>>>     at thomsonreuters.trailblazer.main.AllocationBolt.queueDriverRow(AllocationBolt.java:112)
>>>>     at thomsonreuters.trailblazer.main.AllocationBolt.executeRow(AllocationBolt.java:303)
>>>>     at thomsonreuters.trailblazer.main.FileMapFunction.call(FileMapFunction.java:49)
>>>>     at thomsonreuters.trailblazer.main.FileMapFunction.call(FileMapFunction.java:8)
>>>>     at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:996)
>>>>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:90)
>>>>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:90)
>>>>     at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
>>>>     at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> failed in write bolt execute null
>>>> failed in write bolt execute null
>>>> java.lang.NullPointerException
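
A note on the likely cause, reading the trace against the code quoted above (an inference, not a diagnosis confirmed in the thread): the two addAccumulator() frames are probably the typed method plus the compiler-generated bridge method for the generic AccumulableParam interface, which would explain why the second frame points near the class declaration. The NPE itself is then most plausibly the stepAccumulatorMap.get(value._1()) call receiving a null map. In Spark 1.x, Accumulable re-initializes its value from AccumulableParam.zero() when it is deserialized on an executor, so a zero() that returns null (as above) hands addAccumulator() a null first argument on a cluster even though the same job can appear to work in a single-process run. A minimal sketch of a zero() that returns an empty map instead, slotting into the DriverAccumulator class quoted above:

    // Sketch only: return an empty map rather than null, so the value Spark
    // hands to addAccumulator() on the executors is never null.
    public Object2ObjectOpenHashMap<String, StepAccumulator> zero(
            Object2ObjectOpenHashMap<String, StepAccumulator> initialValue) {
        return new Object2ObjectOpenHashMap<String, StepAccumulator>();
    }

Nothing else in the class would need to change for this particular NPE; the driver-side accumulator would still be created as before, for example via JavaSparkContext.accumulable(new Object2ObjectOpenHashMap<String, StepAccumulator>(), new DriverAccumulator()) - shown here only to illustrate where zero() comes into play, not as the original job's code.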