fetch.message.max.size is actually a client-side configuration. With regard to increasing the number of threads, I think the calculation is a little more subtle than what you're proposing, and frankly, it's unlikely that your servers can handle allocating 200MB x 1000 threads = 200GB of memory all at once.

I believe that if you had every partition on a single broker, and all of your consumer threads were requesting data simultaneously, then yes, that broker would attempt to allocate 200GB of heap, and you would probably hit an OOME. However, since each consumer thread is only reading from one partition, those 1000 threads should be making requests that are spread out over the entire Kafka cluster. Depending on the memory on your servers, you may need to increase the number of brokers in your cluster to support the 1000 threads. For example, I would expect that you could support this with 10 brokers if each broker has something north of 20GB of heap allocated.

Some of this is a little bit of guesswork on my part, and I'm not super confident of my numbers... can anybody else on the list validate my math?
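To spell out the arithmetic I'm asking people to check (rough, worst-case numbers; the ~200KB per-partition fetch size is my inference from your stack trace, not something I've confirmed):

    one fetch response  ~ (#partitions in the topic) x (max fetch size per partition)
                        ~ 1000 x ~200KB = ~200MB   (the 201332748-byte allocation in your trace)

    all 1000 threads hitting a single broker at once:
                        ~ 200MB x 1000 = 200GB

    spread evenly across 10 brokers:
                        ~ 200GB / 10 = 20GB of heap per broker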
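And since you asked about the client side: if you're consuming through the high-level consumer, the fetch size is controlled by the fetch.message.max.size consumer property. Here's a minimal sketch of where that setting goes, assuming the Java consumer API; the ZooKeeper connect string, group id, and the 1MB value are placeholders to adapt:

    import java.util.Properties;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class FetchSizeExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "zkhost:2181"); // placeholder
            props.put("group.id", "group1");               // placeholder
            // Client-side cap on the bytes fetched per partition per request.
            // A single fetch allocates roughly (#partitions in the request)
            // x this value, so lowering it shrinks the ~200MB buffers from
            // your stack trace. It must still exceed your largest message.
            props.put("fetch.message.max.size", "1048576"); // 1MB placeholder
            ConsumerConnector consumer =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            // ... create message streams and consume as usual ...
            consumer.shutdown();
        }
    }

On the broker side, the analogous knob is replica.fetch.max.bytes in server.properties, and, if memory serves, the broker heap itself is set via KAFKA_HEAP_OPTS when starting the server.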
Thanks,
Natty

Jonathan "Natty" Natkins
StreamSets | Customer Engagement Engineer
mobile: 609.577.1600 | linkedin <http://www.linkedin.com/in/nattyice>

On Mon, Jan 19, 2015 at 2:34 PM, Pranay Agarwal <agarwalpran...@gmail.com> wrote:

> Thanks Natty.
>
> Is there any config which I need to change on the client side as well?
> Also, currently I am trying with only 1 consumer thread. Does the
> equation change to (#partitions) * (fetch size) * (#consumer_threads)
> if I try to read with 1000 threads from topic2 (1000 partitions)?
>
> -Pranay
>
> On Mon, Jan 19, 2015 at 2:26 PM, Jonathan Natkins <na...@streamsets.com> wrote:
>
> > Hi Pranay,
> >
> > I think the JIRA you're referencing is a bit orthogonal to the OOME
> > you're experiencing. Based on the stack trace, it looks like your OOME
> > is coming from a consumer request, which is attempting to allocate
> > 200MB. There was a thread (relatively recently) that discussed what I
> > think is your issue:
> >
> > http://mail-archives.apache.org/mod_mbox/kafka-users/201412.mbox/%3CCAG1fNJDHHGSL-x3wp=pPZS1asOdOBrQ-Ge3kiA3Bk_iz7o=5...@mail.gmail.com%3E
> >
> > I suspect that the takeaway is that the way Kafka determines the
> > required memory for a consumer request is (#partitions in the topic) x
> > (replica.fetch.max.bytes), and seemingly you don't have enough memory
> > allocated to handle that request. The solution is likely to increase
> > the heap size on your brokers or to decrease your max fetch size.
> >
> > Thanks,
> > Natty
> >
> > Jonathan "Natty" Natkins
> > StreamSets | Customer Engagement Engineer
> > mobile: 609.577.1600 | linkedin <http://www.linkedin.com/in/nattyice>
> >
> > On Mon, Jan 19, 2015 at 2:10 PM, Pranay Agarwal <agarwalpran...@gmail.com> wrote:
> >
> > > Hi All,
> > >
> > > I have a Kafka cluster setup which has 2 topics:
> > >
> > > topic1 with 10 partitions
> > > topic2 with 1000 partitions
> > >
> > > While I am able to consume messages from topic1 just fine, I get the
> > > following error from topic2. There is a resolved issue on the same
> > > thing here: https://issues.apache.org/jira/browse/KAFKA-664
> > >
> > > I am using the latest Kafka server version, and I have used the
> > > Kafka command-line tools to consume from the topics.
> > >
> > > >>>>
> > > [2015-01-19 22:08:10,758] ERROR OOME with size 201332748
> > > (kafka.network.BoundedByteBufferReceive)
> > >
> > > java.lang.OutOfMemoryError: Java heap space
> > >   at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
> > >   at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
> > >   at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
> > >   at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
> > >   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > >   at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > >   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> > >   at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
> > >   at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> > >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> > >   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
> > >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> > >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> > >   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > >   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
> > >   at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
> > >   at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> > >   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > <<<
> > >
> > > Thanks
> > > -Pranay