Hi All,
I am seeing an exception when trying to subtract two RDDs.
Let's say RDD1 has messages like:
* pnr, bookingId, BookingObject*
101, 1, BookingObject1 // - event number is 0
102, 1, BookingObject2 // - event number is 0
103, 2, BookingObject3 // - event number is 1
RDD1 is of type RDD[(String, Int, Booking)].
The Booking table in Cassandra has (pnr, bookingId) as its primary key.
Let's say the Booking table has the following rows:
* pnr, bookingId, eventNumber*
Row1 - 101, 1, 1
Row2 - 103, 2, 0
RDD1.joinWithCassandraTable on the pnr and bookingId columns of the Booking table
gives me the following CassandraJoinRDD:
(101, 1, BookingObject1), Row1
(103, 2, BookingObject3), Row2
On this RDD, I compare the event number of each BookingObject against the
eventNumber in the row and keep the messages whose event number is greater
than the one in the row, which gives the following RDD.
val RDD2: RDD[((String, Int, BookingObject), CassandraRow)] contains the
record below:
(103, 2, BookingObject3), Row2
But I also need pnr 102 from the original RDD, as it does not exist in the DB.
To get such messages, I am subtracting the CassandraJoinRDD from the original
RDD (RDD1):
val mappedCRdd = CassandraJoinRDD.map { case (tuple, row) => tuple }
val subtractedRdd = RDD1.subtract(mappedCRdd)
val mappedRdd2 = RDD2.map { case (tuple, row) => tuple }
Then I union subtractedRdd with mappedRdd2:
subtractedRdd.union(mappedRdd2)
But subtract on the RDD throws the exception below:
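The intended result of the subtract + union can be modeled the same way with local collections (again a sketch, not Spark code; `Booking` and `SubtractUnionModel` are illustrative names). Since `subtract` matches whole `(pnr, bookingId, BookingObject)` tuples, every BookingObject has to be hashed and compared, which a `Set` lookup mirrors here:

```scala
// Plain-Scala model of the subtract + union step (local collections, not Spark).
// Booking is a simplified, hypothetical stand-in for BookingObject.
final case class Booking(name: String, eventNumber: Int)

object SubtractUnionModel {
  type Msg = (String, Int, Booking)

  val rdd1: Seq[Msg] = Seq(
    ("101", 1, Booking("BookingObject1", 0)),
    ("102", 1, Booking("BookingObject2", 0)),
    ("103", 2, Booking("BookingObject3", 1))
  )

  // Message side of the CassandraJoinRDD (messages that matched a row).
  val mappedCRdd: Seq[Msg] = Seq(rdd1(0), rdd1(2))
  // Message side of RDD2 (matched and with a newer event number).
  val mappedRdd2: Seq[Msg] = Seq(rdd1(2))

  // Whole tuples are the lookup key, so each Booking is hashed here,
  // just as SubtractedRDD hashes each element into a java.util.HashMap.
  private val matched: Set[Msg] = mappedCRdd.toSet
  val subtractedRdd: Seq[Msg] = rdd1.filterNot(matched.contains)

  // union keeps duplicates, like ++ on a Seq.
  val result: Seq[Msg] = subtractedRdd ++ mappedRdd2
}
```

With the sample data, `subtractedRdd` holds only pnr 102, and `result` holds pnr 102 and 103, which is what I am after.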
java.lang.NullPointerException
at org.joda.time.LocalDateTime.getValue(LocalDateTime.java:566)
at org.joda.time.base.AbstractPartial.hashCode(AbstractPartial.java:282)
at scala.runtime.ScalaRunTime$.hash(ScalaRunTime.scala:210)
at scala.util.hashing.MurmurHash3.productHash(MurmurHash3.scala:63)
at scala.util.hashing.MurmurHash3$.productHash(MurmurHash3.scala:210)
at scala.runtime.ScalaRunTime$._hashCode(ScalaRunTime.scala:172)
at com.amadeus.ti.models.tof.TOFModel$GCAS.hashCode(TOFModel.scala:14)
at scala.runtime.ScalaRunTime$.hash(ScalaRunTime.scala:210)
at scala.util.hashing.MurmurHash3.productHash(MurmurHash3.scala:63)
at scala.util.hashing.MurmurHash3$.productHash(MurmurHash3.scala:210)
at scala.runtime.ScalaRunTime$._hashCode(ScalaRunTime.scala:172)
at com.amadeus.ti.models.tof.TOFModel$TAOFRS.hashCode(TOFModel.scala:7)
at java.util.HashMap.hash(HashMap.java:362)
at java.util.HashMap.put(HashMap.java:492)
at org.apache.spark.rdd.SubtractedRDD.org$apache$spark$rdd$SubtractedRDD$$getSeq$1(SubtractedRDD.scala:104)
at org.apache.spark.rdd.SubtractedRDD$$anonfun$compute$1.apply(SubtractedRDD.scala:119)
at org.apache.spark.rdd.SubtractedRDD$$anonfun$compute$1.apply(SubtractedRDD.scala:119)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.rdd.SubtractedRDD.integrate$1(SubtractedRDD.scala:116)
at org.apache.spark.rdd.SubtractedRDD.compute(SubtractedRDD.scala:119)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
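Reading the trace bottom-up: SubtractedRDD puts each element into a java.util.HashMap, which calls hashCode on it; the case classes TAOFRS and GCAS hash their fields (ScalaRunTime._hashCode / MurmurHash3.productHash), and that finally reaches LocalDateTime.hashCode, which throws. Below is a minimal self-contained reproduction of that cascade. `Partial` is a hypothetical class standing in for the Joda type (it is not Joda's real implementation), and `Gcas`/`Taofrs` are simplified copies of my models, named after the trace for illustration only:

```scala
import java.util.HashMap

// Hypothetical value type whose hashCode dereferences an internal field,
// standing in for the Joda LocalDateTime in the trace.
final class Partial(private val chronology: String) {
  override def hashCode: Int = chronology.hashCode // throws NPE if chronology is null
  override def equals(o: Any): Boolean = o match {
    case p: Partial => p.chronology == chronology
    case _          => false
  }
}

// Case classes hash every field, so hashCode cascades down to Partial.
final case class Gcas(time: Partial)  // like TOFModel$GCAS
final case class Taofrs(gcas: Gcas)   // like TOFModel$TAOFRS

object HashCascade {
  // True if putting the value into a HashMap (as SubtractedRDD does) throws an NPE.
  def putThrowsNpe(value: Taofrs): Boolean =
    try { new HashMap[Taofrs, String]().put(value, "x"); false }
    catch { case _: NullPointerException => true }
}
```

`HashCascade.putThrowsNpe(Taofrs(Gcas(new Partial(null))))` returns true, while a `Partial` with a non-null field hashes normally, which matches the shape of the trace above.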
If subtract is the problem, what other way could I achieve this? Or is it
something I am doing wrong?
Thanks,
Padma Ch