Exception using the new createDirectStream util method

2015-03-19 Thread Alberto Rodriguez
Hi all,

I am trying to get the new Kafka and Spark Streaming integration working (the
direct, "no receivers" approach
<http://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html>). I
have created a unit test in which I configure and start both ZooKeeper and
Kafka.

When I try to create the InputDStream using the createDirectStream method
of the KafkaUtils class, I get the following error:

org.apache.spark.SparkException: Couldn't find leader offsets for Set()
org.apache.spark.SparkException: org.apache.spark.SparkException: Couldn't find leader offsets for Set()
    at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:413)

This is the code that tries to create the DStream:

  val messages: InputDStream[(String, String)] =
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)
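For anyone trying to reproduce this, here is a minimal sketch of the setup around that call. It assumes Spark 1.3's spark-streaming-kafka artifact is on the classpath, a single broker at localhost:9092, and a topic named "words"; the object name DirectStreamSketch, the kafkaParams helper, and the topic name are hypothetical, not taken from Alberto's test.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamSketch {
  // The direct approach talks to the brokers directly, so it needs
  // metadata.broker.list rather than a ZooKeeper address.
  def kafkaParams(brokers: String): Map[String, String] =
    Map("metadata.broker.list" -> brokers)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("DirectStreamSketch")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Hypothetical topic name; assumed to already exist on the broker.
    val topics = Set("words")

    val messages: InputDStream[(String, String)] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams("localhost:9092"), topics)

    // Print the message values of the first few records in each batch.
    messages.map(_._2).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```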

Has anyone faced this problem?

Thank you in advance.

Kind regards,

Alberto


Re: Exception using the new createDirectStream util method

2015-03-19 Thread Alberto Rodriguez
Thank you for replying,

Ted, I have been debugging, and getLeaderOffsets is not appending any
errors because findLeaders, which is called on the first line of
getLeaderOffsets, is not returning any leaders.
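The observation above is consistent with the message ending in an empty Set(). The following is a simplified, hypothetical model (not Spark's actual KafkaCluster code; TopicPartition, Leader and leaderOffsets are illustrative names) of one way this can happen: per-partition errors are only recorded for responses from a leader, so if the set of partitions to look up is itself empty, no leader is ever contacted, nothing is appended to the error list, and the reported set of missing partitions is empty.

```scala
object LeaderOffsetsModel {
  // Hypothetical stand-ins; these are NOT Kafka's or Spark's classes.
  final case class TopicPartition(topic: String, partition: Int)
  final case class Leader(host: String, port: Int)

  /** Simplified leader-offset lookup.
    * Offsets are collected only for partitions that have a known leader.
    * Success requires that at least one leader was contacted and that every
    * requested partition got an offset; otherwise the partitions without an
    * offset are reported, which is an empty set when none were requested.
    */
  def leaderOffsets(
      partitions: Set[TopicPartition],
      leaders: Map[TopicPartition, Leader],
      offsetOf: TopicPartition => Long
  ): Either[String, Map[TopicPartition, Long]] = {
    // Group partitions by leader, as a per-broker request would.
    val byLeader: Map[Leader, Set[TopicPartition]] =
      partitions.filter(leaders.contains).groupBy(leaders)
    val result: Map[TopicPartition, Long] =
      byLeader.values.flatten.map(tp => tp -> offsetOf(tp)).toMap
    if (byLeader.nonEmpty && result.size == partitions.size) Right(result)
    else Left(s"Couldn't find leader offsets for ${partitions.diff(result.keySet)}")
  }
}
```

With no partitions at all, `leaderOffsets(Set.empty, Map.empty, _ => 0L)` yields `Left("Couldn't find leader offsets for Set()")`, matching the shape of the error in the original report.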

Cody, the topics do not have any messages yet. Could this be the problem?

If you want to have a look at the code, I've just uploaded it to my
GitHub account: big-brother <https://github.com/ardlema/big-brother> (see
DirectKafkaWordCountTest.scala).

Thank you again!!

2015-03-19 22:13 GMT+01:00 Cody Koeninger:

> What is the value of your topics variable, and does it correspond to
> topics that already exist on the cluster and have messages in them?
>
> On Thu, Mar 19, 2015 at 3:10 PM, Ted Yu wrote:
>
>> Looking at KafkaCluster#getLeaderOffsets():
>>
>>   respMap.get(tp).foreach { por: PartitionOffsetsResponse =>
>>     if (por.error == ErrorMapping.NoError) {
>>       ...
>>     } else {
>>       errs.append(ErrorMapping.exceptionFor(por.error))
>>     }
>>   }
>> There should be some error other than "Couldn't find leader offsets for
>> Set()"
>>
>> Can you check again?
>>
>> Thanks
>>
>> On Thu, Mar 19, 2015 at 12:10 PM, Alberto Rodriguez 
>> wrote:


Re: Exception using the new createDirectStream util method

2015-03-20 Thread Alberto Rodriguez
You were absolutely right, Cody! I have just put a message in the Kafka
topic before creating the direct stream, and now it is working fine!

Do you think I should open an issue to warn that the Kafka topic must
contain at least one message before the direct stream is created?
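The workaround described here, writing one message to the topic before calling createDirectStream, can be sketched with the Kafka producer client. This assumes the Kafka 0.8.2 (or later) Java client (org.apache.kafka.clients.producer) is on the test classpath; SeedTopic, producerProps and seed are hypothetical names, and the broker address and topic are placeholders.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object SeedTopic {
  // Producer configuration for plain String keys and values.
  def producerProps(brokers: String): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props
  }

  /** Sends a single message and blocks until the broker acknowledges it. */
  def seed(brokers: String, topic: String, value: String): Unit = {
    val producer = new KafkaProducer[String, String](producerProps(brokers))
    try {
      producer.send(new ProducerRecord[String, String](topic, value)).get()
    } finally {
      producer.close()
    }
  }
}
```

In a test, `SeedTopic.seed("localhost:9092", "words", "hello")` would run after Kafka has started and before the stream is created. Blocking on `get()` means the write has been acknowledged before the direct stream looks up leader offsets.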

Thank you very much! You've just made my day ;)

2015-03-19 23:08 GMT+01:00 Cody Koeninger:

> Yeah, I wouldn't be shocked if Kafka's metadata APIs didn't return results
> for topics that don't have any messages. (Sorry about the triple negative,
> but I think you get my meaning.)
>
> Try putting a message in the topic and seeing what happens.
>
> On Thu, Mar 19, 2015 at 4:38 PM, Alberto Rodriguez 
> wrote: