Thank you for replying, Ted, I have been debuging and the getLeaderOffsets method is not appending errors because the method findLeaders that is called at the first line of getLeaderOffsets is not returning leaders.
Cody, the topics do not have any messages yet. Could this be an issue?? If you guys want to have a look at the code I've just uploaded it to my github account: big-brother <https://github.com/ardlema/big-brother> (see DirectKafkaWordCountTest.scala). Thank you again!! 2015-03-19 22:13 GMT+01:00 Cody Koeninger <c...@koeninger.org>: > What is the value of your topics variable, and does it correspond to > topics that already exist on the cluster and have messages in them? > > On Thu, Mar 19, 2015 at 3:10 PM, Ted Yu <yuzhih...@gmail.com> wrote: > >> Looking at KafkaCluster#getLeaderOffsets(): >> >> respMap.get(tp).foreach { por: PartitionOffsetsResponse => >> if (por.error == ErrorMapping.NoError) { >> ... >> } else { >> errs.append(ErrorMapping.exceptionFor(por.error)) >> } >> There should be some error other than "Couldn't find leader offsets for >> Set()" >> >> Can you check again ? >> >> Thanks >> >> On Thu, Mar 19, 2015 at 12:10 PM, Alberto Rodriguez <ardl...@gmail.com> >> wrote: >> >> > Hi all, >> > >> > I am trying to make the new kafka and spark streaming integration work >> > (direct >> > approach "no receivers" >> > <http://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html>). >> I >> > have created an unit test where I configure and start both zookeeper and >> > kafka. >> > >> > When I try to create the InputDStream using the createDirectStream >> method >> > of the KafkaUtils class I am getting the following error: >> > >> > org.apache.spark.SparkException:* Couldn't find leader offsets for >> Set()* >> > org.apache.spark.SparkException: org.apache.spark.SparkException: >> Couldn't >> > find leader offsets for Set() >> > at >> > >> > >> org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:413) >> > >> > Following is the code that tries to create the DStream: >> > >> > val messages: InputDStream[(String, String)] = >> > KafkaUtils.createDirectStream[String, String, StringDecoder, >> > StringDecoder]( >> > ssc, kafkaParams, topics) >> > >> > Does anyone faced this problem? >> > >> > Thank you in advance. >> > >> > Kind regards, >> > >> > Alberto >> > >> > >