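bq. aggregationMap.put(countryCode,requestCountPerCountry+1);

If the NPE came from the above line, maybe requestCountPerCountry was null? aggregationMap.get(countryCode) returns null when the map has no entry for that key, and unboxing a null Integer in requestCountPerCountry+1 throws a NullPointerException. The top frame of the trace is your own code (JavaKafkaStream.java:107); DynamicVariable.withValue is just one of the scheduler frames further down the stack.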
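A minimal sketch of a null-safe increment, assuming the country code may not yet be present in the map. The class and method names (CountryCounts, increment) are made up for illustration and are not from your code:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper class, for illustration only.
class CountryCounts {

    // Increment the count for countryCode, treating a missing key as 0.
    static int increment(Map<String, Integer> counts, String countryCode) {
        Integer current = counts.get(countryCode);          // null if the key is absent
        int updated = (current == null) ? 1 : current + 1;  // avoid unboxing null
        counts.put(countryCode, updated);
        return updated;
    }

    public static void main(String[] args) {
        Map<String, Integer> aggregationMap = new ConcurrentHashMap<String, Integer>();
        increment(aggregationMap, "IN");
        increment(aggregationMap, "IN");
        increment(aggregationMap, "US");
        // prints "2 1"
        System.out.println(aggregationMap.get("IN") + " " + aggregationMap.get("US"));
    }
}
```

The get-then-put pair above is not atomic. If you are on Java 8 or later, aggregationMap.merge(countryCode, 1, Integer::sum) does the same update in one call, and ConcurrentHashMap performs it atomically.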
Cheers

On Thu, Aug 6, 2015 at 8:54 AM, UMESH CHAUDHARY <umesh9...@gmail.com> wrote:

> Scenario is:
>
> - I have a map of country-code as key and count as value (initially count is 0)
> - In DStream.foreachRDD I need to update the count for country in the map with new aggregated value
>
> I am doing :
>
> transient Map<String,Integer> aggregationMap=new ConcurrentHashMap<String,Integer>();
>
> Integer requestCountPerCountry=aggregationMap.get(countryCode);
>
> aggregationMap.put(countryCode,requestCountPerCountry+1); // Getting Following Error in this Line
>
> java.lang.NullPointerException
>     at JavaKafkaStream$2.call(JavaKafkaStream.java:107)
>     at JavaKafkaStream$2.call(JavaKafkaStream.java:92)
>     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:323)
>     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:323)
>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
>     at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40)
>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>     at scala.util.Try$.apply(Try.scala:161)
>     at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)
>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:193)
>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:193)
>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:193)
>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:192)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
>
> Is this issue related to :
>
>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>
> If so how can I resolve this?