Thanks a lot Natty.

I am using this Ruby gem on the client side with all the default config:
https://github.com/joekiller/jruby-kafka/blob/master/lib/jruby-kafka/group.rb
and the value of fetch.message.max.bytes is set to 1MB.
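For reference, this is the kind of consumer properties fragment I mean (the key name is from the 0.8.x high-level consumer configuration; the 1MB value is the default, shown here only to make it explicit):

```properties
# consumer.properties (0.8.x high-level consumer)
# maximum bytes fetched per partition per request -- 1 MB is the default
fetch.message.max.bytes=1048576
```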

Currently I have only 3 nodes in the Kafka cluster (with 8 GB RAM each),
and if 1 consumer is going to take 1000 partitions x 1MB ~ 1GB, does that
mean 1 Kafka node can support at best 8 consumers? Also, when I run
top/free on the Kafka cluster nodes (both ZooKeeper and Kafka are deployed
on each of the 3 nodes), I don't see much memory being used on the
machines. And even with this calculation, I shouldn't be facing any
issue with only 1 consumer, since I have given 8GB of JVM heap to the Kafka
nodes, right?
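Just to make my arithmetic explicit, here is the back-of-envelope version (a sketch only; it assumes the worst case where every one of the 1000 partitions returns a completely full fetch buffer at the same time):

```ruby
# Worst-case fetch memory for one high-level consumer, assuming every
# one of the 1000 partitions returns a full 1 MB buffer simultaneously.
fetch_max_bytes = 1 * 1024 * 1024      # fetch.message.max.bytes (default 1 MB)
partitions      = 1000
heap_bytes      = 8 * 1024**3          # 8 GB JVM heap per node

per_consumer_bytes = partitions * fetch_max_bytes
consumers_per_node = heap_bytes / per_consumer_bytes   # integer division

puts "worst case per consumer: #{per_consumer_bytes / 1024**2} MB"  # 1000 MB
puts "consumers per 8 GB heap: #{consumers_per_node}"               # 8
```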

Thanks
-Pranay

On Mon, Jan 19, 2015 at 2:53 PM, Jonathan Natkins <na...@streamsets.com>
wrote:

> The fetch.message.max.bytes setting is actually a client-side configuration.
> With regard to increasing the number of threads, I think the calculation may
> be a little more subtle than what you're proposing, and frankly, it's
> unlikely that your servers can handle allocating 200MB x 1000 threads =
> 200GB of memory at one time.
>
> I believe that if you have every partition on a single broker, and all of
> your consumer threads are requesting data simultaneously, then yes, the
> broker would attempt to allocate 200GB of heap, and probably you'll hit an
> OOME. However, since each consumer is only reading from one partition,
> those 1000 threads should be making requests that are spread out over the
> entire Kafka cluster. Depending on the memory on your servers, you may need
> to increase the number of brokers in your cluster to support the 1000
> threads. For example, I would expect that you can support this with 10
> brokers if each broker has something north of 20GB of heap allocated.
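To put rough numbers on that (a sketch only; it assumes partition leaders are spread perfectly evenly across brokers, which a real cluster only approximates):

```ruby
# Aggregate memory if all 1000 fetch requests of ~200 MB landed at once,
# and the per-broker share once leaders are spread over 10 brokers.
fetch_mb = 200
threads  = 1000
brokers  = 10

total_gb      = threads * fetch_mb / 1024.0   # ~195 GB in aggregate
per_broker_gb = total_gb / brokers            # ~20 GB per broker

puts format("aggregate: %.1f GB, per broker: %.1f GB", total_gb, per_broker_gb)
```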
>
> Some of this is a little bit of guesswork on my part, and I'm not super
> confident of my numbers... Can anybody else on the list validate my math?
>
> Thanks,
> Natty
>
> Jonathan "Natty" Natkins
> StreamSets | Customer Engagement Engineer
> mobile: 609.577.1600 | linkedin <http://www.linkedin.com/in/nattyice>
>
>
> On Mon, Jan 19, 2015 at 2:34 PM, Pranay Agarwal <agarwalpran...@gmail.com>
> wrote:
>
> > Thanks Natty.
> >
> > Is there any config I need to change on the client side as well?
> > Also, currently I am trying with only 1 consumer thread. Does the equation
> > change to (#partitions) x (fetch size) x (#consumer_threads) if I try to
> > read with 1000 threads from topic2 (1000 partitions)?
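Taken literally, that equation gives an alarming number (a sketch; whether all three factors really multiply is exactly my question, since each thread would only be fetching from its own partition):

```ruby
# Naive reading: (#partitions) * (fetch size) * (#consumer_threads),
# versus the case where each of the 1000 threads owns exactly one partition.
partitions = 1000
fetch_mb   = 1
threads    = 1000

naive_gb        = partitions * fetch_mb * threads / 1024.0  # if it all multiplies
one_each_gb     = partitions * fetch_mb / 1024.0            # one partition per thread

puts format("naive: %.0f GB vs one-partition-per-thread: %.2f GB",
            naive_gb, one_each_gb)
```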
> >
> > -Pranay
> >
> > On Mon, Jan 19, 2015 at 2:26 PM, Jonathan Natkins <na...@streamsets.com>
> > wrote:
> >
> > > Hi Pranay,
> > >
> > > I think the JIRA you're referencing is a bit orthogonal to the OOME
> > > that you're experiencing. Based on the stacktrace, it looks like your
> > > OOME is coming from a consumer request, which is attempting to allocate
> > > 200MB. There was a thread (relatively recently) that discussed what I
> > > think is your issue:
> > >
> > > http://mail-archives.apache.org/mod_mbox/kafka-users/201412.mbox/%3CCAG1fNJDHHGSL-x3wp=pPZS1asOdOBrQ-Ge3kiA3Bk_iz7o=5...@mail.gmail.com%3E
> > >
> > > I suspect that the takeaway is that the way Kafka determines the
> > > required memory for a consumer request is (#partitions in the topic) x
> > > (replica.fetch.max.bytes), and seemingly you don't have enough memory
> > > allocated to handle that request. The solution is likely to increase
> > > the heap size on your brokers or to decrease your max fetch size.
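One way to sanity-check that formula against the numbers in this thread (a rough back-calculation, not something from the Kafka docs): the OOME quoted below reports a single allocation of 201332748 bytes (~192 MB) for a topic with 1000 partitions, which works out to roughly 200 KB requested per partition.

```ruby
# Back out the per-partition fetch size implied by the observed OOME:
# allocation_size = (#partitions in the topic) * (bytes per partition).
oome_bytes = 201_332_748   # from the ERROR line in the stack trace
partitions = 1000

per_partition = oome_bytes / partitions   # integer division
puts "#{per_partition} bytes (~#{per_partition / 1024} KB) per partition"
```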
> > >
> > > Thanks,
> > > Natty
> > >
> > >
> > >
> > > On Mon, Jan 19, 2015 at 2:10 PM, Pranay Agarwal <agarwalpran...@gmail.com>
> > > wrote:
> > >
> > > > Hi All,
> > > >
> > > > I have a Kafka cluster set up which has 2 topics:
> > > >
> > > > topic1 with 10 partitions
> > > > topic2 with 1000 partitions.
> > > >
> > > > While I am able to consume messages from topic1 just fine, I get the
> > > > following error from topic2. There is a resolved issue here on the
> > > > same thing:
> > > > https://issues.apache.org/jira/browse/KAFKA-664
> > > >
> > > > I am using the latest Kafka server version, and I have used the Kafka
> > > > command-line tools to consume from the topics.
> > > >
> > > > >>>>
> > > > [2015-01-19 22:08:10,758] ERROR OOME with size 201332748
> > > > (kafka.network.BoundedByteBufferReceive)
> > > >
> > > > java.lang.OutOfMemoryError: Java heap space
> > > >   at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
> > > >   at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
> > > >   at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
> > > >   at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
> > > >   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > > >   at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > > >   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> > > >   at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
> > > >   at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > > >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > > >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> > > >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> > > >   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
> > > >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> > > >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> > > >   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > >   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
> > > >   at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
> > > >   at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> > > >   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > > <<<
> > > >
> > > >
> > > > Thanks
> > > > -Pranay
> > > >
> > >
> >
>
