Hi folks, I'm trying to write a simple Spark job that dumps a Kafka queue out to HDFS. Being very new to Kafka, I'm not sure if I'm messing something up on that side... My hope is to read the messages presently in the queue (or at least the first 100 for now).
Here is what I have.

Kafka side:

    ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic ingress --broker-list IP1:9092,IP2:9092,IP3:9092 --time -1
    ingress:0:34386
    ingress:1:34148
    ingress:2:34300

On the Spark side (1.4.1) I'm trying this:

    bin/spark-shell --jars kafka-clients-0.8.2.0.jar,spark-streaming-kafka_2.10-1.4.1.jar,kafka_2.10-0.8.2.0.jar,metrics-core-2.2.0.jar

    val brokers = "IP1:9092,IP2:9092,IP3:9092" // same IPs as above
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val offsetRange = (0 to 2).map(part => OffsetRange.create("ingress", part, 0, 100))
    val messages = KafkaUtils.createRDD(sc, kafkaParams, offsetRange.toArray)

which gives:

    messages: org.apache.spark.rdd.RDD[(Nothing, Nothing)] = KafkaRDD[1] at RDD at KafkaRDD.scala:45

When I try messages.count I get:

    15/09/22 14:01:17 ERROR TaskContextImpl: Error in TaskCompletionListener
    java.lang.NullPointerException
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:157)
        at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:100)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:100)
        at org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:56)
        at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:75)
        at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:73)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:73)
        at org.apache.spark.scheduler.Task.run(Task.scala:72)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
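One thing I noticed is that the result type is RDD[(Nothing, Nothing)], which makes me think the key/value type parameters of createRDD never got inferred. Below is a minimal sketch of what I think the explicitly-typed call would look like, assuming String keys/values with kafka.serializer.StringDecoder; I haven't confirmed this fixes the NullPointerException, and the HDFS output path in the last line is just a placeholder for illustration:

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

    val brokers = "IP1:9092,IP2:9092,IP3:9092"
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

    // One range per partition; read offsets 0 until 100 for now.
    val offsetRanges = (0 to 2).map(part => OffsetRange.create("ingress", part, 0, 100)).toArray

    // Spell out the type parameters so the result is RDD[(String, String)]
    // rather than RDD[(Nothing, Nothing)].
    val messages = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, offsetRanges)

    // Dump just the message values to HDFS (placeholder path).
    messages.map(_._2).saveAsTextFile("hdfs:///tmp/ingress-dump")

Does that look like the right direction, or is the NullPointerException pointing at something else entirely (offsets, jars on the classpath, etc.)?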