Thanks a lot Natty. I am using this Ruby gem on the client side with all the default config (https://github.com/joekiller/jruby-kafka/blob/master/lib/jruby-kafka/group.rb), and the value of fetch.message.max.bytes is set to 1MB.
Currently I only have 3 nodes set up in the Kafka cluster (with 8GB RAM). If 1 consumer is going to take 1000 partitions x 1MB ~ 1GB, does that mean 1 Kafka node can at best support only 8 consumers? Also, when I run top/free on the Kafka cluster nodes (both ZooKeeper and Kafka are deployed on each of the 3 nodes), I don't see much memory being used on the machines. And even with this calculation, I shouldn't be facing any issue with only 1 consumer, as I have given 8GB of JVM space to the Kafka nodes, right?

Thanks
-Pranay

On Mon, Jan 19, 2015 at 2:53 PM, Jonathan Natkins <na...@streamsets.com> wrote:

> The fetch.message.max.bytes setting is actually a client-side configuration.
> With regard to increasing the number of threads, I think the calculation may
> be a little more subtle than what you're proposing, and frankly, it's
> unlikely that your servers can handle allocating 200MB x 1000 threads =
> 200GB of memory at a single time.
>
> I believe that if you have every partition on a single broker, and all of
> your consumer threads are requesting data simultaneously, then yes, the
> broker would attempt to allocate 200GB of heap, and probably you'll hit an
> OOME. However, since each consumer is only reading from one partition,
> those 1000 threads should be making requests that are spread out over the
> entire Kafka cluster. Depending on the memory on your servers, you may need
> to increase the number of brokers in your cluster to support the 1000
> threads. For example, I would expect that you can support this with 10
> brokers if each broker has something north of 20GB of heap allocated.
>
> Some of this is a little bit of guesswork on my part, and I'm not super
> confident of my numbers... Can anybody else on the list validate my math?
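To spell out the estimate in the reply above, here is a quick Ruby sketch (Ruby, since the client in this thread is the jruby-kafka gem). The ~200MB-per-fetch and ~20GB-per-broker figures are the thread's own rough estimates, not measured values, and 1 GB is treated as 1000 MB to match the thread's loose arithmetic:

```ruby
# Rough cluster sizing from the thread's numbers: ~200 MB allocated per
# consumer fetch request, 1000 consumer threads (one per topic2 partition),
# and brokers with ~20 GB of heap each. All figures are estimates.

fetch_mb = 200    # approximate allocation per consumer fetch request
threads  = 1000   # consumer threads, one per partition of topic2

# Worst case: every thread's request lands on a single broker at once.
worst_case_gb = fetch_mb * threads / 1000.0
puts "worst-case simultaneous allocation: #{worst_case_gb} GB"

# Spread the load across brokers that each have ~20 GB of heap.
heap_per_broker_gb = 20
brokers_needed = (worst_case_gb / heap_per_broker_gb).ceil
puts "brokers needed at #{heap_per_broker_gb} GB heap each: #{brokers_needed}"
```

Running this reproduces the reply's figures: 200 GB in the worst case, and 10 brokers at 20 GB of heap apiece.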
>
> Thanks,
> Natty
>
> Jonathan "Natty" Natkins
> StreamSets | Customer Engagement Engineer
> mobile: 609.577.1600 | linkedin <http://www.linkedin.com/in/nattyice>
>
>
> On Mon, Jan 19, 2015 at 2:34 PM, Pranay Agarwal <agarwalpran...@gmail.com>
> wrote:
>
> > Thanks Natty.
> >
> > Is there any config which I need to change on the client side as well?
> > Also, currently I am trying with only 1 consumer thread. Does the
> > equation change to (#partitions) * (fetch size) * (#consumer_threads)
> > in case I try to read with 1000 threads from topic2 (1000 partitions)?
> >
> > -Pranay
> >
> > On Mon, Jan 19, 2015 at 2:26 PM, Jonathan Natkins <na...@streamsets.com>
> > wrote:
> >
> > > Hi Pranay,
> > >
> > > I think the JIRA you're referencing is a bit orthogonal to the OOME
> > > that you're experiencing. Based on the stacktrace, it looks like your
> > > OOME is coming from a consumer request, which is attempting to
> > > allocate 200MB. There was a thread (relatively recently) that
> > > discussed what I think is your issue:
> > >
> > > http://mail-archives.apache.org/mod_mbox/kafka-users/201412.mbox/%3CCAG1fNJDHHGSL-x3wp=pPZS1asOdOBrQ-Ge3kiA3Bk_iz7o=5...@mail.gmail.com%3E
> > >
> > > I suspect that the takeaway is that the way Kafka determines the
> > > required memory for a consumer request is (#partitions in the topic) x
> > > (replica.fetch.max.bytes), and seemingly you don't have enough memory
> > > allocated to handle that request. The solution is likely to increase
> > > the heap size on your brokers or to decrease your max fetch size.
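The sizing rule quoted above, (#partitions in the topic) x (max fetch bytes), is easy to check against the two topics in this thread. A sketch, assuming the 1MB fetch.message.max.bytes mentioned at the top of the thread applies per partition (the per-partition value is an assumption, not a verified default):

```ruby
# Memory a single fetch can require, per the rule quoted above:
# (#partitions in the topic) x (max fetch bytes per partition).
# Assumes the 1 MB fetch.message.max.bytes from the client config.

ONE_MB = 1024 * 1024

def fetch_memory_bytes(partitions, max_fetch_bytes = ONE_MB)
  partitions * max_fetch_bytes
end

topic1_bytes = fetch_memory_bytes(10)    # 10 partitions   -> ~10 MB
topic2_bytes = fetch_memory_bytes(1000)  # 1000 partitions -> ~1 GB

puts "topic1: #{topic1_bytes / ONE_MB} MB, topic2: #{topic2_bytes / ONE_MB} MB"
```

This makes the asymmetry in the thread concrete: topic1 needs only ~10 MB per fetch, while topic2 needs ~1 GB, which is why only topic2 runs into trouble.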
> > >
> > > Thanks,
> > > Natty
> > >
> > > Jonathan "Natty" Natkins
> > > StreamSets | Customer Engagement Engineer
> > > mobile: 609.577.1600 | linkedin <http://www.linkedin.com/in/nattyice>
> > >
> > >
> > > On Mon, Jan 19, 2015 at 2:10 PM, Pranay Agarwal <agarwalpran...@gmail.com>
> > > wrote:
> > >
> > > > Hi All,
> > > >
> > > > I have a Kafka cluster setup which has 2 topics:
> > > >
> > > > topic1 with 10 partitions
> > > > topic2 with 1000 partitions
> > > >
> > > > While I am able to consume messages from topic1 just fine, I get the
> > > > following error from topic2. There is a resolved issue here on the
> > > > same thing: https://issues.apache.org/jira/browse/KAFKA-664
> > > >
> > > > I am using the latest Kafka server version, and I have used the Kafka
> > > > command line tools to consume from the topics.
> > > >
> > > > >>>>
> > > > [2015-01-19 22:08:10,758] ERROR OOME with size 201332748
> > > > (kafka.network.BoundedByteBufferReceive)
> > > > java.lang.OutOfMemoryError: Java heap space
> > > > at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
> > > > at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
> > > > at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
> > > > at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
> > > > at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > > > at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > > > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> > > > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
> > > > at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> > > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> > > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
> > > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> > > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> > > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > > at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
> > > > at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
> > > > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> > > > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > > <<<
> > > >
> > > > Thanks
> > > > -Pranay
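As a sanity check on the numbers in this thread: the failed allocation reported in the stack trace, 201332748 bytes, matches the ~200MB consumer-request size Natty refers to. A one-line Ruby conversion confirms it:

```ruby
# The OOME in the trace reports the exact allocation that failed.
# Converting it to binary megabytes shows it is the ~200 MB request
# size discussed earlier in the thread.

oome_bytes = 201_332_748
oome_mb = oome_bytes / (1024.0 * 1024)
puts format("failed allocation: %.1f MB", oome_mb)   # ~192 MB
```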