Agreed that the docs could be better. Perhaps you want to open a JIRA (at issues.apache.org) with this suggestion?
On Wed, Dec 10, 2014 at 4:03 PM, Solon Gordon <so...@knewton.com> wrote:

> I see, thank you for the explanation. You might consider being more
> explicit about this in your documentation. We didn't realize we needed to
> take the (partitions * fetch size) calculation into account when choosing
> partition counts for our topics, so this is a bit of a rude surprise.
>
> On Wed, Dec 10, 2014 at 3:50 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
>
>> Ah, found where we actually size the request as partitions * fetch size.
>>
>> Thanks for the correction, Jay, and sorry for the mix-up, Solon.
>>
>> On Wed, Dec 10, 2014 at 10:41 AM, Jay Kreps <j...@confluent.io> wrote:
>>
>>> Hey Solon,
>>>
>>> The 10MB size is per-partition. The rationale for this is that the
>>> per-partition fetch size is effectively a max message size. However,
>>> with so many partitions on one machine this leads to a very large total
>>> fetch size. We don't do a great job of scheduling these to stay under a
>>> memory bound today. Ideally the broker and consumer should do something
>>> intelligent to stay under a fixed memory budget; this is something we'd
>>> like to address as part of the new consumer.
>>>
>>> For now you need to either bump up your heap or decrease your fetch size.
>>>
>>> -jay
>>>
>>> On Wed, Dec 10, 2014 at 7:48 AM, Solon Gordon <so...@knewton.com> wrote:
>>>
>>>> I just wanted to bump this issue to see if anyone has thoughts. Based
>>>> on the error message, it seems like the broker is attempting to consume
>>>> nearly 2GB of data in a single fetch. Is this expected behavior?
>>>>
>>>> Please let us know if more details would be helpful or if it would be
>>>> better for us to file a JIRA issue. We're using Kafka 0.8.1.1.
>>>>
>>>> Thanks,
>>>> Solon
>>>>
>>>> On Thu, Dec 4, 2014 at 12:00 PM, Dmitriy Gromov <dmit...@knewton.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> We were recently trying to replace a broker instance and were getting
>>>>> an OutOfMemoryException when the new node was coming up. The issue
>>>>> happened during the log replication phase. We were able to circumvent
>>>>> this issue by copying over all of the logs to the new node before
>>>>> starting it.
>>>>>
>>>>> Details:
>>>>>
>>>>> - The heap size on the old and new node was 8GB.
>>>>> - There was about 50GB of log data to transfer.
>>>>> - There were 1548 partitions across 11 topics.
>>>>> - We recently increased our num.replica.fetchers to solve the problem
>>>>>   described here: https://issues.apache.org/jira/browse/KAFKA-1196.
>>>>>   However, this process worked when we first changed that value.
>>>>>
>>>>> [2014-12-04 12:10:22,746] ERROR OOME with size 1867671283 (kafka.network.BoundedByteBufferReceive)
>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>> at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>>>>> at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>>>>> at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
>>>>> at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
>>>>> at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>>>> at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>>>>> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>>>>> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
>>>>> at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>>>>> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>>>>> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>>>>> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>>>>> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>>>> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
>>>>> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>>>>> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>>>>> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>>>> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>>>>> at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
>>>>> at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>>>>> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>>>>>
>>>>> Thank you
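For anyone who finds this thread later, here is a rough back-of-the-envelope
sketch of the memory math Jay describes above. The 1548 partitions and the
10MB per-partition fetch size come from this thread; the num.replica.fetchers
value of 4 is hypothetical, only there to show the shape of the calculation:

    public class FetchMemoryEstimate {
        public static void main(String[] args) {
            long partitions = 1548;          // partition count reported above
            long fetchSizeBytes = 10L << 20; // 10MB per partition (replica.fetch.max.bytes)
            long fetcherThreads = 4;         // hypothetical num.replica.fetchers value

            // Each replica fetcher's response buffer is sized as roughly
            // (partitions assigned to that thread) * (per-partition fetch size).
            long perRequest = (partitions / fetcherThreads) * fetchSizeBytes;
            // Worst case, every fetcher thread holds a full buffer at once.
            long worstCase = perRequest * fetcherThreads;

            System.out.printf("per-request buffer: ~%.1f GB%n", perRequest / 1e9);
            System.out.printf("worst case across fetchers: ~%.1f GB%n", worstCase / 1e9);
        }
    }

With these numbers the worst case works out to roughly 1548 * 10MB ~= 16GB,
which cannot fit in an 8GB heap and is consistent with the ~1.9GB single
allocation in the stack trace above. Per Jay's advice, the available knobs
are the broker heap size and the per-partition fetch size
(replica.fetch.max.bytes for the replica fetchers).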