The fetch.message.max.bytes property is actually a client-side configuration. With
regard to increasing the number of threads, I think the calculation may be
a little more subtle than what you're proposing, and frankly, it's unlikely
that your servers can handle allocating 200MB x 1000 threads = 200GB of
memory all at once.

I believe that if you have every partition on a single broker, and all of
your consumer threads are requesting data simultaneously, then yes, the
broker would attempt to allocate 200GB of heap, and you'll probably hit an
OOME. However, since each consumer is only reading from one partition,
those 1000 threads should be making requests that are spread out over the
entire Kafka cluster. Depending on the memory on your servers, you may need
to increase the number of brokers in your cluster to support the 1000
threads. For example, I would expect that you could support this with 10
brokers if each broker has something north of 20GB of heap allocated.
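To make that arithmetic concrete, here's a rough back-of-the-envelope sketch in Python. The 200MB figure matches the allocation size from the OOME in the original report; the one-fetch-per-thread behavior and the even spread of requests across brokers are assumptions on my part, not guarantees:

```python
# Back-of-the-envelope heap estimate for simultaneous consumer fetches.
# Assumes one in-flight ~200MB fetch buffer per consumer thread and an
# even spread of requests across brokers -- assumptions, not guarantees.
fetch_max_bytes = 200 * 1024 * 1024   # ~200MB, matching the OOME size
consumer_threads = 1000
brokers = 10

total_bytes = fetch_max_bytes * consumer_threads   # worst case: all threads at once
per_broker_bytes = total_bytes // brokers          # if requests spread evenly

print(total_bytes / 2**30)       # 195.3125  (GiB cluster-wide)
print(per_broker_bytes / 2**30)  # 19.53125  (GiB per broker)
```

So with 10 brokers, something on the order of 20GB of heap per broker is roughly the break-even point, which is where my estimate above comes from.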

Some of this is a little bit of guesswork on my part, and I'm not super
confident of my numbers. Can anybody else on the list validate my math?

Thanks,
Natty

Jonathan "Natty" Natkins
StreamSets | Customer Engagement Engineer
mobile: 609.577.1600 | linkedin <http://www.linkedin.com/in/nattyice>


On Mon, Jan 19, 2015 at 2:34 PM, Pranay Agarwal <agarwalpran...@gmail.com>
wrote:

> Thanks Natty.
>
> Is there any config which I need to change on the client side as well?
> Also, currently I am trying with only 1 consumer thread. Does the equation
> change to (#partitions)*(fetchsize)*(#consumer_threads) in case I try to
> read with 1000 threads from topic2 (1000 partitions)?
>
> -Pranay
>
> On Mon, Jan 19, 2015 at 2:26 PM, Jonathan Natkins <na...@streamsets.com>
> wrote:
>
> > Hi Pranay,
> >
> > I think the JIRA you're referencing is a bit orthogonal to the OOME that
> > you're experiencing. Based on the stacktrace, it looks like your OOME is
> > coming from a consumer request, which is attempting to allocate 200MB.
> > There was a thread (relatively recently) that discussed what I think is
> > your issue:
> >
> >
> >
> http://mail-archives.apache.org/mod_mbox/kafka-users/201412.mbox/%3CCAG1fNJDHHGSL-x3wp=pPZS1asOdOBrQ-Ge3kiA3Bk_iz7o=5...@mail.gmail.com%3E
> >
> > I suspect that the takeaway is that the way Kafka determines the required
> > memory for a consumer request is (#partitions in the topic) x
> > (replica.fetch.max.bytes), and seemingly you don't have enough memory
> > allocated to handle that request. The solution is likely to increase the
> > heap size on your brokers or to decrease your max fetch size.
> >
> > Thanks,
> > Natty
> >
> > Jonathan "Natty" Natkins
> > StreamSets | Customer Engagement Engineer
> > mobile: 609.577.1600 | linkedin <http://www.linkedin.com/in/nattyice>
> >
> >
> > On Mon, Jan 19, 2015 at 2:10 PM, Pranay Agarwal <
> agarwalpran...@gmail.com>
> > wrote:
> >
> > > Hi All,
> > >
> > > I have a kafka cluster setup which has 2 topics
> > >
> > > topic1 with 10 partitions
> > > topic2 with 1000 partitions.
> > >
> > > While I am able to consume messages from topic1 just fine, I get the
> > > following error from topic2. There is a resolved issue on the same
> > > thing here: https://issues.apache.org/jira/browse/KAFKA-664
> > >
> > > I am using the latest Kafka server version, and I have used the Kafka
> > > command line tools to consume from the topics.
> > >
> > > >>>>
> > > [2015-01-19 22:08:10,758] ERROR OOME with size 201332748
> > > (kafka.network.BoundedByteBufferReceive)
> > >
> > > java.lang.OutOfMemoryError: Java heap space
> > > at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
> > > at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
> > > at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
> > > at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
> > > at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > > at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> > > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
> > > at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
> > > at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
> > > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> > > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > <<<
> > >
> > >
> > > Thanks
> > > -Pranay
> > >
> >
>
