How costly is Rebalancing of partitions for a topic

2014-11-05 Thread dinesh kumar
Hello,

I am trying to come up with a design for consuming from Kafka. I am using
Kafka version 0.8.1.1. I am thinking of designing a system where a consumer
is created every few seconds, consumes data from Kafka, processes it, and
then quits after committing the offsets to Kafka. At any point in time I
expect 250-300 consumers to be active (running as thread pools on different
machines).
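
To make the intended lifecycle concrete, here is a minimal sketch of one such
short-lived consumer using the 0.8 high-level consumer API (the topic, group,
addresses, and timeout are placeholder values, not part of the actual design):

import java.util.Collections;
import java.util.List;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class ShortLivedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder address
        props.put("group.id", "my-group");                // placeholder group
        // make the iterator throw ConsumerTimeoutException when idle,
        // so this consumer can quit instead of blocking forever
        props.put("consumer.timeout.ms", "5000");

        ConsumerConnector connector =
            kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        List<KafkaStream<byte[], byte[]>> streams = connector
            .createMessageStreams(Collections.singletonMap("my-topic", 1))
            .get("my-topic");
        ConsumerIterator<byte[], byte[]> it = streams.get(0).iterator();
        try {
            while (it.hasNext()) {
                byte[] message = it.next().message(); // process the message here
            }
        } catch (ConsumerTimeoutException e) {
            // nothing new to read within the timeout: fall through and quit
        }
        connector.commitOffsets(); // persist progress to ZK before quitting
        connector.shutdown();
    }
}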

1. How and when does a partition rebalance happen?

2. How costly is rebalancing partitions among the consumers? I expect a new
consumer to finish or join the same consumer group every few seconds, so I
just want to know the overhead and latency of a rebalancing operation.

3. Say consumer C1 has partitions P1, P2, and P3 assigned to it, and it is
processing a message M1 from partition P1. Now consumer C2 joins the group.
How are the partitions divided between C1 and C2? Is there a possibility that
C1's commit for M1 (which might take some time to reach Kafka) will be
rejected, and that M1 will be treated as a fresh message and delivered to
someone else? (I know Kafka is an at-least-once delivery model, but I wanted
to confirm whether the repartitioning could by any chance cause redelivery of
the same message.)


Thanks,
Dinesh


Re: Spark Kafka Performance

2014-11-05 Thread Eduardo Costa Alfaia
Hi Bhavesh

I will collect the dump and send it to you.

I am using a program that I took from
https://github.com/edenhill/librdkafka/tree/master/examples
and changed to fit my tests. I have attached the files.






> On Nov 5, 2014, at 04:45, Bhavesh Mistry  wrote:
> 
> Hi Eduardo,
> 
> Can you please take a thread dump and see if there are blocking issues on
> the producer side? Do you have a single instance of the producer and
> multiple threads?
> 
> Are you using the Scala producer or the new Java producer? Also, what are
> your producer properties?
> 
> 
> Thanks,
> 
> Bhavesh
> 
> On Tue, Nov 4, 2014 at 12:40 AM, Eduardo Alfaia 
> wrote:
> 
>> Hi Gwen,
>> I have changed the Java code of KafkaWordCount to use reduceByKeyAndWindow
>> in Spark.
>> 
>> ----- Original message -----
>> From: "Gwen Shapira" 
>> Sent: 03/11/2014 21:08
>> To: "users@kafka.apache.org" 
>> Cc: "u...@spark.incubator.apache.org" 
>> Subject: Re: Spark Kafka Performance
>> 
>> Not sure about the throughput, but:
>> 
>> "I mean that the words counted in spark should grow up" - The spark
>> word-count example doesn't accumulate.
>> It gets an RDD every n seconds and counts the words in that RDD. So we
>> don't expect the count to go up.
>> 
>> 
>> 
>> On Mon, Nov 3, 2014 at 6:57 AM, Eduardo Costa Alfaia <
>> e.costaalf...@unibs.it
>>> wrote:
>> 
>>> Hi Guys,
>>> Could anyone explain to me how Kafka works with Spark? I am using
>>> JavaKafkaWordCount.java as a test, and the command line is:
>>> 
>>> ./run-example org.apache.spark.streaming.examples.JavaKafkaWordCount
>>> spark://192.168.0.13:7077 computer49:2181 test-consumer-group unibs.it 3
>>> 
>>> and as a producer I am using this command:
>>> 
>>> rdkafka_cachesender -t unibs.nec -p 1 -b 192.168.0.46:9092 -f output.txt
>>> -l 100 -n 10
>>> 
>>> 
>>> rdkafka_cachesender is a program I developed which sends the content of
>>> output.txt to Kafka, where -l is the length of each send (upper bound) and
>>> -n is the number of lines to send in a row. Below is the throughput
>>> calculated by the program:
>>> 
>>> File is 2235755 bytes
>>> throughput (b/s) = 699751388
>>> throughput (b/s) = 723542382
>>> throughput (b/s) = 662989745
>>> throughput (b/s) = 505028200
>>> throughput (b/s) = 471263416
>>> throughput (b/s) = 446837266
>>> throughput (b/s) = 409856716
>>> throughput (b/s) = 373994467
>>> throughput (b/s) = 366343097
>>> throughput (b/s) = 373240017
>>> throughput (b/s) = 386139016
>>> throughput (b/s) = 373802209
>>> throughput (b/s) = 369308515
>>> throughput (b/s) = 366935820
>>> throughput (b/s) = 365175388
>>> throughput (b/s) = 362175419
>>> throughput (b/s) = 358356633
>>> throughput (b/s) = 357219124
>>> throughput (b/s) = 352174125
>>> throughput (b/s) = 348313093
>>> throughput (b/s) = 355099099
>>> throughput (b/s) = 348069777
>>> throughput (b/s) = 348478302
>>> throughput (b/s) = 340404276
>>> throughput (b/s) = 339876031
>>> throughput (b/s) = 339175102
>>> throughput (b/s) = 327555252
>>> throughput (b/s) = 324272374
>>> throughput (b/s) = 322479222
>>> throughput (b/s) = 319544906
>>> throughput (b/s) = 317201853
>>> throughput (b/s) = 317351399
>>> throughput (b/s) = 315027978
>>> throughput (b/s) = 313831014
>>> throughput (b/s) = 310050384
>>> throughput (b/s) = 307654601
>>> throughput (b/s) = 305707061
>>> throughput (b/s) = 307961102
>>> throughput (b/s) = 296898200
>>> throughput (b/s) = 296409904
>>> throughput (b/s) = 294609332
>>> throughput (b/s) = 293397843
>>> throughput (b/s) = 293194876
>>> throughput (b/s) = 291724886
>>> throughput (b/s) = 290031314
>>> throughput (b/s) = 289747022
>>> throughput (b/s) = 289299632
>>> 
>>> The throughput drops after a few seconds and does not maintain the
>>> initial values:
>>> 
>>> throughput (b/s) = 699751388
>>> throughput (b/s) = 723542382
>>> throughput (b/s) = 662989745
>>> 
>>> Another question is about Spark: about 15 seconds after I start the Spark
>>> command, Spark just keeps repeating the same word counts, while my program
>>> keeps sending words to Kafka, so I would expect the word counts in Spark
>>> to grow. I have attached the log from Spark.
>>> 
>>> My case is:
>>>
>>> ComputerA (rdkafka_cachesender) -> ComputerB (Kafka brokers + ZooKeeper) ->
>>> ComputerC (Spark)
>>>
>>> If I haven't explained this well, please reply to me.
>>> 
>>> Thanks Guys


Producer and Consumer properties

2014-11-05 Thread Eduardo Costa Alfaia
Hi Dudes,

I would like to know whether the producer and consumer properties files in
the config folder should be configured. I have configured only
server.properties; is that enough? I am doing some performance tests, for
example network throughput. My scenario is:

As producer I am using this program in C:



As consumer, this:




1 Server (zookeeper + 3 Brokers (8 partitions and Replication factor 3))
24GB RAM
5.0TB Hard Disc
eth0: Broadcom NetXtreme II BCM5709 1000Base-T 


There is a large difference in throughput between the producer and the
consumer. Does anyone have any idea why?

Results:

Producer                         Consumer
throughput (b/s) = 301393419     received = 4083875,  throughput (b/s) = 5571423
throughput (b/s) = 424807283     received = 7146741,  throughput (b/s) = 8061556
throughput (b/s) = 445245606     received = 13270522, throughput (b/s) = 12925199
throughput (b/s) = 466454739     received = 16333527, throughput (b/s) = 13890292
throughput (b/s) = 442368081     received = 18375214, throughput (b/s) = 13967440
throughput (b/s) = 436540119     received = 20416859, throughput (b/s) = 14127520
throughput (b/s) = 427105440     received = 24500066, throughput (b/s) = 15594622
throughput (b/s) = 426395933     received = 27563023, throughput (b/s) = 16177493
throughput (b/s) = 409344029     received = 34708625, throughput (b/s) = 18740726
throughput (b/s) = 403371185     received = 37771189, throughput (b/s) = 17961816
throughput (b/s) = 403325568     received = 39813038, throughput (b/s) = 17654058
throughput (b/s) = 397938415     received = 47979107, throughput (b/s) = 19686322
throughput (b/s) = 393364006     received = 53083307, throughput (b/s) = 20623441
throughput (b/s) = 387393832     received = 57166558, throughput (b/s) = 21050531
throughput (b/s) = 380266372     received = 59207558, throughput (b/s) = 20654404
throughput (b/s) = 376436729     received = 62269998, throughput (b/s) = 20740363
throughput (b/s) = 377043675     received = 65332901, throughput (b/s) = 20888135
throughput (b/s) = 368613683     received = 67374558, throughput (b/s) = 20467503
throughput (b/s) = 370020865     received = 71457763, throughput (b/s) = 20727773
throughput (b/s) = 373827848     received = 73499480, throughput (b/s) = 20171583
throughput (b/s) = 369647040     received = 75541289, throughput (b/s) = 19599155
throughput (b/s) = 363395680     received = 80645776, throughput (b/s) = 20033582


Thanks Guys





Re: consumer ack for high-level consumer?

2014-11-05 Thread Guozhang Wang
Hello,

You can turn off auto.commit.enable and call connector.commitOffsets()
manually after you have processed the data. One thing to remember is that
each commit translates into a ZK write (in the future, a Kafka write), and
hence you may not want to commit after processing every single message but
only after a batch of messages.
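
For example, something along these lines (a minimal sketch; the addresses,
group, topic, and batch size are placeholders):

import java.util.Collections;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.javaapi.consumer.ConsumerConnector;

public class BatchCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder
        props.put("group.id", "my-group");                // placeholder
        props.put("auto.commit.enable", "false");         // we commit ourselves

        ConsumerConnector connector =
            kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        ConsumerIterator<byte[], byte[]> it = connector
            .createMessageStreams(Collections.singletonMap("my-topic", 1))
            .get("my-topic").get(0).iterator();

        int uncommitted = 0;
        while (it.hasNext()) {
            byte[] message = it.next().message(); // fully process the message first
            if (++uncommitted >= 1000) {          // then commit per batch, not per message
                connector.commitOffsets();
                uncommitted = 0;
            }
        }
    }
}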

Guozhang

On Tue, Nov 4, 2014 at 10:42 PM, Chia-Chun Shih 
wrote:

> Hi,
>
> I am new to Kafka. In my understanding, the high-level consumer
> (ZookeeperConsumerConnector) advances the offset when a message is drawn
> by the ConsumerIterator. But I would like to advance the offset when the
> message is processed, not when it is drawn from the broker. So if a
> consumer dies before a message is completely processed, the message will
> be processed again. Is that possible?
>
> Thanks.
>



-- 
-- Guozhang


Re: How costly is Rebalancing of partitions for a topic

2014-11-05 Thread Guozhang Wang
Hello Dinesh,

1. A rebalance is triggered when a consumer is notified, through ZK, of a
group membership change or a topic-partition change.

2. The cost of a rebalance is positively related to the number of consumers
in the group and the number of topics the group is consuming. The rebalance
latency can be as high as tens of seconds when you have a large number of
consumers fetching from a large number of topics.
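
For reference, the client-side settings that bound how long and how often the
rebalance is retried look like this (a consumer.properties sketch; the values
shown are just the usual 0.8 defaults, so double-check them for your version):

rebalance.backoff.ms=2000        # pause between rebalance attempts
rebalance.max.retries=4          # attempts before giving up with an exception
zookeeper.session.timeout.ms=6000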

3. The rebalance algorithm is deterministic (range-based), and before it kicks
in, consumers will first commit their current offsets and stop their
fetchers; hence, when M1 has already been fetched by some consumer C1 before
the rebalance, it will not be re-sent to another consumer C2 after the
rebalance.

You can also read some FAQs here:

https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-CanIpredicttheresultsoftheconsumerrebalance

And in 0.9, we will release our new consumer client, which will reduce
rebalance latency compared to the current consumer.

https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design


Guozhang






On Wed, Nov 5, 2014 at 4:50 AM, dinesh kumar  wrote:

> [...]



-- 
-- Guozhang



Re: How costly is Rebalancing of partitions for a topic

2014-11-05 Thread dinesh kumar
Thanks for the answers. I have some follow-up questions.

Let me get a bit more specific.

In a scenario of 1 topic with 400-500 partitions:

1. Is it OK to have short-lived consumers, or is it recommended to have only
long-running consumers?

2. You mentioned that rebalance latency depends on the number of consumers
and the number of topics. In the case of 1 topic and hundreds of consumers,
can we say the latency is in the tens of seconds, as you mentioned before?

3. You mentioned:


"Rebalance algorithm is deterministic (range-based), and before it kicks
in consumers will first commit their current offset and stop fetchers,
hence when M1 is already fetched by some consumer C1 before rebalance it
will not be re-send to another C2 after the rebalance."


Say a consumer fetches a message and does some processing with it for 5
minutes and then commits the offset. If the rebalancing waits for all the
consumers to commit their offsets, will it wait for 5 minutes? Or is there a
timeout here?

If the consumer does not commit after 5 minutes due to some exception, what
will happen?


Thanks,
Dinesh


On Wed, Nov 5, 2014 at 10:22 PM, dinesh kumar  wrote:

> [...]


Storing data in Kafka keys

2014-11-05 Thread Ivan Balashov
Hi,

It looks like it is general practice to avoid storing data in Kafka keys.
Some examples of this: Camus and Secor both don't use keys. Even such a
swiss-army tool as kafkacat doesn't seem to have the ability to display the
key (although I might be wrong). Also, the console consumer does not display
keys by default, which makes initial quick checks of the data confusing.

What's the story behind this? Should one think twice before tying
business data to Kafka keys?
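
For reference, attaching business data to a key with the 0.8 Java producer
API would look roughly like this (a minimal sketch; the broker, topic, key,
and value are placeholders):

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KeyedSend {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092"); // placeholder broker
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");

        Producer<String, String> producer =
            new Producer<String, String>(new ProducerConfig(props));
        // with the default partitioner the key also picks the partition,
        // so identical keys always land in the same partition
        producer.send(new KeyedMessage<String, String>("my-topic", "some-key", "payload"));
        producer.close();
    }
}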

Thanks,


Consumer lag keeps increasing

2014-11-05 Thread Chen Wang
Hey Guys,
I have a really simple Storm topology with a Kafka spout, reading from
Kafka through the high-level consumer. Since the topic has 30 partitions, we
have 30 threads in the spout reading from it. However, it seems that the
lag keeps increasing even though the threads only read the messages and do
nothing. The largest messages are around 30KB, and the incoming rate can be
as high as 14k/second. There are 3 brokers on some high-spec bare-metal
machines. The client-side config is like this:

kafka.config.fetch.message.max.bytes3145728
kafka.config.group.id   spout_readonly
kafka.config.rebalance.backoff.ms   6000
kafka.config.rebalance.max.retries  6
kafka.config.zookeeper.connect  dare-broker00.sv.walmartlabs.com:2181,
dare-broker01.sv.walmartlabs.com:2181,dare-broker02.sv.walmartlabs.com:2181
kafka.config.zookeeper.session.timeout.ms   6

What could possibly cause this huge lag? Could the broker be a bottleneck, or
does some config need to be adjusted? The server-side config is like this:

replica.fetch.max.bytes=2097152
message.max.bytes=2097152
num.network.threads=4
num.io.threads=4

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=4194304

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=2097152

# The maximum size of a request that the socket server will accept
(protection against OOM)
socket.request.max.bytes=104857600

Any help appreciated!
Chen


Re: How costly is Rebalancing of partitions for a topic

2014-11-05 Thread Guozhang Wang
1. Since a rebalance among all the consumer members is triggered each time
the consumer group changes, it is usually recommended to have long-lived
consumers rather than short-lived ones. However, in the new consumer we are
working on optimizing the rebalance logic and removing its ZK dependency, so
in the new consumer (coming next spring) short-lived consumers coming and
going should also be OK.

2. I was not correct before: it should be the total number of partitions
rather than the number of topics. Since your scenario has 500 partitions, it
may still result in high latency in the current consumer.

3. Once a message is returned from the iterator it is considered "consumed",
i.e. the offset increments by itself. If auto offset commit is turned on (the
default), then before a rebalance happens it will force a commit, and hence
that offset will be written to ZK and the message will not be exposed to
others again.

4. If auto commit is turned off and the manual commit gets delayed somehow, a
rebalance will cause some duplicates.

Guozhang


On Wed, Nov 5, 2014 at 8:57 AM, dinesh kumar  wrote:

> [...]

Re: Cannot connect to Kafka from outside of EC2

2014-11-05 Thread Guozhang Wang
Sameer,

Yes, this is the server log. But there seem to be no abnormal entries in it,
and it does not cover the same time range as the producer client throwing
LeaderNotAvailableException (it was 10/24, 14:30).

The reason I want to check the server log for that same time range is that
LeaderNotAvailableException is set by the brokers while handling requests,
and the server log will show what caused it to set this error code.
Guozhang

On Tue, Nov 4, 2014 at 12:47 PM, Sameer Yami  wrote:

> Hi Guozhang,
>
> This is the server.log -
>
> [2014-11-04 20:21:57,510] INFO Verifying properties
> (kafka.utils.VerifiableProperties)
> [2014-11-04 20:21:57,545] INFO Property advertised.host.name is overridden
> to x.x.x.x (kafka.utils.VerifiableProperties)
> [2014-11-04 20:21:57,545] INFO Property broker.id is overridden to 0
> (kafka.utils.VerifiableProperties)
> [2014-11-04 20:21:57,545] INFO Property controlled.shutdown.enable is
> overridden to true (kafka.utils.VerifiableProperties)
> [2014-11-04 20:21:57,545] INFO Property host.name is overridden to
> 172.31.25.198 (kafka.utils.VerifiableProperties)
> [2014-11-04 20:21:57,545] INFO Property log.cleaner.enable is overridden to
> false (kafka.utils.VerifiableProperties)
> [2014-11-04 20:21:57,546] INFO Property log.dirs is overridden to
> /tmp/kafka-logs (kafka.utils.VerifiableProperties)
> [2014-11-04 20:21:57,546] INFO Property log.retention.check.interval.ms is
> overridden to 6 (kafka.utils.VerifiableProperties)
> [2014-11-04 20:21:57,546] INFO Property log.retention.hours is overridden
> to 168 (kafka.utils.VerifiableProperties)
> [2014-11-04 20:21:57,546] INFO Property log.segment.bytes is overridden to
> 536870912 (kafka.utils.VerifiableProperties)
> [2014-11-04 20:21:57,546] INFO Property num.io.threads is overridden to 8
> (kafka.utils.VerifiableProperties)
> [2014-11-04 20:21:57,547] INFO Property num.network.threads is overridden
> to 2 (kafka.utils.VerifiableProperties)
> [2014-11-04 20:21:57,547] INFO Property num.partitions is overridden to 2
> (kafka.utils.VerifiableProperties)
> [2014-11-04 20:21:57,547] INFO Property port is overridden to 9092
> (kafka.utils.VerifiableProperties)
> [2014-11-04 20:21:57,547] INFO Property socket.receive.buffer.bytes is
> overridden to 1048576 (kafka.utils.VerifiableProperties)
> [2014-11-04 20:21:57,547] INFO Property socket.request.max.bytes is
> overridden to 104857600 (kafka.utils.VerifiableProperties)
> [2014-11-04 20:21:57,547] INFO Property socket.send.buffer.bytes is
> overridden to 1048576 (kafka.utils.VerifiableProperties)
> [2014-11-04 20:21:57,548] INFO Property zookeeper.connect is overridden to
> localhost:2181 (kafka.utils.VerifiableProperties)
> [2014-11-04 20:21:57,548] INFO Property zookeeper.connection.timeout.ms is
> overridden to 100 (kafka.utils.VerifiableProperties)
> [2014-11-04 20:21:57,563] INFO [Kafka Server 0], starting
> (kafka.server.KafkaServer)
> [2014-11-04 20:21:57,565] INFO [Kafka Server 0], Connecting to zookeeper on
> localhost:2181 (kafka.server.KafkaServer)
> [2014-11-04 20:21:57,738] INFO Log directory '/tmp/kafka-logs' not found,
> creating it. (kafka.log.LogManager)
> [2014-11-04 20:21:57,748] INFO Starting log cleanup with a period of 6
> ms. (kafka.log.LogManager)
> [2014-11-04 20:21:57,752] INFO Starting log flusher with a default period
> of 9223372036854775807 ms. (kafka.log.LogManager)
> [2014-11-04 20:21:57,783] INFO Awaiting socket connections on
> ip-172-31-25-198.us-west-1.compute.internal:9092. (kafka.network.Acceptor)
> [2014-11-04 20:21:57,784] INFO [Socket Server on Broker 0], Started
> (kafka.network.SocketServer)
> [2014-11-04 20:21:57,852] INFO Will not load MX4J, mx4j-tools.jar is not in
> the classpath (kafka.utils.Mx4jLoader$)
> [2014-11-04 20:21:57,884] INFO 0 successfully elected as leader
> (kafka.server.ZookeeperLeaderElector)
> [2014-11-04 20:21:57,967] INFO Registered broker 0 at path /brokers/ids/0
> with address x.x.x.x:9092. (kafka.utils.ZkUtils$)
> [2014-11-04 20:21:57,981] INFO [Kafka Server 0], started
> (kafka.server.KafkaServer)
> [2014-11-04 20:21:58,050] INFO New leader is 0
> (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
> [2014-11-04 20:22:01,809] INFO Closing socket connection to /172.31.25.198. (kafka.network.Processor)
> [2014-11-04 20:22:11,812] INFO Closing socket connection to /172.31.25.198. (kafka.network.Processor)
> [2014-11-04 20:30:29,084] INFO Topic creation {"version":1,"partitions":{"1":[0],"0":[0]}} (kafka.admin.AdminUtils$)
> [2014-11-04 20:30:29,088] INFO [KafkaApi-0] Auto creation of topic Test with 2 partitions and replication factor 1 is successful! (kafka.server.KafkaApis)
> [2014-11-04 20:30:29,280] INFO Closing socket connection to /172.31.25.198. (kafka.network.Processor)
> [2014-11-04 20:30:29,301] INFO Closing socket connection to /172.31.25.198. (kafka.network.Processor)
> [2014-11-04 20:30:30,310] INFO Closing socket connection to /172.31.25.198. (kafka.

Re: Consumer lag keeps increasing

2014-11-05 Thread Guozhang Wang
Chen,

Your configs seem fine.

Could you use the ConsumerOffsetChecker tool to see if the offsets are
advancing at all (i.e. messages are being consumed), and if yes, get some
thread dumps and check whether your consumer is blocked on some locks?
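
Something along these lines (the ZK address and group are taken from your
config; the exact flag names may vary slightly between 0.8.x releases):

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
  --zkconnect dare-broker00.sv.walmartlabs.com:2181 \
  --group spout_readonly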

Guozhang

On Wed, Nov 5, 2014 at 2:01 PM, Chen Wang 
wrote:

> [...]



-- 
-- Guozhang


Re: Consumer lag keeps increasing

2014-11-05 Thread Chen Wang
Guozhang,
I can see messages keep coming, meaning messages are being consumed, right?
But the lag is pretty huge (on average 30M messages behind), as you can see
from the graph:
https://www.dropbox.com/s/xli25zicxv5f2qa/Screenshot%202014-11-05%2015.23.05.png?dl=0

My understanding is that for such a lightweight thread, the consumer should
be at almost the same pace as the producer. I also checked the machine
metrics, and nothing is pegged there.

I am also moving the test application to a separate dev cluster. In your
experience, what might cause the slow reading? Is this more likely a
server-side issue, or a consumer-side one?

Chen

On Wed, Nov 5, 2014 at 3:10 PM, Guozhang Wang  wrote:

> [...]


Re: Cannot connect to Kafka from outside of EC2

2014-11-05 Thread Sameer Yami
The server.log was taken separately.
We ran the test again, and the server and producer logs are below (so the
timings match).


Thanks!





Producer Logs -


2014-11-05 23:38:58,693
Thread-3-SendThread(ip-172-31-25-198.us-west-1.compute.internal:2181)
DEBUG org.apache.zookeeper.ClientCnxn-759: Got ping response for sessionid:
0x1498251e8680002 after 0ms
2014-11-05 23:39:00,695
Thread-3-SendThread(ip-172-31-25-198.us-west-1.compute.internal:2181)
DEBUG org.apache.zookeeper.ClientCnxn-759: Got ping response for sessionid:
0x1498251e8680002 after 0ms
2014-11-05 23:39:02,696
Thread-3-SendThread(ip-172-31-25-198.us-west-1.compute.internal:2181)
DEBUG org.apache.zookeeper.ClientCnxn-759: Got ping response for sessionid:
0x1498251e8680002 after 0ms
2014-11-05 23:39:02,828 pool-13-thread-2   INFO
kafka.utils.VerifiableProperties-68: Verifying properties
2014-11-05 23:39:02,829 pool-13-thread-2   INFO
kafka.utils.VerifiableProperties-68: Property auto.commit.interval.ms is
overridden to 1000
2014-11-05 23:39:02,829 pool-13-thread-2   INFO
kafka.utils.VerifiableProperties-68: Property auto.offset.reset is
overridden to smallest
2014-11-05 23:39:02,829 pool-13-thread-2   INFO
kafka.utils.VerifiableProperties-68: Property consumer.timeout.ms is
overridden to 10
2014-11-05 23:39:02,829 pool-13-thread-2   INFO
kafka.utils.VerifiableProperties-68: Property group.id is overridden to
TestCheck
2014-11-05 23:39:02,830 pool-13-thread-2   WARN
kafka.utils.VerifiableProperties-83: Property serializer.class is not valid
2014-11-05 23:39:02,830 pool-13-thread-2   INFO
kafka.utils.VerifiableProperties-68: Property zookeeper.connect is
overridden to 172.31.25.198:2181
2014-11-05 23:39:02,831 pool-13-thread-2   INFO
kafka.consumer.ZookeeperConsumerConnector-68:
[TestCheck_ip-172-31-25-198-1415230742830-f3dfc362], Connecting to
zookeeper instance at 172.31.25.198:2181
2014-11-05 23:39:02,831 pool-13-thread-2  DEBUG
org.I0Itec.zkclient.ZkConnection-63: Creating new ZookKeeper instance to
connect to 172.31.25.198:2181.
2014-11-05 23:39:02,831 pool-13-thread-2   INFO
org.apache.zookeeper.ZooKeeper-379: Initiating client connection,
connectString=172.31.25.198:2181 sessionTimeout=6000
watcher=org.I0Itec.zkclient.ZkClient@3903b165
2014-11-05 23:39:02,831 ZkClient-EventThread-29-172.31.25.198:2181   INFO
org.I0Itec.zkclient.ZkEventThread-64: Starting ZkClient event thread.
2014-11-05 23:39:02,831 pool-13-thread-1   INFO
kafka.utils.VerifiableProperties-68: Verifying properties
2014-11-05 23:39:02,836 pool-13-thread-2-SendThread()   INFO
org.apache.zookeeper.ClientCnxn-1061: Opening socket connection to server /
172.31.25.198:2181
2014-11-05 23:39:02,836 pool-13-thread-1   WARN
kafka.utils.VerifiableProperties-83: Property batch.size is not valid
2014-11-05 23:39:02,832 pool-13-thread-2  DEBUG
org.I0Itec.zkclient.ZkClient-878: Awaiting connection to Zookeeper server
2014-11-05 23:39:02,836 pool-13-thread-1   INFO
kafka.utils.VerifiableProperties-68: Property message.send.max.retries is
overridden to 10
2014-11-05 23:39:02,836 pool-13-thread-2  DEBUG
org.I0Itec.zkclient.ZkClient-628: Waiting for keeper state SyncConnected
2014-11-05 23:39:02,837 pool-13-thread-1   INFO
kafka.utils.VerifiableProperties-68: Property metadata.broker.list is
overridden to 172.31.25.198:9092
2014-11-05 23:39:02,837 pool-13-thread-1   INFO
kafka.utils.VerifiableProperties-68: Property retry.backoff.ms is
overridden to 1000
2014-11-05 23:39:02,837 pool-13-thread-1   INFO
kafka.utils.VerifiableProperties-68: Property serializer.class is
overridden to kafka.serializer.StringEncoder
2014-11-05 23:39:02,837
pool-13-thread-2-SendThread(ip-172-31-25-198.us-west-1.compute.internal:2181)
INFO org.apache.zookeeper.ClientCnxn-950: Socket connection established to
ip-172-31-25-198.us-west-1.compute.internal/172.31.25.198:2181, initiating
session
2014-11-05 23:39:02,838
pool-13-thread-2-SendThread(ip-172-31-25-198.us-west-1.compute.internal:2181)
DEBUG org.apache.zookeeper.ClientCnxn-999: Session establishment request
sent on ip-172-31-25-198.us-west-1.compute.internal/172.31.25.198:2181
2014-11-05 23:39:02,837 pool-13-thread-1   WARN
kafka.utils.VerifiableProperties-83: Property zk.connectiontimeout.ms is
not valid
2014-11-05 23:39:02,841
pool-13-thread-2-SendThread(ip-172-31-25-198.us-west-1.compute.internal:2181)
INFO org.apache.zookeeper.ClientCnxn-739: Session establishment complete on
server ip-172-31-25-198.us-west-1.compute.internal/172.31.25.198:2181,
sessionid = 0x1498251e8680003, negotiated timeout = 6000
2014-11-05 23:39:02,841 pool-13-thread-2-EventThread  DEBUG
org.I0Itec.zkclient.ZkClient-351: Received event: WatchedEvent
state:SyncConnected type:None path:null
2014-11-05 23:39:02,841 pool-13-thread-2-EventThread   INFO
org.I0Itec.zkclient.ZkClient-449: zookeeper state changed (SyncConnected)
2014-11-05 23:39:02,841 pool-13-thread-2-EventThread  DEBUG
org.I0Itec.zkclient.ZkClien

No longer supporting Java 6, if? when?

2014-11-05 Thread Joe Stein
This has been coming up in a lot of projects, and for other reasons too, so I
wanted to kick off a discussion about if/when we end support for Java 6.
Besides any APIs we may want to use from Java >= 7, we also currently compile
our release binaries for Java 6.

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop 
/


Re: Error using migrationtool for upgrading 0.7 to 0.8

2014-11-05 Thread Tomas Nunez
Ok, still fighting with the migrationTool here...

That tuple wasn't in scala-library.jar. It turns out I was using Scala 2.10
for Kafka 0.8 and Scala 2.8 for Kafka 0.7, and the jar files were not
compatible. So, for the record, it seems that you need the 0.7 jar files and
your 0.8 Kafka compiled with the same Scala version.

After fixing that (downloading Kafka 0.8 compiled with Scala 2.8), I'm now
facing a different error, this time more cryptic:

Kafka migration tool failed due to:
java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at kafka.tools.KafkaMigrationTool.main(KafkaMigrationTool.java:204)
Caused by: java.lang.NullPointerException
    at org.apache.zookeeper.ClientCnxn.<init>(ClientCnxn.java:361)
    at org.apache.zookeeper.ClientCnxn.<init>(ClientCnxn.java:332)
    at org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:383)
    at org.I0Itec.zkclient.ZkConnection.connect(ZkConnection.java:64)
    at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:872)
    at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
    at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84)
    at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:152)
    at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:122)
    at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:65)
    at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:67)
    at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:88)
    at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
    ... 5 more

[2014-11-06 01:32:44,362] ERROR Kafka migration tool failed:
(kafka.tools.KafkaMigrationTool)
java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at kafka.tools.KafkaMigrationTool.main(KafkaMigrationTool.java:204)
Caused by: java.lang.NullPointerException
    at org.apache.zookeeper.ClientCnxn.<init>(ClientCnxn.java:361)
    at org.apache.zookeeper.ClientCnxn.<init>(ClientCnxn.java:332)
    at org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:383)
    at org.I0Itec.zkclient.ZkConnection.connect(ZkConnection.java:64)
    at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:872)
    at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
    at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84)
    at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:152)
    at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:122)
    at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:65)
    at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:67)
    at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:88)
    at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
    ... 5 more

Using strace with it, I could see these errors again:

13342 stat("/var/migration/kafka/utils/Logging.class", 0x7fc92913c280) = -1
ENOENT (No such file or directory)
13342 stat("/var/migration/kafka/common/KafkaStorageException.class",
0x7fc92913d950) = -1 ENOENT (No such file or directory)
13342 stat("/var/migration/kafka/common/KafkaException.class",
0x7fc92913d950) = -1 ENOENT (No such file or directory)
13342 stat("/var/migration/kafka/utils/Logging$class.class",
0x7fc92913de00) = -1 ENOENT (No such file or directory)
13342 stat("/var/migration/kafka/utils/Log4jController$.class",
0x7fc92913dc60) = -1 ENOENT (No such file or directory)
13342 stat("/var/migration/kafka/utils/Log4jController.class",
0x7fc92913d7c0) = -1 ENOENT (No such file or directory)
13342 stat("/var/migration/kafka/utils/Log4jControllerMBean.class",
0x7fc92913b4e0) = -1 ENOENT (No such file or directory)
13342
stat("/var/migration/org/apache/log4j/spi/ThrowableInformation.class",
0x7fc92913e860) = -1 ENOENT (No such file or directory)

and after I cd to kafka-0.8.1.1-src/core/build/classes/main/, a lot of those
ENOENT errors disappear, but I still get the same NullPointerException.
Grepping for "ENOENT" I see:

13257
stat("/var/migration/migration-tool/kafka-0.8.1.1-src/core/build/classes/main/org/apache/zookeeper/Environment.class",
0x7fee656ae750) = -1 ENOENT (No such file or dir

Re: Error using migrationtool for upgrading 0.7 to 0.8

2014-11-05 Thread Gwen Shapira
org.apache.zookeeper.ClientCnxn is throwing the exception, so I'm 100% sure
it eventually found the class.

On Wed, Nov 5, 2014 at 5:59 PM, Tomas Nunez  wrote:

> [...]

Re: Error using migrationtool for upgrading 0.7 to 0.8

2014-11-05 Thread Gwen Shapira
Regarding more information:
Maybe ltrace?

If I were you, I'd go to the MigrationTool code and start adding log lines,
because there aren't enough of those to troubleshoot with.
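
For example, even something as crude as this just before the consumer
connector is created (the variable name is hypothetical, not the tool's
actual code):

// hypothetical instrumentation: print the consumer config the tool loaded
System.out.println("Consumer properties as seen by the tool: " + consumerProps);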

On Wed, Nov 5, 2014 at 6:13 PM, Gwen Shapira  wrote:

> org.apache.zookeeper.ClientCnxn is throwing the exception, so I'm 100%
> sure it eventually found the class.
>
> On Wed, Nov 5, 2014 at 5:59 PM, Tomas Nunez  wrote:
>
>> [...]

Re: Error using migrationtool for upgrading 0.7 to 0.8

2014-11-05 Thread Gwen Shapira
Also, can you post your configs? Especially the "zookeeper.connect" one?

On Wed, Nov 5, 2014 at 6:15 PM, Gwen Shapira  wrote:

> Regarding more information:
> Maybe ltrace?
>
> If I were you, I'd go to the MigrationTool code and start adding log lines,
> because there aren't enough of those to troubleshoot with.
>
> On Wed, Nov 5, 2014 at 6:13 PM, Gwen Shapira 
> wrote:
>
>> org.apache.zookeeper.ClientCnxn is throwing the exception, so I'm 100%
>> sure it eventually found the class.
>>
>> On Wed, Nov 5, 2014 at 5:59 PM, Tomas Nunez  wrote:
>>
>>> Ok, still fighting with the migrationTool here...
>>>
>>> That tuple wasn't in scala-library.jar. It turns out I was using Scala
>>> 2.10 for Kafka 0.8 and Scala 2.8 for Kafka 0.7, and the jar files were
>>> not compatible. So, for the record, it seems you need both the 0.7 jar
>>> files and your 0.8 Kafka compiled against the same Scala version (the
>>> Scala version is encoded in the artifact name, e.g. kafka_2.8.0-0.8.x
>>> vs. kafka_2.10-0.8.x).
>>>
>>> After fixing that (downloading Kafka 0.8 compiled with Scala 2.8), I'm
>>> now facing a different error, this time more cryptic:
>>>
>>> Kafka migration tool failed due to:
>>> java.lang.reflect.InvocationTargetException
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>>
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> at
>>>
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>> at
>>> kafka.tools.KafkaMigrationTool.main(KafkaMigrationTool.java:204)
>>> Caused by: java.lang.NullPointerException
>>> at org.apache.zookeeper.ClientCnxn.<init>(ClientCnxn.java:361)
>>> at org.apache.zookeeper.ClientCnxn.<init>(ClientCnxn.java:332)
>>> at org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:383)
>>> at org.I0Itec.zkclient.ZkConnection.connect(ZkConnection.java:64)
>>> at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:872)
>>> at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
>>> at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84)
>>> at
>>>
>>> kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:152)
>>> at
>>>
>>> kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:122)
>>> at
>>>
>>> kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:65)
>>> at
>>>
>>> kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:67)
>>> at
>>>
>>> kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:88)
>>> at
>>>
>>> kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
>>> ... 5 more
>>>
>>> [2014-11-06 01:32:44,362] ERROR Kafka migration tool failed:
>>> (kafka.tools.KafkaMigrationTool)
>>> java.lang.reflect.InvocationTargetException
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>>
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> at
>>>
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>> at
>>> kafka.tools.KafkaMigrationTool.main(KafkaMigrationTool.java:204)
>>> Caused by: java.lang.NullPointerException
>>> at org.apache.zookeeper.ClientCnxn.<init>(ClientCnxn.java:361)
>>> at org.apache.zookeeper.ClientCnxn.<init>(ClientCnxn.java:332)
>>> at org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:383)
>>> at org.I0Itec.zkclient.ZkConnection.connect(ZkConnection.java:64)
>>> at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:872)
>>> at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
>>> at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84)
>>> at
>>>
>>> kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:152)
>>> at
>>>
>>> kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:122)
>>> at
>>>
>>> kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:65)
>>> at
>>>
>>> kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:67)
>>> at
>>>
>>> kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:88)
>>> at
>>>
>>> kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
>>> ... 5 more
>>>
>>> Using strace with it, I could see these errors again:
>>>
>>> 13342 stat("/var/migration/kafka/utils/Logging.class", 0x7fc92913c280) =
>>> -1
>>> ENOENT (No such file or directory)
>>> 13342 stat("/var/migration/kafka/common/KafkaStorageException.class",
>>> 0x7fc92913d950) = -1 ENOENT (No such file or directory)
>>> 13342 stat("/var/migration/kafka/common/KafkaException.class",
>>> 0x7fc92913d950) = -1 ENOENT (No such file or directory)
>>> 13342 st
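
Following Gwen's suggestion above, here is a sketch of the kind of LOG line one
could add to KafkaMigrationTool.main() right before the 0.7 consumer is
constructed (consumerProperties is a hypothetical stand-in for whatever the
tool's local variable is actually called):

// hypothetical trace line: dump the properties handed to the 0.7 consumer,
// so an absent or empty ZK connect string shows up before the NPE does
System.out.println("[migration] 0.7 consumer properties: " + consumerProperties);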

High CPU usage of Crc32 on Kafka broker

2014-11-05 Thread Allen Wang
Hi,

Using Java Flight Recorder, we have observed high CPU usage from CRC32
(kafka.utils.Crc32.update()) on the Kafka broker; it uses as much as 25% of
the CPU on an instance. Tracking down the stack trace, we see this method is
invoked by the ReplicaFetcherThread.

Is there any tuning we can do to reduce this?

Also on the topic of CPU utilization, we observed that overall CPU
utilization is proportional to the AllTopicsBytesInPerSec metric. Does this
metric include incoming replication traffic?

Thanks,
Allen
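
For context: every message in the 0.8 on-disk format carries a CRC32 over its
payload, and the broker re-validates messages as it appends them, including
those the ReplicaFetcherThread pulls from the leader, so checksum cost scales
with bytes in. A self-contained sketch to get a feel for the per-byte cost
(this uses plain java.util.zip.CRC32 rather than kafka.utils.Crc32, and the
1 KB message size and iteration count are arbitrary assumptions):

import java.util.zip.CRC32;

public class CrcCost {
    public static void main(String[] args) {
        byte[] payload = new byte[1024];      // stand-in for a 1 KB message body
        CRC32 crc = new CRC32();
        long start = System.nanoTime();
        for (int i = 0; i < 1000000; i++) {   // one checksum per "message"
            crc.reset();
            crc.update(payload, 0, payload.length);
        }
        long elapsedMs = (System.nanoTime() - start) / 1000000;
        System.out.println("1M x 1KB CRC32 took " + elapsedMs + " ms");
    }
}

If the broker is pushing hundreds of MB/s through replication, a noticeable
share of a core going to CRC32 seems plausible given numbers like these.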


Re: No longer supporting Java 6, if? when?

2014-11-05 Thread Worthy LaFollette
We have mostly converted to 1.7 now, so dropping Java 6 would be welcome if
it unlocks any new features.

On Wed Nov 05 2014 at 7:32:55 PM Joe Stein  wrote:

> This has been coming up in a lot of projects, and for other reasons too, so
> I wanted to kick off the discussion about if/when we end support for Java 6.
> Besides any APIs we may want to use in Java >= 7, we also currently compile
> our release binaries for Java 6.
>
> /***
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop 
> /
>


Re: consumer ack for high-level consumer?

2014-11-05 Thread Chia-Chun Shih
Hi,

Thanks for your response. I just read the source code and found that:

  1) ConsumerIterator$next() uses PartitionTopicInfo$resetConsumeOffset to
update the offsets held in PartitionTopicInfo objects.
  2) ZookeeperConsumerConnector$commitOffsets() gets the latest offsets from
the PartitionTopicInfo objects and writes them to ZK.

So, when clients iterate through messages, offsets are updated locally in
the PartitionTopicInfo objects, and when
ZookeeperConsumerConnector$commitOffsets() is called, the local offsets are
synced to ZK. Is that correct?

regards,
Chia-Chun

2014-11-06 0:24 GMT+08:00 Guozhang Wang :

> Hello,
>
> You can turn off auto.commit.enable and call
> connector.commitOffsets() manually after you have processed the data. One
> thing to remember is that each commit translates into ZK (in the future,
> Kafka) writes, and hence you may not want to commit after processing
> every single message, but only after a batch of messages (see the sketch
> below).
>
> Guozhang
>
> On Tue, Nov 4, 2014 at 10:42 PM, Chia-Chun Shih 
> wrote:
>
> > Hi,
> >
> > I am new to Kafka. In my understanding, the high-level consumer
> > (ZookeeperConsumerConnector) advances the offset when a message is drawn
> > by the ConsumerIterator. But I would like to advance the offset when a
> > message is processed, not when it is drawn from the broker, so that if a
> > consumer dies before a message is completely processed, the message will
> > be processed again. Is that possible?
> >
> > Thanks.
> >
>
>
>
> --
> -- Guozhang
>
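
To make the pattern concrete, here is a minimal sketch against the 0.8
high-level consumer API (the config key is auto.commit.enable and the method
is commitOffsets() in 0.8.1; the ZK string, group, topic, batch size, and
process() are all placeholders):

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zkhost:2181");   // placeholder
        props.put("group.id", "my-group");               // placeholder
        props.put("auto.commit.enable", "false");        // commit only when told to

        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            connector.createMessageStreams(Collections.singletonMap("my-topic", 1));
        ConsumerIterator<byte[], byte[]> it = streams.get("my-topic").get(0).iterator();

        int sinceCommit = 0;
        while (it.hasNext()) {
            process(it.next().message());   // placeholder for the real work
            if (++sinceCommit >= 1000) {    // commit per batch, not per message
                connector.commitOffsets();
                sinceCommit = 0;
            }
        }
    }

    private static void process(byte[] message) { /* application logic goes here */ }
}

Note that commitOffsets() commits the latest consumed position for every
partition this connector owns, so in a multi-threaded consumer a message
already drawn by another thread can still be marked as consumed.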


"metric.reporters" is not working

2014-11-05 Thread Bae, Jae Hyeon
Hi

When I set up

props.put("metric.reporters",
Lists.newArrayList(ServoReporter.class.getName()));

I got the following error:

org.apache.kafka.common.config.ConfigException: Unknown configuration
'com.netflix.suro.sink.kafka.ServoReporter'
at org.apache.kafka.common.config.AbstractConfig.get(AbstractConfig.java:60)
at
org.apache.kafka.common.config.AbstractConfig.getClass(AbstractConfig.java:91)
at
org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:147)
at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:105)
at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:94)

AbstractConfig.getClass() throws an exception because it cannot find any
definition for ServoReporter.class.getName(), but I cannot add a custom
class definition because the class name is not defined in the key set of
the properties.

Do you have any idea?

Thank you
Best, Jae
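
For reference, a minimal reporter sketch against the new-client
MetricsReporter interface as it looked in that era (LoggingReporter is a
made-up name). Note that the posted trace shows getConfiguredInstances()
reaching AbstractConfig.get() with the class name itself, as if it were a
configuration key, which reads like a client-side bug in that build rather
than a misconfiguration:

import java.util.List;
import java.util.Map;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;

public class LoggingReporter implements MetricsReporter {
    @Override public void configure(Map<String, ?> configs) { }  // from Configurable
    @Override public void init(List<KafkaMetric> metrics) { }    // metrics known at startup
    @Override public void metricChange(KafkaMetric metric) {
        System.out.println("metric changed: " + metric.metricName());
    }
    @Override public void close() { }
}

Registering it as a plain comma-separated String, i.e.
props.put("metric.reporters", LoggingReporter.class.getName()), instead of a
List is worth a quick try, but if the key-lookup bug above is what's biting,
only a patched client build will fix it.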