RE: Reg Broker information.

2014-06-26 Thread Balasubramanian Jayaraman
Hi,

I used the command (bin/zookeeper-shell.sh 0.0.0.0:2181 get /brokers/ids/0) to 
get the broker metadata information. 
How do I get the list of brokers attached to a ZooKeeper ensemble?

Is my question clear?

Thanks
Bala
-Original Message-
From: Neha Narkhede [mailto:neha.narkh...@gmail.com] 
Sent: Thursday, June 26, 2014 1:20 PM
To: users@kafka.apache.org
Subject: Re: Reg Broker information.

I'm not sure I understood your question. If you want to know all registered 
brokers, could you list the broker ids with "ls /brokers/ids" and then read each of 
the returned child nodes?



On Wed, Jun 25, 2014 at 7:29 PM, Balasubramanian Jayaraman < 
balasubramanian.jayara...@autodesk.com> wrote:

> Hi,
>
> Currently we have the command (/brokers/ids/0) to get the individual  
> broker registration information.
> How do I get all the registered brokers from the zookeeper.
>
> Thanks
> Bala
>
>


Re: Kafka 0.8's VerifyConsumerRebalance reports an error

2014-06-26 Thread Yury Ruchin
I have set log level to DEBUG and saw something strange in the output. For
each topic partition, I see the following pattern:

[2014-06-26 16:00:24,467] ERROR No owner for partition [,0]
(kafka.tools.VerifyConsumerRebalance$)

...

[2014-06-26 16:00:24,469] INFO Owner of partition [,0] is
_-140304960-abc12345-0
(kafka.tools.VerifyConsumerRebalance$)

As I understand VerifyConsumerRebalance.scala, those 2 messages should be
mutually exclusive, but they both appear for every partition. With the
default log settings, only the ERROR-level one is shown to the user.

Is this a problem with the tool?

2014-06-25 2:04 GMT+04:00 Neha Narkhede :

> I would turn on DEBUG on the tool to see which url it reads and doesn't
> find the owners.
>
>
>
>
> On Tue, Jun 24, 2014 at 11:28 AM, Yury Ruchin 
> wrote:
>
> > I've just double-checked. The URL is correct, the same one is used by
> Kafka
> > clients.
> >
> >
> > 2014-06-24 22:21 GMT+04:00 Neha Narkhede :
> >
> > > Is it possible that maybe the zookeeper url used for the
> > > VerifyConsumerRebalance tool is incorrect?
> > >
> > >
> > > On Tue, Jun 24, 2014 at 12:02 AM, Yury Ruchin 
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I've run into the following problem. I try to read from a
> 50-partition
> > > > Kafka topic using high level consumer with 8 streams. I'm using
> > 8-thread
> > > > pool, each thread handling one stream. After a short time, the
> threads
> > > > reading from the stream stop reading. Lag between topic latest offset
> > and
> > > > the consumer constantly increases as new messages come in.
> > > >
> > > > I looked into ZK:  /consumers//owners/ and
> > > see a
> > > > list of znodes corresponding to the full list of partitions: [1, 2,
> 3,
> > > > ...]. When I do zk get on e. g.
> > > > /consumers//owners//1 - I see a valid
> > consumer
> > > > name corresponding to Kafka client logs, e. g.
> > > > _-140304960-abc12345-0. However, when I
> > run
> > > > the VerifyConsumerRebalance tool, I see the following:
> > > >
> > > > No owner for partition [,1]
> > > > (kafka.tools.VerifyConsumerRebalance$)
> > > >
> > > > No owner for partition [,2]
> > > > (kafka.tools.VerifyConsumerRebalance$)
> > > >
> > > > ...
> > > >
> > > > No owner for partition [,50]
> > > > (kafka.tools.VerifyConsumerRebalance$)
> > > >
> > > > According to this output, no partition has owner, which seemingly
> > > > contradicts to what I see in ZK.
> > > >
> > > > What would cause such a problem and how can I troubleshoot it
> further?
> > > >
> > > > Thanks!
> > > >
> > >
> >
>


Re: Reg Broker information.

2014-06-26 Thread Yury Ruchin
In ZK shell the following command:

ls /brokers/ids

will give you a list like this:

[0, 1, 2, 3, 4]

where the items are broker ids that you can use to issue a "get" request to ZK:

get /brokers/ids/
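
If you want to do the same lookup programmatically, here is a minimal sketch
with the plain ZooKeeper Java client (the connect string and session timeout
are placeholders for your environment):

import java.nio.charset.StandardCharsets;
import org.apache.zookeeper.ZooKeeper;

public class ListBrokers {
    public static void main(String[] args) throws Exception {
        // connect string and session timeout are placeholders
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> { });
        // the children of /brokers/ids are the registered broker ids
        for (String id : zk.getChildren("/brokers/ids", false)) {
            // each child znode holds that broker's JSON registration (host, port, ...)
            byte[] data = zk.getData("/brokers/ids/" + id, false, null);
            System.out.println(id + " -> " + new String(data, StandardCharsets.UTF_8));
        }
        zk.close();
    }
}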



2014-06-26 12:37 GMT+04:00 Balasubramanian Jayaraman <
balasubramanian.jayara...@autodesk.com>:

> Hi,
>
> I used the command (bin/zookeeper-shell.sh 0.0.0.0:2181 get
> /brokers/ids/0) to get the broker metadata information.
> How do I get the list of brokers attached to a zookeeper ?
>
> Is my question clear ?
>
> Thanks
> Bala
> -Original Message-
> From: Neha Narkhede [mailto:neha.narkh...@gmail.com]
> Sent: Thursday, June 26, 2014 1:20 PM
> To: users@kafka.apache.org
> Subject: Re: Reg Broker information.
>
> I'm not sure I understood your question. If you want to know all
> registered brokers, could you list the broker ids "ls /brokers/ids" and
> then read each of the returned children nodes?
>
>
>
> On Wed, Jun 25, 2014 at 7:29 PM, Balasubramanian Jayaraman <
> balasubramanian.jayara...@autodesk.com> wrote:
>
> > Hi,
> >
> > Currently we have the command (/brokers/ids/0) to get the individual
> > broker registration information.
> > How do I get all the registered brokers from the zookeeper.
> >
> > Thanks
> > Bala
> >
> >
>


do apps with producers have to be restarted if cluster goes down and comes back up?

2014-06-26 Thread S Ahmed
Hi,

A few questions on timing related issues when certain parts of kafka go
down.

1.  If zookeeper goes down, then I bring it back online, do I have to
restart the brokers?
2.  If the brokers go down, producers will be erroring out.  When the
brokers are back online, do I have to restart the processes with producers?

3. When servers are restarted, how can you guarantee that first the
zookeeper server comes online, THEN the brokers, and THEN the webapps with
the producers?   Or is the timing not that strict because of e.g. timeout
re-connect durations?

Thanks.


Re: Kafka 0.8's VerifyConsumerRebalance reports an error

2014-06-26 Thread Neha Narkhede
This is a bug in the tool. Please file a bug and attach these error/info
logs to it.

Thanks,
Neha


On Thu, Jun 26, 2014 at 5:24 AM, Yury Ruchin  wrote:

> I have set log level to DEBUG and saw something strange in the output. For
> each topic partition, I see the following pattern:
>
> [2014-06-26 16:00:24,467] ERROR No owner for partition [,0]
> (kafka.tools.VerifyConsumerRebalance$)
>
> ...
>
> [2014-06-26 16:00:24,469] INFO Owner of partition [,0] is
> _-140304960-abc12345-0
> (kafka.tools.VerifyConsumerRebalance$)
>
> As I understand VerifyConsumerRebalance.scala, those 2 messages should be
> mutually exclusive, but their both appear for every partition. With the
> default log settings, only that with ERROR level is shown to user.
>
> Is this a problem with the tool?
>
> 2014-06-25 2:04 GMT+04:00 Neha Narkhede :
>
> > I would turn on DEBUG on the tool to see which url it reads and doesn't
> > find the owners.
> >
> >
> >
> >
> > On Tue, Jun 24, 2014 at 11:28 AM, Yury Ruchin 
> > wrote:
> >
> > > I've just double-checked. The URL is correct, the same one is used by
> > Kafka
> > > clients.
> > >
> > >
> > > 2014-06-24 22:21 GMT+04:00 Neha Narkhede :
> > >
> > > > Is it possible that maybe the zookeeper url used for the
> > > > VerifyConsumerRebalance tool is incorrect?
> > > >
> > > >
> > > > On Tue, Jun 24, 2014 at 12:02 AM, Yury Ruchin  >
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I've run into the following problem. I try to read from a
> > 50-partition
> > > > > Kafka topic using high level consumer with 8 streams. I'm using
> > > 8-thread
> > > > > pool, each thread handling one stream. After a short time, the
> > threads
> > > > > reading from the stream stop reading. Lag between topic latest
> offset
> > > and
> > > > > the consumer constantly increases as new messages come in.
> > > > >
> > > > > I looked into ZK:  /consumers//owners/
> and
> > > > see a
> > > > > list of znodes corresponding to the full list of partitions: [1, 2,
> > 3,
> > > > > ...]. When I do zk get on e. g.
> > > > > /consumers//owners//1 - I see a valid
> > > consumer
> > > > > name corresponding to Kafka client logs, e. g.
> > > > > _-140304960-abc12345-0. However,
> when I
> > > run
> > > > > the VerifyConsumerRebalance tool, I see the following:
> > > > >
> > > > > No owner for partition [,1]
> > > > > (kafka.tools.VerifyConsumerRebalance$)
> > > > >
> > > > > No owner for partition [,2]
> > > > > (kafka.tools.VerifyConsumerRebalance$)
> > > > >
> > > > > ...
> > > > >
> > > > > No owner for partition [,50]
> > > > > (kafka.tools.VerifyConsumerRebalance$)
> > > > >
> > > > > According to this output, no partition has owner, which seemingly
> > > > > contradicts to what I see in ZK.
> > > > >
> > > > > What would cause such a problem and how can I troubleshoot it
> > further?
> > > > >
> > > > > Thanks!
> > > > >
> > > >
> > >
> >
>


Re: Kafka connection loss with high volume of messages

2014-06-26 Thread Ahmed H.
I dug some more and it seems like before these errors show up, I see a few
Zookeeper warnings, followed by Kafka errors.

11:57:26,897 INFO  [org.I0Itec.zkclient.ZkClient]
(pool-5-thread-1-EventThread) zookeeper state changed (Disconnected)
11:57:26,897 INFO  [org.I0Itec.zkclient.ZkClient]
(clojure-agent-send-off-pool-6-EventThread) zookeeper state changed
(Disconnected)
11:57:26,898 INFO  [kafka.consumer.SimpleConsumer]
(ConsumerFetcherThread-test-queue.default_localhost-1403795897237-bf49b4a5-0-0)
Reconnect due to socket error: :
java.nio.channels.ClosedByInterruptException
 at
java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
[rt.jar:1.7.0_25]
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:402)
[rt.jar:1.7.0_25]
 at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:220)
[rt.jar:1.7.0_25]
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
[rt.jar:1.7.0_25]
 at
java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
[rt.jar:1.7.0_25]
at kafka.utils.Utils$.read(Utils.scala:394)
[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
 at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
 at
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
 at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:71)
[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
at
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
 at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:108)
[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
 at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:108)
[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:108)
[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
 at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:107)
[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
 at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:107)
[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:107)
[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
 at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:106)
[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
 at
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
at
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]


Again, this starts happening towards the end of my large message sending
process.

Thanks


On Wed, Jun 25, 2014 at 6:02 PM, Neha Narkhede 
wrote:

> If rebalance succeeded, then those error messages are harmless. Though I
> agree we shouldn't log those in the first place.
>
>
> On Wed, Jun 25, 2014 at 2:12 PM, Ahmed H.  wrote:
>
> > Unfortunately I do not have the logs on hand anymore, they were cleared
> > already.
> >
> > With that said, I do recall seeing some rebalancing. It attempts to
> > rebalance a few times and eventually succeeds. In the past, I have had
> > cases where it tries rebalancing 4 times and gives up because it reached
> > it's limit. In this particular situation, it didn't totally fail.
> >
> >
> > On Wed, Jun 25, 2014 at 2:44 PM, Neha Narkhede 
> > wrote:
> >
> > > Do you see something like "begin rebalancing consumer" in your consumer
> > > logs? Could you send around the full log4j of the consumer?
> > >
> > >
> > > On Wed, Jun 25, 2014 at 8:19 AM, Ahmed H. 
> > wrote:
> > >
> > > > Are you referring to the zookeeper logs? If so, I am seeing a lot of
> > > those:
> > > >
> > > > 2014-06-25 11:15:02 NIOServerCnxn [WARN] caught end of stream
> exception
> > > > EndOfStreamException: Unable to read additional data from client
> > > sessionid
> > > > 0x146958701700371, likely client has closed socket
> > > >  at
> > > org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:220)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.zookeeper.server.

Re: do apps with producers have to be restarted if cluster goes down and comes back up?

2014-06-26 Thread Neha Narkhede
1.  If zookeeper goes down, then I bring it back online, do I have to
restart the brokers?

You shouldn't have to restart the brokers if the broker side config "
zookeeper.connection.timeout.ms" is tuned such that zookeeper is brought
back online within the configured timeout.
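
For example, in server.properties (the value below is just an illustration,
not a recommendation):

zookeeper.connection.timeout.ms=60000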

2.  If the brokers go down, producers will be erroring out.  When the
brokers are back online, do I have to restart the processes with producers?

This depends on the number of retries configured on the producer. For the
new producer, this config is "retries".
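
For reference, a minimal sketch of setting this on the new producer (the
config key names follow recent client versions; the broker list, topic and
retry counts are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class RetryingProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder brokers
        props.put("retries", "10");           // number of times a failed send is retried
        props.put("retry.backoff.ms", "500"); // wait between retries while brokers recover
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("my-topic", "hello")); // placeholder topic
        producer.close();
    }
}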

3. When servers are restarted, how can you guarantee that first the
zookeeper server comes online, THEN the brokers, and THEN the webapps with
the producers?   Or is the timing not that strict because of e.g. timeout
re-connect durations?

Ideally, there shouldn't be a timing issue if the config is right. However,
if you do see an issue, please report it.

Thanks,
Neha


On Thu, Jun 26, 2014 at 7:36 AM, S Ahmed  wrote:

> Hi,
>
> A few questions on timing related issues when certain parts of kafka go
> down.
>
> 1.  If zookeeper goes down, then I bring it back online, do I have to
> restart the brokers?
> 2.  If the brokers go down, producers will be erroring out.  When the
> brokers are back online, do I have to restart the processes with producers?
>
> 3. When servers are restarted, how can you gaurantee that first the
> zookeeper server come online, THEN the brokers, and THEN the webapp's with
> the producers?   Or is the timing not that strict because of e.g. timeout
> re-connect durations?
>
> Thanks.
>


Re: Blacklisting Brokers

2014-06-26 Thread Neha Narkhede
If ssh fails on the broker machine, I'm not sure why the broker would be in a
state where it maintains its registration in zookeeper. If so, it will
automatically be marked dead and will not get elected as the leader. Have
you seen a case where such a broker became the leader? Could you elaborate
more on the type of failure that you describe?




On Tue, Jun 24, 2014 at 6:20 PM, Lung, Paul  wrote:

> Hi All,
>
> Is there anyway to blacklist brokers? Sometimes we run into situations
> where there are certain hardware failures on a broker machine, and the
> machines goes into a “half dead” state. The broker process is up and
> participating in the cluster, but it can’t actually transmit messages
> properly. Sometimes, even SSH fails on the machines, and we can’t log in to
> shutdown the broker process. The only way to resolve this issue at this
> point is to shutdown the machine, which is not always easy in a large data
> center scenario. The consequences are disastrous if this half dead broker
> is elected as the leader. So in this situation, it would be nice to be able
> to quickly black list a broker.
>
> Thank you.
> Paul Lung
>


Re: Kafka connection loss with high volume of messages

2014-06-26 Thread Neha Narkhede
11:57:26,897 INFO  [org.I0Itec.zkclient.ZkClient]
(pool-5-thread-1-EventThread) zookeeper state changed (Disconnected)
11:57:26,897 INFO  [org.I0Itec.zkclient.ZkClient]
(clojure-agent-send-off-pool-
6-EventThread) zookeeper state changed
(Disconnected)

I wonder why your consumer disconnects from zookeeper. Do you also see a
session expiration? Have you checked your GC logs to see if your consumer
process pauses for long duration?


On Thu, Jun 26, 2014 at 9:00 AM, Ahmed H.  wrote:

> I dug some more and it seems like before these errors show up, I see a few
> Zookeeper warnings, followed by Kafka errors.
>
> 11:57:26,897 INFO  [org.I0Itec.zkclient.ZkClient]
> (pool-5-thread-1-EventThread) zookeeper state changed (Disconnected)
> 11:57:26,897 INFO  [org.I0Itec.zkclient.ZkClient]
> (clojure-agent-send-off-pool-6-EventThread) zookeeper state changed
> (Disconnected)
> 11:57:26,898 INFO  [kafka.consumer.SimpleConsumer]
>
> (ConsumerFetcherThread-test-queue.default_localhost-1403795897237-bf49b4a5-0-0)
> Reconnect due to socket error: :
> java.nio.channels.ClosedByInterruptException
>  at
>
> java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
> [rt.jar:1.7.0_25]
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:402)
> [rt.jar:1.7.0_25]
>  at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:220)
> [rt.jar:1.7.0_25]
> at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
> [rt.jar:1.7.0_25]
>  at
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
> [rt.jar:1.7.0_25]
> at kafka.utils.Utils$.read(Utils.scala:394)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>  at
>
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>  at
>
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>  at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:71)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> at
>
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>  at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:108)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>  at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:108)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:108)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:107)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>  at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:107)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:107)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:106)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>  at
>
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> at
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>
>
> Again, this starts happening towards the end of my large message sending
> process.
>
> Thanks
>
>
> On Wed, Jun 25, 2014 at 6:02 PM, Neha Narkhede 
> wrote:
>
> > If rebalance succeeded, then those error messages are harmless. Though I
> > agree we shouldn't log those in the first place.
> >
> >
> > On Wed, Jun 25, 2014 at 2:12 PM, Ahmed H. 
> wrote:
> >
> > > Unfortunately I do not have the logs on hand anymore, they were cleared
> > > already.
> > >
> > > With that said, I do recall seeing some rebalancing. It attempts to
> > > rebalance a few times and eventually succeeds. In the past, I have had
> > > cases where it tries rebalancing 4 times and gives up because it
> reached
> > > it's limit. In this particular situation, it didn't totally fail.
> > >
> > >
> > > On Wed, Jun 25, 2014 at 2:44 PM, Neha Narkhede <
> neha.nark

Re: Blacklisting Brokers

2014-06-26 Thread Lung, Paul
Yes I have seen it elected as the leader. The broker process is still up,
but due to some unknown hardware failure, it's not working 100%. It is,
however, able to participate in the cluster, and got itself elected as the
leader. I'm not sure what the hardware problem is though, since I couldn't
log into the machine to check.

Paul

On 6/26/14, 9:03 AM, "Neha Narkhede"  wrote:

>If ssh fails on the broker machine, I'm not why the broker would be in a
>state where it maintains it's registration in zookeeper? If so, it will
>automatically be marked dead and will not get elected as the leader. Have
>you seen a case where such a broker became the leader? Could you elaborate
>more on the type of failure that you describe?
>
>
>
>
>On Tue, Jun 24, 2014 at 6:20 PM, Lung, Paul  wrote:
>
>> Hi All,
>>
>> Is there anyway to blacklist brokers? Sometimes we run into situations
>> where there are certain hardware failures on a broker machine, and the
>> machines goes into a "half dead" state. The broker process is up and
>> participating in the cluster, but it can't actually transmit messages
>> properly. Sometimes, even SSH fails on the machines, and we can't log
>>in to
>> shutdown the broker process. The only way to resolve this issue at this
>> point is to shutdown the machine, which is not always easy in a large
>>data
>> center scenario. The consequences are disastrous if this half dead
>>broker
>> is elected as the leader. So in this situation, it would be nice to be
>>able
>> to quickly black list a broker.
>>
>> Thank you.
>> Paul Lung
>>



Re: Unable to delete topic - kafka 0.8.0

2014-06-26 Thread Guozhang Wang
Hello Virendra,

Did you have any producer/consumer clients running during the whole process?

Guozhang


On Wed, Jun 25, 2014 at 11:53 PM, Virendra Pratap Singh <
vpsi...@yahoo-inc.com.invalid> wrote:

> I am aware of lack of programmatic way of deleting topics in kafka 0.8.0.
> So using the sledge hammer approach.
> This is what I am doing:
>
> 1. Bring whole of my kafka cluster down.
> 2. Delete all the content on all the kafka clusters pointed via logs.dir
> setting.
> 3. Delete the topic metadata from zookeeper : rmr /brokers (note I am not
> wiping off the whole zookeeper but the znode /brokers where the kafka
> broker ids and topic metadata is stored)
> 4. Restart the kafka cluster again.
>
> One would expect that the kafka cluster will come up with no memory of any
> topic from previous.
>
> But guess what, and this is the place where I need help and need to
> understand, when the kafka cluster comes back, it somehow is able to obtain
> the info of the previous topics. It promptly goes ahead creating and
> assigning partitions/replicas to the brokers for the previous topics. Now I
> am completely at loss to understand where exactly is kafka able to get the
> info of previous topics when I have wiped it off the zookeeper and also
> dropped the logs.dir locations across the kafka cluster.
>
> An insight is much needed here. Where else is the topic meta data store
> which the kafka server is getting hold of after coming back alive?
>
> Regards,
> Virendra
>
>


-- 
-- Guozhang


Re: Unable to delete topic - kafka 0.8.0

2014-06-26 Thread Neha Narkhede
Firstly, I'm not sure this process of deleting topics completely works,
especially in corner cases. Having said that, for #3, you should just be
deleting /brokers/topics/. If producers are sending
data to the Kafka cluster, it will recreate the topics once the brokers
come up.

Thanks,
Neha


On Wed, Jun 25, 2014 at 11:53 PM, Virendra Pratap Singh <
vpsi...@yahoo-inc.com.invalid> wrote:

> I am aware of lack of programmatic way of deleting topics in kafka 0.8.0.
> So using the sledge hammer approach.
> This is what I am doing:
>
> 1. Bring whole of my kafka cluster down.
> 2. Delete all the content on all the kafka clusters pointed via logs.dir
> setting.
> 3. Delete the topic metadata from zookeeper : rmr /brokers (note I am not
> wiping off the whole zookeeper but the znode /brokers where the kafka
> broker ids and topic metadata is stored)
> 4. Restart the kafka cluster again.
>
> One would expect that the kafka cluster will come up with no memory of any
> topic from previous.
>
> But guess what, and this is the place where I need help and need to
> understand, when the kafka cluster comes back, it somehow is able to obtain
> the info of the previous topics. It promptly goes ahead creating and
> assigning partitions/replicas to the brokers for the previous topics. Now I
> am completely at loss to understand where exactly is kafka able to get the
> info of previous topics when I have wiped it off the zookeeper and also
> dropped the logs.dir locations across the kafka cluster.
>
> An insight is much needed here. Where else is the topic meta data store
> which the kafka server is getting hold of after coming back alive?
>
> Regards,
> Virendra
>
>


Re: Blacklisting Brokers

2014-06-26 Thread Neha Narkhede
Could you file a JIRA for this? We may have to think through this
carefully. Blacklisting should be done in a way that maintains the
replication factor of the partition. Un-blacklisting would then either
increase the replication factor or require carefully removing the replicas
on that broker.

Thanks,
Neha


On Thu, Jun 26, 2014 at 9:27 AM, Lung, Paul  wrote:

> Yes I have seen it elected as the leader. The broker process is still up,
> but due to some unknown hardware failure, it's not working 100%. It is,
> however, able to participate in the cluster, and got itself elected as the
> leader. I'm not sure what the hardware problem is though, since I couldn't
> log into the machine to check.
>
> Paul
>
> On 6/26/14, 9:03 AM, "Neha Narkhede"  wrote:
>
> >If ssh fails on the broker machine, I'm not why the broker would be in a
> >state where it maintains it's registration in zookeeper? If so, it will
> >automatically be marked dead and will not get elected as the leader. Have
> >you seen a case where such a broker became the leader? Could you elaborate
> >more on the type of failure that you describe?
> >
> >
> >
> >
> >On Tue, Jun 24, 2014 at 6:20 PM, Lung, Paul  wrote:
> >
> >> Hi All,
> >>
> >> Is there anyway to blacklist brokers? Sometimes we run into situations
> >> where there are certain hardware failures on a broker machine, and the
> >> machines goes into a "half dead" state. The broker process is up and
> >> participating in the cluster, but it can't actually transmit messages
> >> properly. Sometimes, even SSH fails on the machines, and we can't log
> >>in to
> >> shutdown the broker process. The only way to resolve this issue at this
> >> point is to shutdown the machine, which is not always easy in a large
> >>data
> >> center scenario. The consequences are disastrous if this half dead
> >>broker
> >> is elected as the leader. So in this situation, it would be nice to be
> >>able
> >> to quickly black list a broker.
> >>
> >> Thank you.
> >> Paul Lung
> >>
>
>


Re: Intercept broker operation in Kafka

2014-06-26 Thread Neha Narkhede
Most of these are available via JMX and others can be available via
zookeeper. I'm not sure why/how you would monitor "messages being deleted
by the broker". In general, monitoring via JMX is preferable to scraping
logs.
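
As a rough illustration, broker metrics can be read over JMX with the standard
javax.management API (a sketch; the host/port assume the broker was started
with JMX_PORT set, and the object-name pattern simply matches the kafka domains):

import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class ListKafkaMBeans {
    public static void main(String[] args) throws Exception {
        // placeholder host/port; e.g. start the broker with JMX_PORT=9999
        JMXServiceURL url =
            new JMXServiceURL("service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            // list every MBean whose domain starts with "kafka"
            Set<ObjectName> names = conn.queryNames(new ObjectName("kafka*:*"), null);
            for (ObjectName name : names) {
                System.out.println(name);
            }
        } finally {
            connector.close();
        }
    }
}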

Thanks,
Neha


On Mon, Jun 23, 2014 at 11:51 PM, ravi singh  wrote:

> Primarily we want to log below date(although this is not the exhaustive
> list):
>
> + any error/exception during kafka start/stop
> + any error/exception while broker is running
> + broker state changes like leader re-election, broker goes down,
> + Current live brokers
> + new topic creation
> + when messages are deleted by broker after specified limit
> + Broker health : memory usage
>
> Regards,
> Ravi
>
>
> On Tue, Jun 24, 2014 at 11:11 AM, Neha Narkhede 
> wrote:
>
> > What kind of broker metrics are you trying to push to this centralized
> > logging framework?
> >
> > Thanks,
> > Neha
> > On Jun 23, 2014 8:51 PM, "ravi singh"  wrote:
> >
> > > Thanks Guozhang/Neha for replies.
> > > Here's my use case:
> > >
> > > We use proprietary application logging  in our apps. We are planning to
> > use
> > > Kafka brokers in production , but apart from the logs which are already
> > > logged using log4j in kafka we want to log the broker stats using our
> > > centralized application logging framework.
> > >
> > > Simply put I want to write an application which could start when the
> > kafka
> > > brokers starts, read the broker state and metrics and push it to the
> > > centralized logging servers.
> > >
> > > In ActiveMQ we have a plugin for our proprietary logging. We intercept
> > > broker operation and install the plugin into the interceptor chain of
> the
> > > broker.
> > >
> > > Regards,
> > > Ravi
> > >
> > >
> > > On Mon, Jun 23, 2014 at 9:29 PM, Neha Narkhede <
> neha.narkh...@gmail.com>
> > > wrote:
> > >
> > > > Ravi,
> > > >
> > > > Our goal is to provide the best implementation of a set of useful
> > > > abstractions and features in Kafka. The motivation behind this
> > philosophy
> > > > is performance and simplicity at the cost of flexibility. In most
> > cases,
> > > we
> > > > can argue that the loss in flexibility is minimal since you can
> always
> > > get
> > > > that functionality by modeling your application differently,
> especially
> > > if
> > > > the system supports high performance. ActiveMQ has to support the JMS
> > > > protocol and hence provide all sorts of hooks and plugins on the
> > brokers
> > > at
> > > > the cost of performance.
> > > >
> > > > Could you elaborate more on your use case? There is probably another
> > way
> > > to
> > > > model your application using Kafka.
> > > >
> > > > Thanks,
> > > > Neha
> > > >
> > > >
> > > > On Sat, Jun 21, 2014 at 9:24 AM, ravi singh 
> > wrote:
> > > >
> > > > > How do I intercept Kakfa broker operation so that features such as
> > > > > security,logging,etc can be implemented as a pluggable filter. For
> > > > example
> > > > > we have "BrokerFilter" class in ActiveMQ , Is there anything
> similar
> > in
> > > > > Kafka?
> > > > >
> > > > > --
> > > > > *Regards,*
> > > > > *Ravi*
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > *Regards,*
> > > *Ravi*
> > >
> >
>
>
>
> --
> *Regards,*
> *Ravi*
>


Re: Unable to delete topic - kafka 0.8.0

2014-06-26 Thread Virendra Pratap Singh
No producers were active. The only producer in our pipeline is a Storm
topology, and I had made it a point that the whole cluster was down and no
topology was running.
What I did was nothing short of a fresh kafka cluster setup (barring
the fact that I didn't wipe the zookeepers, as it's the same setup which
our storm uses. But given the fact that I had dropped /brokers, it should
not be a concern).

So the question is where exactly are the kafka brokers getting the info about
the topics to recreate when they come up, when there are no producers and all
previous info has been wiped out.

Regards,
Virendra

On 6/26/14, 9:29 AM, "Neha Narkhede"  wrote:

>Firstly, I'm not sure this process of deleting topics completely works,
>especially in corner cases. Having said that, for #3, you should just be
>deleting /brokers/topics/. If producers are sending
>data to the Kafka cluster, it will recreate the topics once the brokers
>come up.
>
>Thanks,
>Neha
>
>
>On Wed, Jun 25, 2014 at 11:53 PM, Virendra Pratap Singh <
>vpsi...@yahoo-inc.com.invalid> wrote:
>
>> I am aware of lack of programmatic way of deleting topics in kafka
>>0.8.0.
>> So using the sledge hammer approach.
>> This is what I am doing:
>>
>> 1. Bring whole of my kafka cluster down.
>> 2. Delete all the content on all the kafka clusters pointed via logs.dir
>> setting.
>> 3. Delete the topic metadata from zookeeper : rmr /brokers (note I am
>>not
>> wiping off the whole zookeeper but the znode /brokers where the kafka
>> broker ids and topic metadata is stored)
>> 4. Restart the kafka cluster again.
>>
>> One would expect that the kafka cluster will come up with no memory of
>>any
>> topic from previous.
>>
>> But guess what, and this is the place where I need help and need to
>> understand, when the kafka cluster comes back, it somehow is able to
>>obtain
>> the info of the previous topics. It promptly goes ahead creating and
>> assigning partitions/replicas to the brokers for the previous topics.
>>Now I
>> am completely at loss to understand where exactly is kafka able to get
>>the
>> info of previous topics when I have wiped it off the zookeeper and also
>> dropped the logs.dir locations across the kafka cluster.
>>
>> An insight is much needed here. Where else is the topic meta data store
>> which the kafka server is getting hold of after coming back alive?
>>
>> Regards,
>> Virendra
>>
>>



Re: Unable to delete topic - kafka 0.8.0

2014-06-26 Thread Virendra Pratap Singh
I am 100% sure nothing was running. However I am not sure about consumers.
Would that make any difference?
I thought a producer message write request could only cause kafka to
initiate topic creation, not a consumer read. Would a consumer read request
even succeed if the topic metadata is non-existent in zookeeper?
Since I brought the kafka brokers down, and deleted all logs and zookeeper
metadata, I would assume consumers will sync with that info and fail, as
now the topic is nonexistent. Not sure how a consumer can have an impact in
this situation.

Compared to a clean install, the only difference I see is that the zookeepers
were not wiped off and reinstalled/restarted. But I believe that should not be
an issue as I dropped the /brokers znode.

Virendra

On 6/26/14, 9:28 AM, "Guozhang Wang"  wrote:

>Hello Virendra,
>
>Did you have any producer/consumer clients running during the whole
>process?
>
>Guozhang
>
>
>On Wed, Jun 25, 2014 at 11:53 PM, Virendra Pratap Singh <
>vpsi...@yahoo-inc.com.invalid> wrote:
>
>> I am aware of lack of programmatic way of deleting topics in kafka
>>0.8.0.
>> So using the sledge hammer approach.
>> This is what I am doing:
>>
>> 1. Bring whole of my kafka cluster down.
>> 2. Delete all the content on all the kafka clusters pointed via logs.dir
>> setting.
>> 3. Delete the topic metadata from zookeeper : rmr /brokers (note I am
>>not
>> wiping off the whole zookeeper but the znode /brokers where the kafka
>> broker ids and topic metadata is stored)
>> 4. Restart the kafka cluster again.
>>
>> One would expect that the kafka cluster will come up with no memory of
>>any
>> topic from previous.
>>
>> But guess what, and this is the place where I need help and need to
>> understand, when the kafka cluster comes back, it somehow is able to
>>obtain
>> the info of the previous topics. It promptly goes ahead creating and
>> assigning partitions/replicas to the brokers for the previous topics.
>>Now I
>> am completely at loss to understand where exactly is kafka able to get
>>the
>> info of previous topics when I have wiped it off the zookeeper and also
>> dropped the logs.dir locations across the kafka cluster.
>>
>> An insight is much needed here. Where else is the topic meta data store
>> which the kafka server is getting hold of after coming back alive?
>>
>> Regards,
>> Virendra
>>
>>
>
>
>-- 
>-- Guozhang



Re: Monitoring Producers at Large Scale

2014-06-26 Thread Bhavesh Mistry
Hi All,

Thanks for all your responses.



JMX metrics are there and we do pull them, but I would like to capture the
logs from the Kafka library as well, especially WARN, ERROR and FATAL, to
debug issues.

To do this, we intercept Log4j logging and send it to a Kafka log topic, but
I realize that under heavy error/warn logging from the Kafka library this can
create a deadlock between the ProducerSendThread and the Producer (which is
logging to the Kafka log topic queue...)



import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;

public class KafkaLog4jAppender extends AppenderSkeleton {

    // producer that ships intercepted log events to a Kafka log topic
    // (the topic name below is a placeholder; the producer is initialized elsewhere)
    private Producer<String, String> producer;

    protected void append(LoggingEvent event) {
        // only forward WARN/ERROR/FATAL events emitted by the Kafka library itself
        if (event.getLoggerName().startsWith("kafka")
                && event.getLevel().isGreaterOrEqual(Level.WARN)) {
            producer.send(new KeyedMessage<String, String>(
                    "kafka-logs", event.getRenderedMessage()));
        }
    }

    public void close() { }

    public boolean requiresLayout() { return false; }
}


The other option is to log Kafka's logs to disk and ship them to a Kafka
topic via a separate process such as https://github.com/harelba/tail2kafka...


We use Kafka for log transportation, and we want to debug/troubleshoot
issues via logs or create alerts, etc.


Thanks,


Bhavesh




On Wed, Jun 25, 2014 at 10:49 AM, Neha Narkhede 
wrote:

> We monitor producers or for that matter any process/service using JMX
> metrics. Every server and service in LinkedIn sends metrics in a Kafka
> message to a metrics Kafka cluster. We have subscribers that connect to the
> metrics cluster to index that data in RRDs.
>
> Our aim is to expose all important metrics through JMX. We are doing that
> for the new producer under org.apache.kafka.clients.producer. Feel free to
> take a look at that and give feedback.
>
> Thanks,
> Neha
>
>
> On Tue, Jun 24, 2014 at 7:59 PM, Darion Yaphet 
> wrote:
>
> > Sorry I want to  know  you want to monitor kafka producers or kafka
> brokers
> > and zookeepers ?
> > It's seems you will want to monitor monitor Exceptions eg Leader Not
> Found,
> > Queue is full, resend fail  etc  are kafka cluster
> >
> >
> > 2014-06-25 8:20 GMT+08:00 Bhavesh Mistry :
> >
> > > We use Kafka as Transport Layer to transport application logs.  How do
> we
> > > monitor Producers at large scales about 6000 boxes x 4 topic per box so
> > > roughly 24000 producers (spread across multiple data center.. we have
> > > brokers per DC).  We do the monitoring based on logs.  I have tried
> > > intercepting logs via Log4J custom implementation which only intercept
> > WARN
> > > and ERROR and FATAL events  org.apache.log4j.AppenderSkeleton append
> > method
> > > which send its logs to brokers (This is working but after load testing
> it
> > > is causing deadlock some times between ProducerSendThread and
> Producer).
> > >
> > > I know there are JMX monitoring MBeans available which we can pull the
> > > data, but I would like to monitor Exceptions eg Leader Not Found, Queue
> > is
> > > full, resend fail etc in Kafka Library.
> > >
> > > How does LinkedIn monitor the Producers ?
> > >
> > > Thanks,
> > >
> > > Bhavesh
> > >
> >
> >
> >
> > --
> >
> >
> > long is the way and hard  that out of Hell leads up to light
> >
>


Question on message content, compression, multiple messages per kafka message?

2014-06-26 Thread Bert Corderman
We are in the process of engineering a system that will be using kafka.
The legacy system is using the local file system and  a database as the
queue.  In terms of scale we process about 35 billion events per day
contained in 15 million files.



I am looking for feedback on a design decision we are discussing



In our current system we depend heavily on compression as a performance
optimization.  In kafka the use of compression has some overhead as the
broker needs to decompress the data to assign offsets and re-compress.
(explained in detail here
http://geekmantra.wordpress.com/2013/03/28/compression-in-kafka-gzip-or-snappy/
)



We are thinking about NOT using Kafka compression but rather compressing
multiple rows in our code. For example, let’s say we wanted to send data in
batches of 5,000 rows.  Using Kafka compression we would use a batch size of
5,000 rows and use compression. The other option is using a batch size of
1 in Kafka BUT in our code take 5,000 messages, compress them and then send
them to Kafka with the Kafka compression setting of none.
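
For what it's worth, a minimal sketch of the application-side variant (batch
several rows, gzip the batch ourselves, and send the result as one Kafka
message with compression set to none; the newline-delimited framing is just
an example):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.zip.GZIPOutputStream;

public class AppSideBatchCompressor {

    // gzip a batch of rows into a single payload that is sent as one Kafka
    // message with the Kafka compression codec left at "none"
    public static byte[] compressBatch(List<String> rows) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(buffer)) {
            for (String row : rows) {
                // newline-delimited rows inside the batch; the consumer wraps the
                // payload in a GZIPInputStream and splits on newlines
                gzip.write(row.getBytes(StandardCharsets.UTF_8));
                gzip.write('\n');
            }
        }
        return buffer.toByteArray();
    }
}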



Is this  a pattern others have used?



Regardless of compression I am curious if others are using a single message
in kafka to contain multiple messages from an application standpoint.


Bert


Scalability question?

2014-06-26 Thread Zack Payton
Hi there,

There have been some internal debates here about how far we can scale
Kafka.  Ideally, we'd be able to make it scale to 90 billion events a day.
 I've seen somewhere that LinkedIn scaled it up to 40 billion events a day.
 Has anyone seen a hard plateau in terms of scalability?  Does anyone have
any advice for tweaking configs to achieve ultra-high performance?

Thanks,
Z


Re: Scalability question?

2014-06-26 Thread Jay Kreps
I think currently we do a little over 200 billion events per day at
LinkedIn, though we are not actually the largest Kafka user any more.

On the whole scaling the volume of messages is actually not that hard in
Kafka. Data is partitioned, and partitions don't really communicate with
each other, so adding more machines will add more capacity, there really
aren't a ton of gotchas.

The operations section of the wiki has some tips on performance tuning. I
recommend using the performance test commands described in the link from
this test to try out some stuff on your gear and get a feeling for how much
hardware you need:
http://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines

-Jay


On Thu, Jun 26, 2014 at 1:29 PM, Zack Payton  wrote:

> Hi there,
>
> There have been some internal debates here about how far we can scale
> Kafka.  Ideally, we'd be able to make it scale to 90 billion events a day.
>  I've seen somewhere that linked scaled it up to 40 billion events a day.
>  Has anyone seen a hard plateau in terms of scalability?  Does anyone have
> any advice for tweaking configs to achieve ultra-high performance?
>
> Thanks,
> Z
>


Question about kafka-consumer-perf-test.sh

2014-06-26 Thread Baran Nohutçuoğlu
Hi, I’m having trouble understanding the results from running 
kafka-consumer-perf-test.  For a low number of messages, I see very low 
throughput in terms of messages / second.  Here is a table of results:

fetch.size   data.consumed.in.MB   MB.sec    data.consumed.in.nMsg   nMsg.sec
1048576      0.0003                0         1                       0.014
1048576      0.0029                0         10                      0.0956
1048576      0.0286                0.0003    100                     1.0404
1048576      0.2861                0.0035    1000                    12.1222
1048576      2.861                 0.033     10000                   115.3602
1048576      28.6102               0.4234    100000                  1479.7277
1048576      286.1023              4.0228    1000000                 14060.5447
1048576      1142.2345             16.6818   3992399                 58307.0306

I would have expected a constant time to establish connections and then 
nMsg.sec to increase.  When consuming just 1000 messages, why is it so slow?

Thanks,
-  Baran

(information for reproducing results)
I have a single kafka broker and single zookeeper node.  Here are the commands 
I ran:

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test 
--replication 1 --partition 1
bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance 
localhost:9092 6000000 300

for i in 1 10 100 1000 10000 100000 1000000 10000000; do 
bin/kafka-consumer-perf-test.sh --zookeeper localhost:2181 --messages 
$i --topic test --threads 1; 
done

Re: Question on message content, compression, multiple messages per kafka message?

2014-06-26 Thread Neha Narkhede
Using a single Kafka message to contain an application snapshot has the
upside of getting atomicity for free. Either the snapshot will be written
as a whole to Kafka or not. This is poor man's transactionality. Care needs
to be taken to ensure that the message is not too large since that might
cause memory consumption problems on the server or the consumers.

As far as compression overhead is concerned, have you tried running Snappy?
Snappy's performance is good enough to offset the decompression-compression
overhead on the server.
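
For reference, a minimal sketch of enabling Snappy on the 0.8 producer (the
broker list and topic are placeholders):

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class SnappyProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092"); // placeholder
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("compression.codec", "snappy"); // messages are sent snappy-compressed

        Producer<String, String> producer =
            new Producer<String, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("my-topic", "payload")); // placeholder topic
        producer.close();
    }
}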

Thanks,
Neha


On Thu, Jun 26, 2014 at 12:42 PM, Bert Corderman  wrote:

> We are in the process of engineering a system that will be using kafka.
> The legacy system is using the local file system and  a database as the
> queue.  In terms of scale we process about 35 billion events per day
> contained in 15 million files.
>
>
>
> I am looking for feedback on a design decision we are discussing
>
>
>
> In our current system we depending heavily on compression as a performance
> optimization.  In kafka the use of compression has some overhead as the
> broker needs to decompress the data to assign offsets and re-compress.
> (explained in detail here
>
> http://geekmantra.wordpress.com/2013/03/28/compression-in-kafka-gzip-or-snappy/
> )
>
>
>
> We are thinking about NOT using Kafka compression but rather compressing
> multiple rows in our code. For example let’s say we wanted to send data in
> batches of 5,00 rows.  Using Kafka compression we would use a batch size of
>  5,000 rows and use compression. The other option is using a batch size of
> 1 in Kafka BUT in our code take 5,000 messages, compress them and then send
> to kafka using the kafka compression setting of none.
>
>
>
> Is this  a pattern others have used?
>
>
>
> Regardless of compression I am curious if others are using a single message
> in kafka to contain multiple messages from an application standpoint.
>
>
> Bert
>


RE: Experiences with larger message sizes

2014-06-26 Thread Luke Forehand
I have used 50MB message size and it is not a great idea.  First of all you 
need to make sure you have these settings in sync:
message.max.bytes
replica.fetch.max.bytes
fetch.message.max.bytes
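
(For anyone mapping these to config files: message.max.bytes and
replica.fetch.max.bytes are broker settings in server.properties, and
fetch.message.max.bytes goes in the consumer config; the 50MB values below
are just the size from my case, not a recommendation.)

# broker: server.properties
message.max.bytes=52428800
replica.fetch.max.bytes=52428800   # must be >= message.max.bytes or replication can stall

# consumer config
fetch.message.max.bytes=52428800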

I had not set the replica fetch setting and didn't realize one of my partitions 
was not replicating after a large message was produced.  I also ran into heap 
issues with having to fetch such a large message, lots of unnecessary garbage 
collection.  I suggest breaking down your message into smaller chunks.  In my 
case, I decided to break an XML input stream (which had a root element wrapping 
a ridiculously large number of children) into smaller messages, having to parse 
the large xml root document and re-wrap each child element with a shallow clone 
of its parents as I iterated the stream.  

-Luke


From: Denny Lee 
Sent: Tuesday, June 24, 2014 10:35 AM
To: users@kafka.apache.org
Subject: Experiences with larger message sizes

By any chance has anyone worked with using Kafka with message sizes that are 
approximately 50MB in size?  Based on from some of the previous threads there 
are probably some concerns on memory pressure due to the compression on the 
broker and decompression on the consumer and a best practices on ensuring batch 
size (to ultimately not have the compressed message exceed message size limit).

Any other best practices or thoughts concerning this scenario?

Thanks!
Denny


Re: Experiences with larger message sizes

2014-06-26 Thread Bert Corderman
Thanks for the details Luke.

At what point would you consider a message too big?

Are you using compression?

Bert

On Thursday, June 26, 2014, Luke Forehand <
luke.foreh...@networkedinsights.com> wrote:

> I have used 50MB message size and it is not a great idea.  First of all
> you need to make sure you have these settings in sync:
> message.max.bytes
> replica.fetch.max.bytes
> fetch.message.max.bytes
>
> I had not set the replica fetch setting and didn't realize one of my
> partitions was not replicating after a large message was produced.  I also
> ran into heap issues with having to fetch such a large message, lots of
> unnecessary garbage collection.  I suggest breaking down your message into
> smaller chunks.  In my case, I decided to break an XML input stream (which
> had a root element wrapping a ridiculously large number of children) into
> smaller messages, having to parse the large xml root document and re-wrap
> each child element with a shallow clone of its parents as I iterated the
> stream.
>
> -Luke
>
> 
> From: Denny Lee >
> Sent: Tuesday, June 24, 2014 10:35 AM
> To: users@kafka.apache.org 
> Subject: Experiences with larger message sizes
>
> By any chance has anyone worked with using Kafka with message sizes that
> are approximately 50MB in size?  Based on from some of the previous threads
> there are probably some concerns on memory pressure due to the compression
> on the broker and decompression on the consumer and a best practices on
> ensuring batch size (to ultimately not have the compressed message exceed
> message size limit).
>
> Any other best practices or thoughts concerning this scenario?
>
> Thanks!
> Denny
>


Re: Question on message content, compression, multiple messages per kafka message?

2014-06-26 Thread Bert Corderman
What would you consider to be a message that is “too large”?



In April I ran a bunch of tests which I outlined in the following thread



http://grokbase.com/t/kafka/users/145g8k62rf/performance-testing-data-to-share



It includes a Google Doc link with all the results (it’s easiest to download
it in Excel and use filters to drill into what you want).  When looking at
Snappy vs NONE I didn’t see much improvement for the 2200-byte messages we are
looking at, and for small messages NONE was the fastest.



Running Kafka 8.0 on a three node cluster.  16 core, 256GB RAM, 12 4TB
drives

Message.size = 2200

Batch.size  = 400

Partitions = 12

Replication=3

acks=leader



I was able to get…

SNAPPY = 151K messages per second

NONE = 140K messages per second

GZIP = 86K messages per second



With small messages of 200 bytes

SNAPPY = 660K messages per second

NONE = 740K messages per second

GZIP = 340K messages per second





So let’s assume I can compress 2200 bytes into 200 bytes.  (Just using
these numbers as I ran tests on these sizes; my guess is I will not get
this good compression, but it’s an example.)   If I run uncompressed I could
process 140K messages per second.  If I compressed in my application from
2200 to 200 bytes I could then send through Kafka at 740K events per second.




Bert








On Thu, Jun 26, 2014 at 5:23 PM, Neha Narkhede 
wrote:

> Using a single Kafka message to contain an application snapshot has the
> upside of getting atomicity for free. Either the snapshot will be written
> as a whole to Kafka or not. This is poor man's transactionality. Care needs
> to be taken to ensure that the message is not too large since that might
> cause memory consumption problems on the server or the consumers.
>
> As far as compression overhead is concerned, have you tried running Snappy?
> Snappy's performance is good enough to offset the decompression-compression
> overhead on the server.
>
> Thanks,
> Neha
>
>
> On Thu, Jun 26, 2014 at 12:42 PM, Bert Corderman 
> wrote:
>
> > We are in the process of engineering a system that will be using kafka.
> > The legacy system is using the local file system and  a database as the
> > queue.  In terms of scale we process about 35 billion events per day
> > contained in 15 million files.
> >
> >
> >
> > I am looking for feedback on a design decision we are discussing
> >
> >
> >
> > In our current system we depending heavily on compression as a
> performance
> > optimization.  In kafka the use of compression has some overhead as the
> > broker needs to decompress the data to assign offsets and re-compress.
> > (explained in detail here
> >
> >
> http://geekmantra.wordpress.com/2013/03/28/compression-in-kafka-gzip-or-snappy/
> > )
> >
> >
> >
> > We are thinking about NOT using Kafka compression but rather compressing
> > multiple rows in our code. For example let’s say we wanted to send data
> in
> > batches of 5,00 rows.  Using Kafka compression we would use a batch size
> of
> >  5,000 rows and use compression. The other option is using a batch size
> of
> > 1 in Kafka BUT in our code take 5,000 messages, compress them and then
> send
> > to kafka using the kafka compression setting of none.
> >
> >
> >
> > Is this  a pattern others have used?
> >
> >
> >
> > Regardless of compression I am curious if others are using a single
> message
> > in kafka to contain multiple messages from an application standpoint.
> >
> >
> > Bert
> >
>


Apache Kafka NYC Users Group!

2014-06-26 Thread Joe Stein
Hi folks, I just started a new Meetup specifically for Apache Kafka in NYC
(everyone is welcome of course) http://www.meetup.com/Apache-Kafka-NYC/

For the last couple of years we have been piggybacking talks and the
community with other NYC meetup groups (Storm, Cassandra, Hadoop, etc) and
figured it was about time to do this.

I am starting to look for more folks to talk about Kafka (running Kafka, the
ecosystem, clients, et al.) so we can get some meetups scheduled and keep
things moving along (also venues).

Thanks!

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop 
/


message stuck, possible problem setting fetch.message.max.bytes

2014-06-26 Thread Louis Clark
Hi, I'm trying to stream large messages with Kafka into Spark.  Generally
this has been working nicely, but I found one message (5.1MB in size) which
is clogging my pipeline up.  I have these settings in server.properties:
fetch.message.max.bytes=10485760
replica.fetch.max.bytes=10485760
message.max.bytes=10485760
fetch.size=10485760

I'm not getting any obvious errors in the logs and I can retrieve the large
message with this command:
kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning
--topic mytopic --fetch-size=10485760

I noticed recently after digging into this problem that the kafkaServer.out
log is complaining that the fetch.message.max.bytes parameter is not valid:
[2014-06-25 11:33:36,547] WARN Property fetch.message.max.bytes is not
valid (kafka.utils.VerifiableProperties)
[2014-06-25 11:33:36,547] WARN Property fetch.size is not valid
(kafka.utils.VerifiableProperties)
That seems like the most critical parameter for my needs.  It is apparently
not recognizing that it is a parameter despite it being listed on the
configuration website (https://kafka.apache.org/08/configuration.html).
 I'm using 0.8.1.1.  Any ideas?

many thanks for reading this!


Re: message stuck, possible problem setting fetch.message.max.bytes

2014-06-26 Thread Guozhang Wang
Hi Louis,

What are your consumer's config properties?

Guozhang


On Thu, Jun 26, 2014 at 5:54 PM, Louis Clark  wrote:

> Hi, I'm trying to stream large message with Kafka into Spark.  Generally
> this has been working nicely, but I found one message (5.1MB in size) which
> is clogging my pipeline up.  I have these settings in server.properties:
> fetch.message.max.bytes=10485760
> replica.fetch.max.bytes=10485760
> message.max.bytes=10485760
> fetch.size=10485760
>
> I'm not getting any obvious errors in the logs and I can retrieve the large
> message with this command:
> kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning
> --topic mytopic --fetch-size=10485760
>
> I noticed recently after digging into this problem that the kafkaServer.out
> log is complaining that the fetch.message.max.bytes parameter is not valid:
> [2014-06-25 11:33:36,547] WARN Property fetch.message.max.bytes is not
> valid (kafka.utils.VerifiableProperties)
> [2014-06-25 11:33:36,547] WARN Property fetch.size is not valid
> (kafka.utils.VerifiableProperties)
> That seems like the most critical parameter for my needs.  It is apparently
> not recognizing that it is a parameter despite it being listed on the
> configuration website (https://kafka.apache.org/08/configuration.html).
>  I'm using 0.8.1.1.  Any ideas?
>
> many thanks for reading this!
>



-- 
-- Guozhang


Re: message stuck, possible problem setting fetch.message.max.bytes

2014-06-26 Thread Louis Clark
in the consumer.properties file, I've got (default?):

zookeeper.connect=127.0.0.1:2181

zookeeper.connection.timeout.ms=1000000

group.id=test-consumer-group

thanks,

-Louis


On Thu, Jun 26, 2014 at 6:04 PM, Guozhang Wang  wrote:

> Hi Louis,
>
> What are your consumer's config properties?
>
> Guozhang
>
>
> On Thu, Jun 26, 2014 at 5:54 PM, Louis Clark  wrote:
>
>> Hi, I'm trying to stream large message with Kafka into Spark.  Generally
>> this has been working nicely, but I found one message (5.1MB in size)
>> which
>> is clogging my pipeline up.  I have these settings in server.properties:
>> fetch.message.max.bytes=10485760
>> replica.fetch.max.bytes=10485760
>> message.max.bytes=10485760
>> fetch.size=10485760
>>
>> I'm not getting any obvious errors in the logs and I can retrieve the
>> large
>> message with this command:
>> kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning
>> --topic mytopic --fetch-size=10485760
>>
>> I noticed recently after digging into this problem that the
>> kafkaServer.out
>> log is complaining that the fetch.message.max.bytes parameter is not
>> valid:
>> [2014-06-25 11:33:36,547] WARN Property fetch.message.max.bytes is not
>> valid (kafka.utils.VerifiableProperties)
>> [2014-06-25 11:33:36,547] WARN Property fetch.size is not valid
>> (kafka.utils.VerifiableProperties)
>> That seems like the most critical parameter for my needs.  It is
>> apparently
>> not recognizing that it is a parameter despite it being listed on the
>> configuration website (https://kafka.apache.org/08/configuration.html).
>>  I'm using 0.8.1.1.  Any ideas?
>>
>> many thanks for reading this!
>>
>
>
>
> --
> -- Guozhang
>


Re: Experiences with larger message sizes

2014-06-26 Thread Denny Lee
Yes,  thanks very much Luke - this is very helpful for my plans.  I was under 
the same impression but it’s always good to have verification, eh?! 


On June 26, 2014 at 4:48:03 PM, Bert Corderman (bertc...@gmail.com) wrote:

Thanks for the details Luke.  

At what point would you consider a message too big?  

Are you using compression?  

Bert  

On Thursday, June 26, 2014, Luke Forehand <  
luke.foreh...@networkedinsights.com> wrote:  

> I have used 50MB message size and it is not a great idea. First of all  
> you need to make sure you have these settings in sync:  
> message.max.bytes  
> replica.fetch.max.bytes  
> fetch.message.max.bytes  
>  
> I had not set the replica fetch setting and didn't realize one of my  
> partitions was not replicating after a large message was produced. I also  
> ran into heap issues with having to fetch such a large message, lots of  
> unnecessary garbage collection. I suggest breaking down your message into  
> smaller chunks. In my case, I decided to break an XML input stream (which  
> had a root element wrapping a ridiculously large number of children) into  
> smaller messages, having to parse the large xml root document and re-wrap  
> each child element with a shallow clone of its parents as I iterated the  
> stream.  
>  
> -Luke  
>  
>   
> From: Denny Lee >  
> Sent: Tuesday, June 24, 2014 10:35 AM  
> To: users@kafka.apache.org   
> Subject: Experiences with larger message sizes  
>  
> By any chance has anyone worked with using Kafka with message sizes that  
> are approximately 50MB in size? Based on from some of the previous threads  
> there are probably some concerns on memory pressure due to the compression  
> on the broker and decompression on the consumer and a best practices on  
> ensuring batch size (to ultimately not have the compressed message exceed  
> message size limit).  
>  
> Any other best practices or thoughts concerning this scenario?  
>  
> Thanks!  
> Denny  
>