Hi All,

I am seeing an exception when trying to subtract two RDDs.

Let's say rdd1 has messages like this:

 pnr,  bookingId,  BookingObject
 101,  1,          BookingObject1    // event number is 0
 102,  1,          BookingObject2    // event number is 0
 103,  2,          BookingObject3    // event number is 1

rdd1 has the type RDD[(String, Int, Booking)].
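
For reference, here is a minimal sketch of the shape I am working with (Booking is a stand-in for my actual model class, and sc is an existing SparkContext):

import org.apache.spark.rdd.RDD

// Stand-in for my actual model class; only the event number matters here.
case class Booking(eventNumber: Int)

val rdd1: RDD[(String, Int, Booking)] = sc.parallelize(Seq(
  ("101", 1, Booking(0)),  // BookingObject1
  ("102", 1, Booking(0)),  // BookingObject2
  ("103", 2, Booking(1))   // BookingObject3
))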

The Booking table in Cassandra has (pnr, bookingId) as its primary key.
Let's say the Booking table has the following rows:

          pnr,  bookingId,  eventNumber
Row1 -    101,  1,          1
Row2 -    103,  2,          0

Joining rdd1 with the Booking table via joinWithCassandraTable on columns
pnr and bookingId gives me the following CassandraJoinRDD:

(101, 1, BookingObject1), Row1
(103, 2, BookingObject3), Row2
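
The join call itself looks roughly like this (the keyspace and table names are placeholders, and I am assuming the connector maps the tuple fields onto the join columns):

import com.datastax.spark.connector._

// Placeholder keyspace/table names; join rdd1 against the Booking table
// on its primary key columns.
val joined = rdd1
  .joinWithCassandraTable("my_keyspace", "booking")
  .on(SomeColumns("pnr", "bookingId"))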

Now, on this RDD, I compare the event number of each BookingObject against
the eventNumber in the row and keep the messages whose event number is
greater than the one in the row, which gives the following RDD:

val rdd2: RDD[((String, Int, Booking), CassandraRow)] contains the record
below:

(103, 2, BookingObject3), Row2
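
Concretely, the filter is roughly the following (assuming Booking exposes its event number as eventNumber and the column in the row is named eventNumber):

val rdd2 = joined.filter { case ((pnr, bookingId, booking), row) =>
  // Keep only messages strictly newer than what is already stored.
  booking.eventNumber > row.getInt("eventNumber")
}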

But I also need pnr 102 from the original RDD, as it does not exist in the
DB. Hence, to get such messages, I subtract the tuples of the
CassandraJoinRDD (joined above) from the original rdd1:

val mappedCRdd = joined.map { case (tuple, row) => tuple }
val subtractedRdd = rdd1.subtract(mappedCRdd)

val mappedRdd2 = rdd2.map { case (tuple, row) => tuple }

Now I union this subtractedRdd with mappedRdd2:

subtractedRdd.union(mappedRdd2)

But subtract on the RDD is throwing the exception below:

java.lang.NullPointerException
        at org.joda.time.LocalDateTime.getValue(LocalDateTime.java:566)
        at org.joda.time.base.AbstractPartial.hashCode(AbstractPartial.java:282)
        at scala.runtime.ScalaRunTime$.hash(ScalaRunTime.scala:210)
        at scala.util.hashing.MurmurHash3.productHash(MurmurHash3.scala:63)
        at scala.util.hashing.MurmurHash3$.productHash(MurmurHash3.scala:210)
        at scala.runtime.ScalaRunTime$._hashCode(ScalaRunTime.scala:172)
        at com.amadeus.ti.models.tof.TOFModel$GCAS.hashCode(TOFModel.scala:14)
        at scala.runtime.ScalaRunTime$.hash(ScalaRunTime.scala:210)
        at scala.util.hashing.MurmurHash3.productHash(MurmurHash3.scala:63)
        at scala.util.hashing.MurmurHash3$.productHash(MurmurHash3.scala:210)
        at scala.runtime.ScalaRunTime$._hashCode(ScalaRunTime.scala:172)
        at com.amadeus.ti.models.tof.TOFModel$TAOFRS.hashCode(TOFModel.scala:7)
        at java.util.HashMap.hash(HashMap.java:362)
        at java.util.HashMap.put(HashMap.java:492)
        at org.apache.spark.rdd.SubtractedRDD.org$apache$spark$rdd$SubtractedRDD$$getSeq$1(SubtractedRDD.scala:104)
        at org.apache.spark.rdd.SubtractedRDD$$anonfun$compute$1.apply(SubtractedRDD.scala:119)
        at org.apache.spark.rdd.SubtractedRDD$$anonfun$compute$1.apply(SubtractedRDD.scala:119)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.rdd.SubtractedRDD.integrate$1(SubtractedRDD.scala:116)
        at org.apache.spark.rdd.SubtractedRDD.compute(SubtractedRDD.scala:119)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

If subtract is the problem, what other way could I achieve this? Or is it
something I am doing wrong?
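
For what it's worth, one alternative I am considering is to subtract by key only, so that subtract never has to hash the Booking object at all (the trace suggests the NPE comes from hashCode hitting a null field inside a joda LocalDateTime in my model). A rough sketch, assuming (pnr, bookingId) uniquely identifies a message:

// Key both sides by (pnr, bookingId) so only the key gets hashed,
// never the Booking payload.
val keyedRdd1  = rdd1.map { case t @ (pnr, bookingId, _) => ((pnr, bookingId), t) }
val joinedKeys = mappedCRdd.map { case (pnr, bookingId, _) => ((pnr, bookingId), ()) }

val subtractedByKey = keyedRdd1.subtractByKey(joinedKeys).values

Would this be equivalent, or is there a better-supported pattern?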

Thanks,
Padma Ch
