Hi folks, I'm trying to write a simple Spark job that dumps a Kafka topic into HDFS. I'm very new to Kafka, so I'm not sure whether I'm messing something up on that side. My hope is to read the messages currently in the topic (or at least the first 100 per partition, for now).

Here is what I have:
Kafka side:

 ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic ingress --broker-list IP1:9092,IP2:9092,IP3:9092 --time -1
ingress:0:34386
ingress:1:34148
ingress:2:34300
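
If I understand GetOffsetShell correctly, --time -1 prints the latest offset per partition, so each partition holds roughly 34k messages. To rule out retention having already deleted the early messages, I figure I can also ask for the earliest available offsets with --time -2 and check that they're still 0:

 ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic ingress --broker-list IP1:9092,IP2:9092,IP3:9092 --time -2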


On the Spark side I'm trying this (Spark 1.4.1):

bin/spark-shell --jars kafka-clients-0.8.2.0.jar,spark-streaming-kafka_2.10-1.4.1.jar,kafka_2.10-0.8.2.0.jar,metrics-core-2.2.0.jar



import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

val brokers = "IP1:9092,IP2:9092,IP3:9092" // same IPs as above
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

// first 100 messages from each of the 3 partitions
val offsetRange = (0 to 2).map(part => OffsetRange.create("ingress", part, 0, 100))
val messages = KafkaUtils.createRDD(sc, kafkaParams, offsetRange.toArray)
messages: org.apache.spark.rdd.RDD[(Nothing, Nothing)] = KafkaRDD[1] at RDD at KafkaRDD.scala:45


When I run messages.count, I get:

15/09/22 14:01:17 ERROR TaskContextImpl: Error in TaskCompletionListener
java.lang.NullPointerException
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:157)
        at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:100)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:100)
        at org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:56)
        at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:75)
        at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:73)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:73)
        at org.apache.spark.scheduler.Task.run(Task.scala:72)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
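
One thing I notice: messages was inferred as RDD[(Nothing, Nothing)], so maybe the real problem is that I never told createRDD the key/value types or decoders? Here's my best guess at the typed call (just a sketch, assuming the messages are plain strings; the HDFS path is a placeholder):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

val offsetRanges = (0 to 2).map(part => OffsetRange.create("ingress", part, 0, 100)).toArray

// Explicit type parameters: key type, value type, key decoder, value decoder.
val typed = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, offsetRanges)

// The eventual goal: dump the message bodies into HDFS (placeholder path).
typed.map { case (_, value) => value }.saveAsTextFile("hdfs:///tmp/ingress-dump")

Does that look closer to what I should be doing?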
