programmatically get number of items in topic/partition

2014-10-01 Thread Shlomi Hazan
Hi,
How can I programmatically get the number of items in a topic that are pending
consumption?
If no programmatic way is available, what other method is there?
Shlomi


Re: Created topic by 2 partitions, only can use the one partition

2014-10-01 Thread Jacky
Does anyone have any idea? Thank you 

Sent from my iPhone

> On Sep 30, 2014, at 1:45 PM, Jiang Jacky  wrote:
> 
> Hi, Guys
> It is very weird, I created a topic with 2 partitions couple weeks ago, and I 
> can only production the message to partition 0, not partition 1, but for now, 
> I created a new topic again with 2 partitions, it does work.
> So whats problem of the old topic? I tried to describe the old topic, I found 
> the following message
> 
> Topic:   Partition: 0Leader: 1   Replicas: 1 Isr: 
> 1
> Topic:   Partition: 1Leader: 2   Replicas: 2 Isr: 
> 2
> 
> Please let me know, if the topic is screwed up.
> 
> Thank you
> 
> 


Re: Created topic by 2 partitions, only can use the one partition

2014-10-01 Thread Joe Stein
Do you have any errors in the logs? Are you using a partition key, or is
your KeyedMessage just topic and message? If you're not using a partition key,
take a look at this:
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whyisdatanotevenlydistributedamongpartitionswhenapartitioningkeyisnotspecified

You can also try https://github.com/edenhill/kafkacat, which is a
stdin/stdout producer/consumer command-line tool for Kafka. Then you
can do

echo "test"|kafkacat -b yourKafkaBroker -t  -p 0
echo "test"|kafkacat -b yourKafkaBroker -t  -p 1

to see whether explicitly writing to both partitions works. If it does,
see the FAQ link above; if it doesn't, you should get an error.
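
For reference, here is a rough sketch of the two KeyedMessage forms with the 0.8
producer's Java API (broker, topic, and key values are placeholders, not taken
from this thread):

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class PartitionKeyExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "yourKafkaBroker:9092"); // placeholder broker
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        Producer<String, String> producer =
            new Producer<String, String>(new ProducerConfig(props));

        // No key: the 0.8 producer sticks to one randomly chosen partition and only
        // re-picks it periodically, which is the behavior the FAQ entry above describes.
        producer.send(new KeyedMessage<String, String>("your-topic", "message without key"));

        // With a key: the partition is chosen by hashing the key, so messages with
        // different keys spread across both partitions.
        producer.send(new KeyedMessage<String, String>("your-topic", "user-42", "message with key"));

        producer.close();
    }
}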

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop 
/

On Wed, Oct 1, 2014 at 7:31 AM, Jacky  wrote:

> Anyone has idea? Thank you
>
> Sent from my iPhone
>
> > On Sep 30, 2014, at 1:45 PM, Jiang Jacky  wrote:
> >
> > Hi, Guys
> > It is very weird, I created a topic with 2 partitions couple weeks ago,
> and I can only production the message to partition 0, not partition 1, but
> for now, I created a new topic again with 2 partitions, it does work.
> > So whats problem of the old topic? I tried to describe the old topic, I
> found the following message
> >
> > Topic:   Partition: 0Leader: 1   Replicas: 1
>  Isr: 1
> > Topic:   Partition: 1Leader: 2   Replicas: 2
>  Isr: 2
> >
> > Please let me know, if the topic is screwed up.
> >
> > Thank you
> >
> >
>


Re: Zookeeper reconnect failed due to 'state changed (Expired)'

2014-10-01 Thread Andrew Otto
I understand that, but even if the leader quickly (within a few seconds) 
rejoins the cluster?  I had thought that ack=1 meant that messages not yet 
replicated from a broker that has a serious fatal failure (disk corruption, 
etc.) would be lost forever.  But, as long as the broker with the unreplicated 
messages comes back into the ISR, those messages should be replicated, no?

Or, would the fact that a new leader has been chosen for a partition, and 
new messages have been sent to that new leader, cause the offending broker to 
drop the unreplicated messages when it comes back into the ISR?


On Sep 30, 2014, at 7:17 PM, Jun Rao  wrote:

> With ack=1, acked messages could be lost when the leader fails.
> 
> Thanks,
> 
> Jun
> 
> On Mon, Sep 29, 2014 at 8:04 AM, Andrew Otto  wrote:
> 
>> This happened again to me this weekend.  I've done some sleuthing, and
>> definitely can see some crazy paging stats when this lock up happens.  For
>> the curious, more info can be found here:
>> https://bugzilla.wikimedia.org/show_bug.cgi?id=69667.  I had tuned
>> dirty_expire_centisecs from 30 seconds to 10, but this still happened
>> again.  I'll continue to troubleshoot and tune.  This slow going because it
>> is not regularly reproducible.  I have to make a single change and then
>> wait a week or two for the timeout to occur.
>> 
>> Here's a related question.  When the timeout happens, we lose some
>> messages.  Our producer is varnishkafka, which uses the librdkafka producer
>> client.  librdkafka keeps track of produce errors.  We
>> have kafka.topic.request.required.acks = 1.  According to librdkafka, all
>> messages sent have been ACKed by the leader of the partition to which the
>> messages are sent.  Also, when we lose messages due to this timeout, the
>> broker that times out is always the controller.  When it attempts to
>> reconnect to Zookeeper, we see:
>> 
>>  INFO  kafka.utils.ZkUtils$  - conflict in /controller data:
>> {"version":1,"brokerid":21,"timestamp":"1411879981756"} stored data:
>> {"version":1,"brokerid":22,"timestamp":"1407187809296"}
>> 
>> In the case when a controller drops out of the ISR for a few seconds, is it
>> possible for this confused broker to drop ACKed messages?
>> 
>> 
>> 
>> On Thu, Jul 3, 2014 at 12:48 AM, Jun Rao  wrote:
>> 
>>> Are you on Linux? We have seen this pattern (user/sys time low and real
>>> time high in GC time) before. In our case, the problem was due to disk
>>> I/Os. When there are lots of dirty pages (in our case, this is caused by
>>> log4j logging), Linux can draft user threads (in this case GC threads) to
>>> flush the dirty pages. So, all those time in real was spent on disk I/Os,
>>> rather than real GCs. The fix is to tune dirty_expire_centisecs and
>>> dirty_writeback_centisecs
>>> to flush dirty pages more frequently to avoid such drafting.
>>> 
>>> Thanks,
>>> 
>>> Jun
>>> 
>>> 
>>> On Wed, Jul 2, 2014 at 1:32 PM, Andrew Otto  wrote:
>>> 
 Hi again!
 
 I've been having this issue consistently since I first started this
>>> thread,
 but it was happening infrequently enough for me to brush it aside and
>>> just
 run an election to rebalance brokers.
 
 I recently expanded (and reinstalled) our Kafka cluster so that it now
>>> has
 4 brokers with a default replication factor of 3 for each partition.  I
 also switched over to the G1GC as recommended here:
 https://kafka.apache.org/081/ops.html (even though we are still
>> running
 Kafka 0.8.0; we hope to upgrade soon).
 
 Now, only one of the 4 brokers (analytics1021, the same problem broker
>> we
 saw before) gets its ZK connection expired even more frequently.
 Previously it was less than once a week, now I am seeing this happen
 multiple times a day.
 
 I've posted all the relevant logs from a recent event here:
 https://gist.github.com/ottomata/e42480446c627ea0af22
 
 This includes the GC log on the offending Kafka broker during the time
>>> this
 happened.  I am pretty green when it comes to GC tuning, but I do see
>>> this
 interesting stat:
 
 
 [Times: user=0.14 sys=0.00, real=11.47 secs]
 
 Did Kafka's JVM really just take 11.47 secs to do a GC there?  I'm
 probably missing something, but I don't see which part of that real
 time summary makes up the bulk of that GC time
 
 This is strange, riight?  This broker is identically configured to all
 its peers, and should be handling on average the exact same amount and
 type of traffic.  Anyone have any advice?
 
 Thanks!
 -Andrew Otto
 
 
 
 
 On Fri, Mar 21, 2014 at 6:48 PM, Neha Narkhede <
>> neha.narkh...@gmail.com>
 wrote:
 
> I see, that makes sense. Thanks a lot for clarifying!
> 
> -Neha
> 
> 
> On Fri, Mar 21, 2014 at 11:01 AM, Bae, Jae Hyeon >> 
> wrote:
> 
>> Let me clarify the situation. I forgot to mention that my case
>> migh

Re: Zookeeper reconnect failed due to 'state changed (Expired)'

2014-10-01 Thread Neha Narkhede
 But, as long as broker with the unreplicated messages comes back into the
ISR, those messages should be replicated, no?

Or, would the fact that the a new leader has been chosen for a partition,
and new messages have been sent to that new leader, cause the offending
broker to drop the unreplicated messages when it comes back into the ISR?

The moment the leader moves to another broker, the partition's source of
truth is the new broker's log, and the other followers truncate their logs to
follow the new leader. So, any unreplicated messages that didn't reach the
new leader are lost. If the old leader rejoins the ISR, it will also truncate
its log to follow the new leader's log.

Thanks,
Neha

On Wed, Oct 1, 2014 at 5:48 AM, Andrew Otto  wrote:

> I understand that, but even if the leader quickly (within a few seconds)
> rejoins the cluster?  I had thought that ack=1 meant that messages not yet
> replicated from a broker that has a serious fatal failure (disk corruption,
> etc.) would be lost forever.  But, as long as broker with the unreplicated
> messages comes back into the ISR, those messages should be replicated, no?
>
> Or, would the fact that the a new leader has been chosen for a partition,
> and new messages have been sent to that new leader, cause the offending
> broker to drop the unreplicated messages when it comes back into the ISR?
>
>
> On Sep 30, 2014, at 7:17 PM, Jun Rao  wrote:
>
> > With ack=1, acked messages could be lost when the leader fails.
> >
> > Thanks,
> >
> > Jun
> >
> > On Mon, Sep 29, 2014 at 8:04 AM, Andrew Otto  wrote:
> >
> >> This happened again to me this weekend.  I've done some sleuthing, and
> >> definitely can see some crazy paging stats when this lock up happens.
> For
> >> the curious, more info can be found here:
> >> https://bugzilla.wikimedia.org/show_bug.cgi?id=69667.  I had tuned
> >> dirty_expire_centisecs from 30 seconds to 10, but this still happened
> >> again.  I'll continue to troubleshoot and tune.  This slow going
> because it
> >> is not regularly reproducible.  I have to make a single change and then
> >> wait a week or two for the timeout to occur.
> >>
> >> Here's a related question.  When the timeout happens, we lose some
> >> messages.  Our producer is varnishkafka, which uses the librdkafka
> producer
> >> client.  librdkafka keeps track of produce errors.  We
> >> have kafka.topic.request.required.acks = 1.  According to librdkafka,
> all
> >> messages sent have been ACKed by the leader of the partition to which
> the
> >> messages are sent.  Also, when we lose messages due to this timeout, the
> >> broker that times out is always the controller.  When it attempts to
> >> reconnect to Zookeeper, we see:
> >>
> >>  INFO  kafka.utils.ZkUtils$  - conflict in /controller data:
> >> {"version":1,"brokerid":21,"timestamp":"1411879981756"} stored data:
> >> {"version":1,"brokerid":22,"timestamp":"1407187809296"}
> >>
> >> In the case when a controller drops out of the ISR for a few seconds,
> is it
> >> possible for this confused broker to drop ACKed messages?
> >>
> >>
> >>
> >> On Thu, Jul 3, 2014 at 12:48 AM, Jun Rao  wrote:
> >>
> >>> Are you on Linux? We have seen this pattern (user/sys time low and real
> >>> time high in GC time) before. In our case, the problem was due to disk
> >>> I/Os. When there are lots of dirty pages (in our case, this is caused
> by
> >>> log4j logging), Linux can draft user threads (in this case GC threads)
> to
> >>> flush the dirty pages. So, all those time in real was spent on disk
> I/Os,
> >>> rather than real GCs. The fix is to tune dirty_expire_centisecs and
> >>> dirty_writeback_centisecs
> >>> to flush dirty pages more frequently to avoid such drafting.
> >>>
> >>> Thanks,
> >>>
> >>> Jun
> >>>
> >>>
> >>> On Wed, Jul 2, 2014 at 1:32 PM, Andrew Otto 
> wrote:
> >>>
>  Hi again!
> 
>  I've been having this issue consistently since I first started this
> >>> thread,
>  but it was happening infrequently enough for me to brush it aside and
> >>> just
>  run an election to rebalance brokers.
> 
>  I recently expanded (and reinstalled) our Kafka cluster so that it now
> >>> has
>  4 brokers with a default replication factor of 3 for each partition.
> I
>  also switched over to the G1GC as recommended here:
>  https://kafka.apache.org/081/ops.html (even though we are still
> >> running
>  Kafka 0.8.0; we hope to upgrade soon).
> 
>  Now, only one of the 4 brokers (analytics1021, the same problem broker
> >> we
>  saw before) gets its ZK connection expired even more frequently.
>  Previously it was less than once a week, now I am seeing this happen
>  multiple times a day.
> 
>  I've posted all the relevant logs from a recent event here:
>  https://gist.github.com/ottomata/e42480446c627ea0af22
> 
>  This includes the GC log on the offending Kafka broker during the time
> >>> this
>  happened.  I am pretty green when it comes to GC tuning, b

Re: Zookeeper reconnect failed due to 'state changed (Expired)'

2014-10-01 Thread Andrew Otto
Ah!  That makes so much sense, and is likely the cause of our lost messages.

Thanks, I am now experimenting with other ack values.  I’m a little worried
about latency, especially since some of our producers send traffic across
the Atlantic (we don’t use MirrorMaker :/ ).
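
For reference, a minimal sketch of the same trade-off expressed as 0.8 Java
producer settings (broker hosts are placeholders; librdkafka exposes the
equivalent request.required.acks knob through its own configuration syntax):

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;

public class AckConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092"); // placeholder brokers
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // -1 waits for all in-sync replicas before acking, trading latency for durability;
        // 1 only waits for the leader, which is the setting that allowed the loss above.
        props.put("request.required.acks", "-1");
        // how long the broker may wait to satisfy request.required.acks before erroring
        props.put("request.timeout.ms", "10000");
        Producer<String, String> producer =
            new Producer<String, String>(new ProducerConfig(props));
        producer.close();
    }
}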




On Oct 1, 2014, at 10:14 AM, Neha Narkhede  wrote:

> But, as long as broker with the unreplicated messages comes back into the
> ISR, those messages should be replicated, no?
> 
> Or, would the fact that the a new leader has been chosen for a partition,
> and new messages have been sent to that new leader, cause the offending
> broker to drop the unreplicated messages when it comes back into the ISR?
> 
> The moment the leader moves to another broker, the partition's source of
> truth is the new broker's log and other followers truncate their logs to
> follow the new leader. So, any unreplicated messages that didn't reach the
> new leader are lost. If the old leader rejoins ISR, it will also truncate
> it's log to follow the new leader's log.
> 
> Thanks,
> Neha
> 
> On Wed, Oct 1, 2014 at 5:48 AM, Andrew Otto  wrote:
> 
>> I understand that, but even if the leader quickly (within a few seconds)
>> rejoins the cluster?  I had thought that ack=1 meant that messages not yet
>> replicated from a broker that has a serious fatal failure (disk corruption,
>> etc.) would be lost forever.  But, as long as broker with the unreplicated
>> messages comes back into the ISR, those messages should be replicated, no?
>> 
>> Or, would the fact that the a new leader has been chosen for a partition,
>> and new messages have been sent to that new leader, cause the offending
>> broker to drop the unreplicated messages when it comes back into the ISR?
>> 
>> 
>> On Sep 30, 2014, at 7:17 PM, Jun Rao  wrote:
>> 
>>> With ack=1, acked messages could be lost when the leader fails.
>>> 
>>> Thanks,
>>> 
>>> Jun
>>> 
>>> On Mon, Sep 29, 2014 at 8:04 AM, Andrew Otto  wrote:
>>> 
 This happened again to me this weekend.  I've done some sleuthing, and
 definitely can see some crazy paging stats when this lock up happens.
>> For
 the curious, more info can be found here:
 https://bugzilla.wikimedia.org/show_bug.cgi?id=69667.  I had tuned
 dirty_expire_centisecs from 30 seconds to 10, but this still happened
 again.  I'll continue to troubleshoot and tune.  This slow going
>> because it
 is not regularly reproducible.  I have to make a single change and then
 wait a week or two for the timeout to occur.
 
 Here's a related question.  When the timeout happens, we lose some
 messages.  Our producer is varnishkafka, which uses the librdkafka
>> producer
 client.  librdkafka keeps track of produce errors.  We
 have kafka.topic.request.required.acks = 1.  According to librdkafka,
>> all
 messages sent have been ACKed by the leader of the partition to which
>> the
 messages are sent.  Also, when we lose messages due to this timeout, the
 broker that times out is always the controller.  When it attempts to
 reconnect to Zookeeper, we see:
 
 INFO  kafka.utils.ZkUtils$  - conflict in /controller data:
 {"version":1,"brokerid":21,"timestamp":"1411879981756"} stored data:
 {"version":1,"brokerid":22,"timestamp":"1407187809296"}
 
 In the case when a controller drops out of the ISR for a few seconds,
>> is it
 possible for this confused broker to drop ACKed messages?
 
 
 
 On Thu, Jul 3, 2014 at 12:48 AM, Jun Rao  wrote:
 
> Are you on Linux? We have seen this pattern (user/sys time low and real
> time high in GC time) before. In our case, the problem was due to disk
> I/Os. When there are lots of dirty pages (in our case, this is caused
>> by
> log4j logging), Linux can draft user threads (in this case GC threads)
>> to
> flush the dirty pages. So, all those time in real was spent on disk
>> I/Os,
> rather than real GCs. The fix is to tune dirty_expire_centisecs and
> dirty_writeback_centisecs
> to flush dirty pages more frequently to avoid such drafting.
> 
> Thanks,
> 
> Jun
> 
> 
> On Wed, Jul 2, 2014 at 1:32 PM, Andrew Otto 
>> wrote:
> 
>> Hi again!
>> 
>> I've been having this issue consistently since I first started this
> thread,
>> but it was happening infrequently enough for me to brush it aside and
> just
>> run an election to rebalance brokers.
>> 
>> I recently expanded (and reinstalled) our Kafka cluster so that it now
> has
>> 4 brokers with a default replication factor of 3 for each partition.
>> I
>> also switched over to the G1GC as recommended here:
>> https://kafka.apache.org/081/ops.html (even though we are still
 running
>> Kafka 0.8.0; we hope to upgrade soon).
>> 
>> Now, only one of the 4 brokers (analytics1021, the same problem broker
 we
>> saw before) gets its ZK connection expired even more fre

Re: LeaderNotAvailableException, although leader elected

2014-10-01 Thread Neha Narkhede
Andras,

Thanks for your feedback!

In my opinion programmatic message sending must work out of the box on the
first try, without any exceptions, warnings or the need for additional
configuration.

I'd be glad to support/contribute.

I agree that the behavior of the producer for the first message on a topic
is awkward, and I'd encourage feedback from you. We are certainly interested
in improving the user experience. Could you please file a JIRA so we can
discuss alternatives there?

Thanks,
Neha
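
For readers landing here, a minimal sketch of the old producer settings that the
workaround quoted below tunes (the broker address is the one from the logs below;
the values are only illustrative):

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;

public class RetryConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.59.103:9092");  // broker from the log below
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // Back off longer between metadata retries so the controller has time to
        // elect a leader for the newly auto-created topic (default is 100 ms).
        props.put("retry.backoff.ms", "1000");
        // Number of times the producer retries a failed send (default is 3).
        props.put("message.send.max.retries", "3");
        Producer<String, String> producer =
            new Producer<String, String>(new ProducerConfig(props));
        producer.close();
    }
}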

On Sat, Sep 27, 2014 at 12:58 AM, Andras Hatvani <
andras.hatv...@andrashatvani.com> wrote:

> AFAIK not topics, but only partitions of topics have leaders. What
> controllers do you mean? I haven't read about such.
>
> Thanks for the explanation regarding the metadata request, in the meantime
> I found out that this is an expected (!) failure (
> http://qnalist.com/questions/4787268/kafka-common-leadernotavailableexception
> ).
>
> For me this is isn't an acceptable way to communicate that the leader
> election is in progress. There is not a single hint to this fact, but only
> an exception.
> If this is an expected behavior, then it not only mustn't be an exception,
> but it also has to be communicated that there is something in progress.
> Furthermore, suggestions regarding changing the values variables I
> mentioned in my solution should be mandatory.
>
> This was my case:
> - OK, let's use Kafka
> - Create an infrastructure
> - Create a programmatic producer
> - Send a message
> - Message sending fails.
> - Retry
> - Message sending works!
> - Look for answers on the internet and in the docs
> - Read the configuration
> - Play around with configuration values.
>
> This is bad user experience especially for a newbie and involves a lot of
> effort.
>
> In my opinion programmatic message sending must work out of the box on the
> first try, without any exceptions, warnings or the need for additional
> configuration.
>
> I'd be glad to support/contribute.
>
> Regards,
> Andras
>
> > On 26 Sep 2014, at 19:53, Joel Koshy  wrote:
> >
> >>> kafka2_1  | [2014-09-26 12:35:07,289] INFO [Kafka Server 2],
> started (kafka.server.KafkaServer)
> >>> kafka2_1  | [2014-09-26 12:35:07,394] INFO New leader is 2
> (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
> >
> > The above logs are for controller election. Not leader election for
> > the topic you are producing to.
> >
> > When you producer starts it will issue a metadata request and it
> > should auto-create the topic (provided auto-create is on which is
> > default). The first metadata request for a non-existent topic always
> > returns LeaderNotAvailable because the controller then has to elect a
> > leader for the new topic.
> >
> > Joel
> >
> > On Fri, Sep 26, 2014 at 04:07:58PM +0200, Andras Hatvani wrote:
> >> And the solution was:
> >> Increase retry.backoff.ms from the default 100 ms to 1000 ms, so the
> output is:
> >>
> >> 11891 [main] INFO  kafka.client.ClientUtils$ - Fetching metadata from
> broker id:0,host:192.168.59.103,port:9092 with correlation id 0 for 1
> topic(s) Set(inputTopic)
> >> 11893 [main] INFO  kafka.producer.SyncProducer - Connected to
> 192.168.59.103:9092 for producing
> >> 12045 [main] INFO  kafka.producer.SyncProducer - Disconnecting from
> 192.168.59.103:9092
> >> 12062 [main] WARN  kafka.producer.BrokerPartitionInfo - Error while
> fetching metadata [{TopicMetadata for topic inputTopic ->
> >> No partition metadata for topic inputTopic due to
> kafka.common.LeaderNotAvailableException}] for topic [inputTopic]: class
> kafka.common.LeaderNotAvailableException
> >> 12066 [main] INFO  kafka.client.ClientUtils$ - Fetching metadata from
> broker id:0,host:192.168.59.103,port:9092 with correlation id 1 for 1
> topic(s) Set(inputTopic)
> >> 12067 [main] INFO  kafka.producer.SyncProducer - Connected to
> 192.168.59.103:9092 for producing
> >> 12097 [main] INFO  kafka.producer.SyncProducer - Disconnecting from
> 192.168.59.103:9092
> >> 12098 [main] WARN  kafka.producer.BrokerPartitionInfo - Error while
> fetching metadata [{TopicMetadata for topic inputTopic ->
> >> No partition metadata for topic inputTopic due to
> kafka.common.LeaderNotAvailableException}] for topic [inputTopic]: class
> kafka.common.LeaderNotAvailableException
> >> 12098 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to
> collate messages by topic, partition due to: Failed to fetch topic metadata
> for topic: inputTopic
> >> 12099 [main] INFO  kafka.producer.async.DefaultEventHandler - Back off
> for 1000 ms before retrying send. Remaining retries = 3
> >> 13104 [main] INFO  kafka.client.ClientUtils$ - Fetching metadata from
> broker id:0,host:192.168.59.103,port:9092 with correlation id 2 for 1
> topic(s) Set(inputTopic)
> >> 13111 [main] INFO  kafka.producer.SyncProducer - Connected to
> 192.168.59.103:9092 for producing
> >> 13137 [main] INFO  kafka.producer.SyncProducer - Disconnecting from
> 192.168.59.103:9092
> >> 13161 [main] INFO

Re: Connection reset by peer

2014-10-01 Thread Neha Narkhede
Also, in a comment on this thread you mentioned that this is an expected
exception

This is expected during shutdown of a client, since the server's attempts at
sending any outstanding responses fail. This happens because the other
endpoint of the socket connection (the client) is dead.

On Thu, Sep 25, 2014 at 11:57 AM, Aniket Kulkarni <
kulkarnianiket...@gmail.com> wrote:

> Hello Neha,
>
> I am trying to run some tests which use Kafka 0.8.1.1. The tests do not
> fail but give out a warning messages which I am trying to get rid off such
> as :
>
> 2014-09-25 11:43:03,572 [kafka-processor-56598-1] ERROR
> kafka.network.Processor - Closing socket for /127.0.0.1 because of error
>
> java.io.IOException: Broken pipe
>
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:487)
>
> at kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:67)
>
> at kafka.network.MultiSend.writeTo(Transmission.scala:101)
>
> at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:124)
>
> at kafka.network.MultiSend.writeTo(Transmission.scala:101)
>
> at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:219)
>
> at kafka.network.Processor.write(SocketServer.scala:375)
>
> at kafka.network.Processor.run(SocketServer.scala:247)
>
> at java.lang.Thread.run(Thread.java:744)
>
> Also,
>
> 2014-09-25 11:43:53,770 [kafka-processor-56598-1] ERROR
> kafka.network.Processor - Closing socket for /127.0.0.1 because of error
>
> java.io.IOException: Connection reset by peer
>
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>
> at sun.nio.ch.IOUtil.read(IOUtil.java:197)
>
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
>
> at kafka.utils.Utils$.read(Utils.scala:375)
>
> at
>
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>
> at kafka.network.Processor.read(SocketServer.scala:347)
>
> at kafka.network.Processor.run(SocketServer.scala:245)
>
>  at java.lang.Thread.run(Thread.java:744)
>
>
> I tried to debug the producer and consumer I am using but it turns out this
> exception is thrown when I try to close the producer or consumer. I tried a
> couple of approaches as mentioned in a few threads I saw online :
>
> 1.) I tried to increase the fetch.wait.max.ms and socket.timeout.ms for
> the
> consumer but that didn't work
>
> 2.) A couple of threads suggested firewall issues. Since I am running the
> tests locally, firewall issues are irrelevant since the broker is not
> remote.
>
> 3.) I tried to catch the IOException in the tests which threw the above
> mentioned exceptions but I was not able to catch them. This led me to
> believe that the exceptions are not occurring on the producer and/or
> consumer but on the broker. I think the broker seems to be closing open or
> stray connections, which are giving out those exceptions. I also confirmed
> that all my producer and consumer tests are closing connections to the
> broker as soon as the test is executed.
>
> Also, in a comment on this thread you mentioned that this is an expected
> exception. Does that mean there will be a fix out for this?
>
> Does this issue refer to the fix?
>
> [1] - https://issues.apache.org/jira/browse/KAFKA-270
>
> Thanks a lot for your help.
>
> --
> Aniket Kulkarni.
>


RE: BadVersion state in Kafka Logs

2014-10-01 Thread Seshadri, Balaji
We don't have a GC problem.

zookeeper runs at 0% GC
Kafka broker runs at 1-3% GC

We don't see many major GCs in our monitoring. We use the Concurrent Mark and 
Sweep GC.

-Original Message-
From: Joe Stein [mailto:joe.st...@stealth.ly]
Sent: Tuesday, September 30, 2014 6:41 PM
To: users@kafka.apache.org
Subject: Re: BadVersion state in Kafka Logs

Also check for really long/bad GC pauses as another possibility. I'm not sure 
which JDK and JVM_OPTS you are using, or whether you are setting them as shown in 
https://kafka.apache.org/documentation.html#java. You need to find some 
"spike" somewhere right before that error happens to track down what is causing 
the timeouts.

On Tue, Sep 30, 2014 at 6:33 PM, Joe Stein  wrote:

> It sounds like you have a much deeper rooted problem.  Is zookeeper
> swapping?  Something has to be causing this.  After you fix this
> symptom you will probably start to see constant leader elections and
> the isr shrinking/growing and constant consumer rebalancing (or at
> least every
> minute) and a herd affect up/down stream occuring.  You need to figure
> out what is causing the long session timeout and resolve that, IMHO.
> Zookeeper health is the first place to look.  Next would be the network.
>
> /***
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop 
> /
>
> On Tue, Sep 30, 2014 at 4:57 PM, Seshadri, Balaji <
> balaji.sesha...@dish.com> wrote:
>
>> The zookeeper session timeout is 60 secs ,but that did not help.
>>
>> We are having broker crash and unresponsive, we got the "conditional
>> update" failed error when broker crashed which confirmed that it is
>> because of KAFKA-1382.
>>
>> server.log.2014-09-23:2014-09-23 13:54:48 ERROR utils.ZkUtils$ -
>> Conditional update of path
>> /brokers/topics/dish-promo-application-access/partitions/128/state
>> with data { "controller_epoch":40, "isr":[ 6, 1 ], "leader":1,
>> "leader_epoch":99, "version":1 } and expected version 150 failed due
>> to
>> org.apache.zookeeper.KeeperException$BadVersionException:
>> KeeperErrorCode = BadVersion for
>> /brokers/topics/dish-promo-application-access/partitions/128/state
>>
>> We are in very old version 0.8-beta so it's not just patch but
>> switching to stable release version which also has the patch.
>>
>> -Original Message-
>> From: Joe Stein [mailto:joe.st...@stealth.ly]
>> Sent: Tuesday, September 30, 2014 2:01 PM
>> To: users@kafka.apache.org
>> Cc: Neha Narkhede
>> Subject: Re: BadVersion state in Kafka Logs
>>
>> Have you tried increasing your broker's zookeeper session timeout as
>> a work around for now to alleviate the issue?  Is that an option for you?
>> Assuming that is the culprit you are timing zk sessions out and
>> bumping into
>> KAFKA-1382 on the reconnect? Not knowing enough about what is going
>> on with the cluster it is hard to say if anything negative will come
>> from it but seems like it might be a an approach to try... if you can
>> figure out what is causing the session to timeout and fix *that* it
>> would be a solution also if it is happening every couple days (as
>> another email thread
>> states) something is going on that may not just be fixed by a single
>> patch.
>>
>> /***
>>  Joe Stein
>>  Founder, Principal Consultant
>>  Big Data Open Source Security LLC
>>  http://www.stealth.ly
>>  Twitter: @allthingshadoop 
>> /
>>
>> On Tue, Sep 30, 2014 at 11:49 AM, Seshadri, Balaji <
>> balaji.sesha...@dish.com
>> > wrote:
>>
>> > Hi Joe,
>> >
>> > I did not try on 0.8.1 branch ,I can try and see if it goes through
>> > when I get some breather.
>> >
>> > Thanks for initiating on 0.8.1.2.
>> >
>> > Thanks,
>> >
>> > Balaji
>> >
>> > -Original Message-
>> > From: Joe Stein [mailto:joe.st...@stealth.ly]
>> > Sent: Tuesday, September 30, 2014 9:34 AM
>> > To: users@kafka.apache.org
>> > Cc: Neha Narkhede
>> > Subject: Re: BadVersion state in Kafka Logs
>> >
>> > Does the patch in KAFKA-1382 apply on the 0.8.1 branch?  If not if
>> > you could make a patch that does would be great.
>> >
>> > I will kick off a discussion for KAFKA-1382 and the scala 2.11 for
>> > 0.8.1.2 release (and see what others may think we should do like
>> > the gradle changes I think we should do too for src release issues
>> > (and the jars in the repo)).  I will send that on dev/user in a
>> > little bit (please comment +1 community support please on that
>> > thread for the
>> release).
>> >
>> > /***
>> >  Joe Stein
>> >  Founder, Principal Consultant
>> >  Big Data Open Source Security LLC
>> >  http://www.stealth.ly
>> >  Twitter: @allthingshadoop 
>> > ***

Re: programmatically get number of items in topic/partition

2014-10-01 Thread Gwen Shapira
Take a look at ConsumerOffsetChecker. It does just that: it prints the
offset and lag for each consumer and partition.

You can either use that class directly, or use it as a guideline for
your own implementation.
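
If you want the numbers in code rather than through the tool, here is a rough
sketch using the 0.8 simple consumer API to fetch a partition's log-end offset
(broker, topic, and client id are placeholders; to get the pending count you would
subtract the group's committed offset, which ConsumerOffsetChecker reads from
ZooKeeper):

import java.util.HashMap;
import java.util.Map;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class PartitionDepthCheck {
    public static void main(String[] args) {
        // Placeholder broker/topic; in practice query the partition's leader broker.
        SimpleConsumer consumer =
            new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "depth-check");
        TopicAndPartition tp = new TopicAndPartition("your-topic", 0);

        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
            new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        // Ask for the single latest offset, i.e. the current end of the log.
        requestInfo.put(tp, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));

        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
            requestInfo, kafka.api.OffsetRequest.CurrentVersion(), "depth-check");
        OffsetResponse response = consumer.getOffsetsBefore(request);
        long logEndOffset = response.offsets("your-topic", 0)[0];
        consumer.close();

        // "Items pending" for a group = logEndOffset minus the group's committed offset,
        // which lives in ZooKeeper under /consumers/<group>/offsets/...
        System.out.println("Log end offset for partition 0: " + logEndOffset);
    }
}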

On Wed, Oct 1, 2014 at 2:10 AM, Shlomi Hazan  wrote:
> Hi,
> How can I programmatically get the number of items in a topic, pending for
> consumption?
> If no programmatic way is avail, what other method is available?
> Shlomi


Re: programmatically get number of items in topic/partition

2014-10-01 Thread chetan conikee
The other method is via the command line:

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group *groupName*
--zkconnect *zkServer:2181*

Refer :
https://cwiki.apache.org/confluence/display/KAFKA/System+Tools#SystemTools-ConsumerOffsetChecker

https://apache.googlesource.com/kafka/+/0.8.0-beta1-candidate1/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala

On Wed, Oct 1, 2014 at 8:28 AM, Gwen Shapira  wrote:

> Take a look at ConsumerOffsetChecker. It does just that: print the
> offset and lag for each consumer and partition.
>
> You can either use that class directly, or use it as a guideline for
> your implementation
>
> On Wed, Oct 1, 2014 at 2:10 AM, Shlomi Hazan  wrote:
> > Hi,
> > How can I programmatically get the number of items in a topic, pending
> for
> > consumption?
> > If no programmatic way is avail, what other method is available?
> > Shlomi
>


map reduce Outputformat KafkaOutputFormat

2014-10-01 Thread Abraham Jacob
Hi All,

After a map reduce computation I would like to send the results out to a
Kafka queue. Is there a bridge available that will let me publish the
output to a Kafka queue?

Similar to the TextOutputFormat... maybe a KafkaOutputFormat...

I found one source:
https://github.com/kafka-dev/kafka/tree/master/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop

But I was still wondering about others' experience with this.
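
For illustration, a rough sketch of what such a bridge could look like with the
mapreduce API wrapping the 0.8 producer (the class name and the kafka.* job
configuration keys are made up for this sketch; it is not the contrib code
linked above):

import java.io.IOException;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class SimpleKafkaOutputFormat extends OutputFormat<NullWritable, Text> {

    @Override
    public RecordWriter<NullWritable, Text> getRecordWriter(TaskAttemptContext context) {
        // "kafka.broker.list" and "kafka.output.topic" are invented job conf keys for this sketch.
        Properties props = new Properties();
        props.put("metadata.broker.list", context.getConfiguration().get("kafka.broker.list"));
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        final Producer<String, String> producer =
            new Producer<String, String>(new ProducerConfig(props));
        final String topic = context.getConfiguration().get("kafka.output.topic");

        return new RecordWriter<NullWritable, Text>() {
            @Override
            public void write(NullWritable key, Text value) {
                producer.send(new KeyedMessage<String, String>(topic, value.toString()));
            }

            @Override
            public void close(TaskAttemptContext ctx) {
                producer.close();   // flush anything still buffered before the task exits
            }
        };
    }

    @Override
    public void checkOutputSpecs(JobContext context) {
        // nothing to validate: Kafka creates the topic (if auto-create is on) on first use
    }

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        // Kafka has no output directory to commit, so reuse NullOutputFormat's no-op committer.
        return new NullOutputFormat<NullWritable, Text>().getOutputCommitter(context);
    }
}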


Thanks,
-abe


-- 
~


Reassigning Partition Failing

2014-10-01 Thread Lung, Paul
Hi All,

I had a 0.8.1.1 Kafka Broker go down, and I was trying to use the reassign 
partition script to move topics off that broker. When I describe the topics, I 
see the following:

Topic: mini__022active_120__33__mini Partition: 0 Leader: 2131118 Replicas: 
2131118,2166601,2163421 Isr: 2131118,2166601

This shows that the broker “2163421” is down. So I create the following file 
/tmp/move_topic.json:
{
  "version": 1,
  "partitions": [
    {
      "topic": "mini__022active_120__33__mini",
      "partition": 0,
      "replicas": [2131118, 2166601, 2156998]
    }
  ]
}

And then do this:

./kafka-reassign-partitions.sh --execute --reassignment-json-file 
/tmp/move_topic.json
Successfully started reassignment of partitions 
{"version":1,"partitions":[{"topic":"mini__022active_120__33__mini","partition":0,"replicas":[2131118,2166601,2156998]}]}

However, when I try to verify this, I get the following error:
./kafka-reassign-partitions.sh --verify --reassignment-json-file 
/tmp/move_topic.json
Status of partition reassignment:
ERROR: Assigned replicas (2131118,2166601,2156998,2163421) don't match the list 
of replicas for reassignment (2131118,2166601,2156998) for partition 
[mini__022active_120__33__mini,0]
Reassignment of partition [mini__022active_120__33__mini,0] failed

If I describe the topics, I now see there are 4 replicas. This has been like 
this for many hours now, so it seems to have permanently moved to 4 replicas 
for some reason.
Topic:mini__022active_120__33__mini PartitionCount:1 ReplicationFactor:4 
Configs:
Topic: mini__022active_120__33__mini Partition: 0 Leader: 2131118 Replicas: 
2131118,2166601,2156998,2163421 Isr: 2131118,2166601

If I re-execute and re-verify, I get the same error. So it seems to be wedged.

Can someone help?

Paul Lung




kafka producer performance test

2014-10-01 Thread Sa Li
Hi, All

I built a 3-node Kafka cluster and I want to run a performance test. I found 
that someone posted the following, which is exactly the problem I have:
- 
While testing kafka producer performance, I found 2 testing scripts.

1) performance testing script in kafka distribution

bin/kafka-producer-perf-test.sh --broker-list localhost:9092 --messages
1000 --topic test --threads 10 --message-size 100 --batch-size 1
--compression-codec 1

2) performance testing script mentioned in

https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines

bin/kafka-run-class.sh
org.apache.kafka.clients.tools.ProducerPerformance test6 5000 100
-1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092
buffer.memory=67108864 batch.size=8196

based on org.apache.kafka.clients.producer.Producer.

——


I was unable to duplicate either of the above methods. I figure the commands are 
outdated; can anyone point me to how to run such a test with the current commands?


thanks

Alec

Re: kafka producer performance test

2014-10-01 Thread ravi singh
It is available in the Kafka package containing the source code. Download
the package, build it, and run the above command.

Regards,
Ravi

On Wed, Oct 1, 2014 at 7:55 PM, Sa Li  wrote:

> Hi, All
>
> I built a 3-node kafka cluster, I want to make performance test, I found
> someone post following thread, that is exactly the problem I have:
> -
> While testing kafka producer performance, I found 2 testing scripts.
>
> 1) performance testing script in kafka distribution
>
> bin/kafka-producer-perf-test.sh --broker-list localhost:9092 --messages
> 1000 --topic test --threads 10 --message-size 100 --batch-size 1
> --compression-codec 1
>
> 2) performance testing script mentioned in
>
>
> https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
>
> bin/kafka-run-class.sh
> org.apache.kafka.clients.tools.ProducerPerformance test6 5000 100
> -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092
> buffer.memory=67108864 batch.size=8196
>
> based on org.apache.kafka.clients.producer.Producer.
>
> ——
>
>
> I was unable to duplicate either of above method, I figure the commands
> are outdated, anyone point me how to do such test with new command?
>
>
> thanks
>
> Alec




-- 
*Regards,*
*Ravi*


Re: kafka producer performance test

2014-10-01 Thread Sa Li
Hi, Ravi

Thanks for the reply. This is how I built the Kafka 0.8 package:

$ git clone https://git-wip-us.apache.org/repos/asf/kafka.git
$ cd /etc/kafka
$ git checkout -b 0.8 remotes/origin/0.8
$ ./sbt update
$ ./sbt package
$ ./sbt assembly-package-dependency

So I believe I have already built it, but I am still not able to run it. Any 
clues?

thanks

Alec

On Oct 1, 2014, at 9:13 PM, ravi singh  wrote:

> It is available with Kafka package  containing the source code. Download
> the package, build it and run the above command.
> 
> Regards,
> Ravi
> 
> On Wed, Oct 1, 2014 at 7:55 PM, Sa Li  wrote:
> 
>> Hi, All
>> 
>> I built a 3-node kafka cluster, I want to make performance test, I found
>> someone post following thread, that is exactly the problem I have:
>> -
>> While testing kafka producer performance, I found 2 testing scripts.
>> 
>> 1) performance testing script in kafka distribution
>> 
>> bin/kafka-producer-perf-test.sh --broker-list localhost:9092 --messages
>> 1000 --topic test --threads 10 --message-size 100 --batch-size 1
>> --compression-codec 1
>> 
>> 2) performance testing script mentioned in
>> 
>> 
>> https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
>> 
>> bin/kafka-run-class.sh
>> org.apache.kafka.clients.tools.ProducerPerformance test6 5000 100
>> -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092
>> buffer.memory=67108864 batch.size=8196
>> 
>> based on org.apache.kafka.clients.producer.Producer.
>> 
>> ——
>> 
>> 
>> I was unable to duplicate either of above method, I figure the commands
>> are outdated, anyone point me how to do such test with new command?
>> 
>> 
>> thanks
>> 
>> Alec
> 
> 
> 
> 
> -- 
> *Regards,*
> *Ravi*



Re: kafka producer performance test

2014-10-01 Thread Jay Kreps
Hi Sa,

That script was developed with the new producer that is included on
trunk. Check out trunk and build, and it should be there.
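
For what it's worth, a rough standalone sketch of the same kind of test against
the new producer API (the broker address is a placeholder, and the exact config
and constructor names depend on which trunk/0.8.2+ build you end up with):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class QuickProducerPerf {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // placeholder broker
        props.put("acks", "1");
        props.put("batch.size", "8196");
        props.put("buffer.memory", "67108864");
        // serializer configs exist in 0.8.2+ releases; older builds used byte[] directly
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
        byte[] payload = new byte[100];   // 100-byte messages, as in the LinkedIn benchmark
        int numRecords = 1000000;

        long start = System.currentTimeMillis();
        for (int i = 0; i < numRecords; i++) {
            producer.send(new ProducerRecord<byte[], byte[]>("test6", payload));
        }
        producer.close();   // blocks until buffered batches are flushed
        long elapsedMs = System.currentTimeMillis() - start;
        System.out.printf("%d records in %d ms (%.0f records/sec)%n",
                numRecords, elapsedMs, numRecords * 1000.0 / elapsedMs);
    }
}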

-Jay

On Wed, Oct 1, 2014 at 7:55 PM, Sa Li  wrote:
> Hi, All
>
> I built a 3-node kafka cluster, I want to make performance test, I found 
> someone post following thread, that is exactly the problem I have:
> -
> While testing kafka producer performance, I found 2 testing scripts.
>
> 1) performance testing script in kafka distribution
>
> bin/kafka-producer-perf-test.sh --broker-list localhost:9092 --messages
> 1000 --topic test --threads 10 --message-size 100 --batch-size 1
> --compression-codec 1
>
> 2) performance testing script mentioned in
>
> https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
>
> bin/kafka-run-class.sh
> org.apache.kafka.clients.tools.ProducerPerformance test6 5000 100
> -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092
> buffer.memory=67108864 batch.size=8196
>
> based on org.apache.kafka.clients.producer.Producer.
>
> ——
>
>
> I was unable to duplicate either of above method, I figure the commands are 
> outdated, anyone point me how to do such test with new command?
>
>
> thanks
>
> Alec