Trying to run following causes a NullPointer Exception. While I thought Spark should have been able to handle Null, apparently it is not able to. What could I return in place of null? What other ways could I approach this?? There are at times, I would want to just skip parsing and proceed to next record, how do I handle that?
filtered.mapToPair(new Parse(imprDetailMarshal)).reduceByKey(new TimeAndCount()).foreachRDD(new ImpressionDetailLogPub(indexNamePrefix,indexType, imprDetailMarshal)); } * Where Parse is:* private static class Parse implements PairFunction<String, ImpressionDetailRecord, TimeNcount> { private static final long serialVersionUID = -5060508551208900848L; private static final DateTimeFormatter FORMATTER = DateTimeFormat.forPattern("dd/MMM/yyyy:HH:mm:ss ZZ"); private final ImprDetailMarshal imprDetailMarshal; public Parse(ImprDetailMarshal imprDetailMarshal){ this.imprDetailMarshal = imprDetailMarshal; } @Override public Tuple2<ImpressionDetailRecord, TimeNcount> call(String arg0) throws Exception { ImpressionDetailRecordHolder recordHolder = imprDetailMarshal.parse(arg0); if(recordHolder != null) { return new Tuple2<ImpressionDetailRecord, TimeNcount>(recordHolder.getImpressionDetailRecord(),recordHolder.getTimeCount()); } return null; } } java.lang.NullPointerException at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) -- Thanks, Saurabh :)