Running the following code throws a NullPointerException. I thought
Spark would be able to handle a null returned from a PairFunction, but
apparently it cannot. What could I return in place of null? What other
ways could I approach this? At times I just want to skip a record that
fails to parse and proceed to the next one; how do I handle that?


    filtered.mapToPair(new Parse(imprDetailMarshal))
            .reduceByKey(new TimeAndCount())
            .foreachRDD(new ImpressionDetailLogPub(indexNamePrefix, indexType, imprDetailMarshal));
    }



*Where Parse is:*

    private static class Parse
            implements PairFunction<String, ImpressionDetailRecord, TimeNcount> {

        private static final long serialVersionUID = -5060508551208900848L;
        private static final DateTimeFormatter FORMATTER =
                DateTimeFormat.forPattern("dd/MMM/yyyy:HH:mm:ss ZZ");
        private final ImprDetailMarshal imprDetailMarshal;

        public Parse(ImprDetailMarshal imprDetailMarshal) {
            this.imprDetailMarshal = imprDetailMarshal;
        }

        @Override
        public Tuple2<ImpressionDetailRecord, TimeNcount> call(String arg0)
                throws Exception {
            ImpressionDetailRecordHolder recordHolder = imprDetailMarshal.parse(arg0);
            if (recordHolder != null) {
                return new Tuple2<ImpressionDetailRecord, TimeNcount>(
                        recordHolder.getImpressionDetailRecord(),
                        recordHolder.getTimeCount());
            }
            return null;
        }
    }

java.lang.NullPointerException
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
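
One direction I am considering, offered only as a sketch: instead of
returning a nullable pair, emit zero or one pairs per record and flatten
the result, so a bad record simply contributes nothing. In Spark I
believe the counterpart would be flatMapToPair with a
PairFlatMapFunction (or a filter that drops nulls before reduceByKey),
but I have not verified that against my job. The example below is plain
Java with no Spark dependency; the class SkipBadRecordsSketch, the
methods parseOrNull, parseToList and parseAll, and the "key,count" line
format are all hypothetical stand-ins for imprDetailMarshal.parse and my
real record types.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of "emit zero or one pair per record".
// Nothing here is Spark API; all names are hypothetical.
class SkipBadRecordsSketch {

    // Stand-in for imprDetailMarshal.parse(...): returns null when the
    // line is not of the assumed form "key,count".
    static Map.Entry<String, Integer> parseOrNull(String line) {
        String[] parts = line.split(",");
        if (parts.length != 2) {
            return null;
        }
        try {
            return new SimpleEntry<String, Integer>(
                    parts[0].trim(), Integer.parseInt(parts[1].trim()));
        } catch (NumberFormatException e) {
            return null;
        }
    }

    // flatMap-style wrapper: an empty list replaces the null, so the
    // caller never sees a null pair.
    static List<Map.Entry<String, Integer>> parseToList(String line) {
        Map.Entry<String, Integer> parsed = parseOrNull(line);
        if (parsed == null) {
            return Collections.emptyList();
        }
        return Collections.singletonList(parsed);
    }

    // Flattens the per-line lists; unparseable lines are skipped.
    static List<Map.Entry<String, Integer>> parseAll(List<String> lines) {
        List<Map.Entry<String, Integer>> out =
                new ArrayList<Map.Entry<String, Integer>>();
        for (String line : lines) {
            out.addAll(parseToList(line));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a,1", "garbage", "b,x", "c,3");
        // Only "a,1" and "c,3" parse, so two pairs survive.
        System.out.println(parseAll(lines).size());
    }
}
```

The point of the empty list is that the downstream step (reduceByKey in
my case) only ever sees real pairs, so it never has to deal with a null
element.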



-- 
Thanks,
Saurabh

:)
