Two things:
1. The OOM happened on the consumer, right? So the memory that matters
is the RAM on the consumer machine, not on the Kafka cluster nodes.

2. If the consumers belong to the same consumer group, each will
consume a subset of the partitions and will only need to allocate
memory for those partitions.

So, assuming all your consumers belong to the same group and
fetch.message.max.bytes is 1MB:
2 consumers  -> each has 500 partitions -> each uses up to 500MB.

The total stays at roughly 1GB no matter how many consumers you have,
as long as they are all in the same group.

If the consumers belong to different groups (i.e. they read copies of
the same messages from the same partitions), then yes, you are limited
to 8 per server (probably fewer, because there is other stuff running
on the server).
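
To make the arithmetic above concrete, here is a rough sketch. The function names are made up for illustration and are not part of any Kafka client. It assumes the worst case, where every owned partition returns a full fetch.message.max.bytes at the same time, and that partitions are balanced evenly across the group:

```python
import math

MB = 1024 * 1024


def per_consumer_fetch_bytes(partitions, consumers_in_group, fetch_max_bytes=1 * MB):
    """Worst-case fetch buffer for the busiest consumer in a single group."""
    # Partitions are divided among group members; the busiest member owns the ceiling.
    owned = math.ceil(partitions / consumers_in_group)
    return owned * fetch_max_bytes


def max_independent_groups(partitions, ram_bytes, fetch_max_bytes=1 * MB):
    """Number of single-consumer groups that fit in ram_bytes, ignoring all other memory use."""
    # Each group reads every partition, so each needs the full per-topic buffer.
    return ram_bytes // (partitions * fetch_max_bytes)


if __name__ == "__main__":
    for n in (1, 2, 4):
        print(n, "consumer(s) in one group:", per_consumer_fetch_bytes(1000, n) // MB, "MB each")
    print("independent groups per 8GB machine:", max_independent_groups(1000, 8 * 1024 * MB))
```

In practice the real ceiling is lower, since the JVM and the rest of the machine need memory too.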

Gwen

On Mon, Jan 19, 2015 at 3:06 PM, Pranay Agarwal
<agarwalpran...@gmail.com> wrote:
> Thanks a lot Natty.
>
> I am using this Ruby gem on the client side with all the default config
> https://github.com/joekiller/jruby-kafka/blob/master/lib/jruby-kafka/group.rb
> and the value fetch.message.max.bytes is set to 1MB.
>
> Currently I only have 3 nodes set up in the Kafka cluster (with 8 GB RAM),
> and if 1 consumer is going to take 1000 partitions x 1MB ~ 1GB, does it
> mean 1 Kafka node can at best support only 8 consumers? Also, when I run
> top/free on the Kafka cluster nodes (both ZooKeeper and Kafka are deployed
> on each of the 3 nodes of the cluster), I don't see a lot of memory being
> used on the machines. Also, even with this calculation, I shouldn't be
> facing any issue with only 1 consumer, as I have 8GB of JVM space given to
> the Kafka nodes, right?
>
> Thanks
> -Pranay
>
> On Mon, Jan 19, 2015 at 2:53 PM, Jonathan Natkins <na...@streamsets.com>
> wrote:
>
>> The fetch.message.max.bytes setting is actually a client-side configuration.
>> With regard to increasing the number of threads, I think the calculation may
>> be a little more subtle than what you're proposing, and frankly, it's unlikely
>> that your servers can handle allocating 200MB x 1000 threads = 200GB of
>> memory at a single time.
>>
>> I believe that if you have every partition on a single broker, and all of
>> your consumer threads are requesting data simultaneously, then yes, the
>> broker would attempt to allocate 200GB of heap, and probably you'll hit an
>> OOME. However, since each consumer is only reading from one partition,
>> those 1000 threads should be making requests that are spread out over the
>> entire Kafka cluster. Depending on the memory on your servers, you may need
>> to increase the number of brokers in your cluster to support the 1000
>> threads. For example, I would expect that you can support this with 10
>> brokers if each broker has something north of 20GB of heap allocated.
>>
>> Some of this is a little bit of guesswork on my part, and I'm not super
>> confident of my numbers. Can anybody else on the list validate my math?
>>
>> Thanks,
>> Natty
>>
>> Jonathan "Natty" Natkins
>> StreamSets | Customer Engagement Engineer
>> mobile: 609.577.1600 | linkedin <http://www.linkedin.com/in/nattyice>
>>
>>
>> On Mon, Jan 19, 2015 at 2:34 PM, Pranay Agarwal <agarwalpran...@gmail.com>
>> wrote:
>>
>> > Thanks Natty.
>> >
>> > Is there any config which I need to change on the client side as well?
>> > Also, currently I am trying with only 1 consumer thread. Does the
>> > equation change to
>> > (#partitions)*(fetchsize)*(#consumer_threads) in case I try to read with
>> > 1000 threads from topic2 (1000 partitions)?
>> >
>> > -Pranay
>> >
>> > On Mon, Jan 19, 2015 at 2:26 PM, Jonathan Natkins <na...@streamsets.com>
>> > wrote:
>> >
>> > > Hi Pranay,
>> > >
>> > > I think the JIRA you're referencing is a bit orthogonal to the OOME that
>> > > you're experiencing. Based on the stacktrace, it looks like your OOME is
>> > > coming from a consumer request, which is attempting to allocate 200MB.
>> > > There was a thread (relatively recently) that discussed what I think is
>> > > your issue:
>> > >
>> > > http://mail-archives.apache.org/mod_mbox/kafka-users/201412.mbox/%3CCAG1fNJDHHGSL-x3wp=pPZS1asOdOBrQ-Ge3kiA3Bk_iz7o=5...@mail.gmail.com%3E
>> > >
>> > > I suspect that the takeaway is that the way Kafka determines the required
>> > > memory for a consumer request is (#partitions in the topic) x
>> > > (replica.fetch.max.bytes), and seemingly you don't have enough memory
>> > > allocated to handle that request. The solution is likely to increase the
>> > > heap size on your brokers or to decrease your max fetch size.
>> > >
>> > > Thanks,
>> > > Natty
>> > >
>> > > Jonathan "Natty" Natkins
>> > > StreamSets | Customer Engagement Engineer
>> > > mobile: 609.577.1600 | linkedin <http://www.linkedin.com/in/nattyice>
>> > >
>> > >
>> > > On Mon, Jan 19, 2015 at 2:10 PM, Pranay Agarwal <agarwalpran...@gmail.com>
>> > > wrote:
>> > >
>> > > > Hi All,
>> > > >
>> > > > I have a kafka cluster setup which has 2 topics
>> > > >
>> > > > topic1 with 10 partitions
>> > > > topic2 with 1000 partitions.
>> > > >
>> > > > While I am able to consume messages from topic1 just fine, I get the
>> > > > following error from topic2. There is a resolved issue here on the same
>> > > > thing:
>> > > > https://issues.apache.org/jira/browse/KAFKA-664
>> > > >
>> > > > I am using the latest Kafka server version, and I have used the Kafka
>> > > > command line tools to consume from the topics.
>> > > >
>> > > > >>>>
>> > > > [2015-01-19 22:08:10,758] ERROR OOME with size 201332748 (kafka.network.BoundedByteBufferReceive)
>> > > >
>> > > > java.lang.OutOfMemoryError: Java heap space
>> > > > at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>> > > > at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>> > > > at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
>> > > > at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
>> > > > at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>> > > > at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>> > > > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>> > > > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
>> > > > at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>> > > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>> > > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>> > > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>> > > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> > > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
>> > > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>> > > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>> > > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> > > > at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>> > > > at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
>> > > > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>> > > > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>> > > > <<<
>> > > >
>> > > >
>> > > > Thanks
>> > > > -Pranay
>> > > >
>> > >
>> >
>>
