Hi All,
I am seeing an exception when trying to subtract two RDDs.

Let's say RDD1 has messages like this:

pnr, bookingId, BookingObject
101, 1, BookingObject1 // event number is 0
102, 1, BookingObject2 // event number is 0
103, 2, BookingObject3 // event number is 1

so RDD1 is an RDD[(String, Int, Booking)]. The Booking table in Cassandra has (pnr, bookingId) as its primary key. Let's say the Booking table has the following rows:

pnr, bookingId, eventNumber
Row1 - 101, 1, 1
Row2 - 103, 2, 0

RDD1.joinWithCassandraTable on the columns pnr and bookingId with the Booking table gives me the following CassandraJoinRDD:

(101, 1, BookingObject1), Row1
(103, 2, BookingObject3), Row2

On this RDD I compare the event number of each BookingObject against the eventNumber in the row, and keep only the messages whose event number is greater than the one in the row. That gives the following RDD:

val RDD2: RDD[((String, Int, Booking), CassandraRow)]

which contains the single record (103, 2, BookingObject3), Row2.

But I also need pnr 102 from the original RDD, because it does not exist in the DB yet.
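A minimal plain-Scala sketch of the selection described above, using ordinary collections instead of RDDs so that it runs without Spark or Cassandra. Booking, BookingRow, KeepNewOrMissing and keepNewOrMissing are hypothetical stand-ins (the real classes are not shown), modelling only the fields the filter needs. It keeps a message when its (pnr, bookingId) has no row in the table, or when its event number is greater than the stored one, so 102 and 103 come out of a single pass:

```scala
// Hypothetical stand-ins: only the fields used by the filter are modelled.
final case class Booking(eventNumber: Int)
final case class BookingRow(pnr: String, bookingId: Int, eventNumber: Int)

object KeepNewOrMissing {
  type Message = (String, Int, Booking)

  // Keep a message if its key has no stored row, or if its event number
  // is greater than the stored event number for that key.
  def keepNewOrMissing(messages: Seq[Message], rows: Seq[BookingRow]): Seq[Message] = {
    // Index the table rows by the primary key (pnr, bookingId).
    val stored: Map[(String, Int), Int] =
      rows.map(r => (r.pnr, r.bookingId) -> r.eventNumber).toMap

    messages.filter { case (pnr, bookingId, booking) =>
      stored.get((pnr, bookingId)) match {
        case None              => true                              // not in the DB yet
        case Some(storedEvent) => booking.eventNumber > storedEvent // newer event
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val rdd1 = Seq(
      ("101", 1, Booking(0)),
      ("102", 1, Booking(0)),
      ("103", 2, Booking(1))
    )
    val table = Seq(BookingRow("101", 1, 1), BookingRow("103", 2, 0))
    keepNewOrMissing(rdd1, table).foreach(println)
  }
}
```

The decision depends only on the (pnr, bookingId) key and the event number. RDD.subtract, by contrast, uses each whole element as a hash key, so every field's hashCode gets evaluated. In Spark, a similar shape could presumably be obtained by keying RDD1 by (pnr, bookingId) and using leftOuterJoin or subtractByKey from PairRDDFunctions, which hash only the key; this is an untested suggestion, not a verified fix for the exception.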
Hence, to get such messages, I am subtracting the CassandraJoinRDD from the original RDD, i.e. RDD1:

    // joinedRdd is the CassandraJoinRDD returned by the join above
    val mappedCRdd = joinedRdd.map { case (tuple, row) => tuple }
    val subtractedRdd = RDD1.subtract(mappedCRdd)
    val mappedRdd2 = RDD2.map { case (tuple, row) => tuple }

Then I take the union of subtractedRdd with mappedRdd2:

    subtractedRdd.union(mappedRdd2)

But the subtract on the RDD throws the following exception:

java.lang.NullPointerException
    at org.joda.time.LocalDateTime.getValue(LocalDateTime.java:566)
    at org.joda.time.base.AbstractPartial.hashCode(AbstractPartial.java:282)
    at scala.runtime.ScalaRunTime$.hash(ScalaRunTime.scala:210)
    at scala.util.hashing.MurmurHash3.productHash(MurmurHash3.scala:63)
    at scala.util.hashing.MurmurHash3$.productHash(MurmurHash3.scala:210)
    at scala.runtime.ScalaRunTime$._hashCode(ScalaRunTime.scala:172)
    at com.amadeus.ti.models.tof.TOFModel$GCAS.hashCode(TOFModel.scala:14)
    at scala.runtime.ScalaRunTime$.hash(ScalaRunTime.scala:210)
    at scala.util.hashing.MurmurHash3.productHash(MurmurHash3.scala:63)
    at scala.util.hashing.MurmurHash3$.productHash(MurmurHash3.scala:210)
    at scala.runtime.ScalaRunTime$._hashCode(ScalaRunTime.scala:172)
    at com.amadeus.ti.models.tof.TOFModel$TAOFRS.hashCode(TOFModel.scala:7)
    at java.util.HashMap.hash(HashMap.java:362)
    at java.util.HashMap.put(HashMap.java:492)
    at org.apache.spark.rdd.SubtractedRDD.org$apache$spark$rdd$SubtractedRDD$$getSeq$1(SubtractedRDD.scala:104)
    at org.apache.spark.rdd.SubtractedRDD$$anonfun$compute$1.apply(SubtractedRDD.scala:119)
    at org.apache.spark.rdd.SubtractedRDD$$anonfun$compute$1.apply(SubtractedRDD.scala:119)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.rdd.SubtractedRDD.integrate$1(SubtractedRDD.scala:116)
    at org.apache.spark.rdd.SubtractedRDD.compute(SubtractedRDD.scala:119)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

If subtract is the problem, what other way could I achieve this? Or is it something I am doing wrong?

Thanks,
Padma Ch