I see, thank you for the explanation. It would be worth calling this out explicitly in the documentation: we didn't realize we needed to take the (partitions * fetch size) calculation into account when choosing partition counts for our topics, so this came as a rude surprise.
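For anyone who hits this later, the arithmetic is easy to reproduce. Below is a minimal sketch of the sizing rule discussed in this thread (the partition count and the 10MB per-partition fetch size are the figures from the thread; everything else, including the class name, is illustrative):

    public class FetchSizeEstimate {
        public static void main(String[] args) {
            // A fetch response buffer is sized as partitions * per-partition
            // fetch size, so the per-partition setting acts as a multiplier.
            long partitions = 1548;              // partitions in this cluster
            long fetchSize = 10L * 1024 * 1024;  // 10MB per-partition fetch size
            long worstCase = partitions * fetchSize;
            // ~16.2GB naive worst case against an 8GB heap; even a subset of
            // partitions in one request explains the ~1.9GB allocation in the
            // log quoted below.
            System.out.printf("worst case: %d bytes (~%.1f GB)%n",
                    worstCase, worstCase / 1e9);
        }
    }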
On Wed, Dec 10, 2014 at 3:50 PM, Gwen Shapira <gshap...@cloudera.com> wrote:

> Ah, found where we actually size the request as partitions * fetch size.
>
> Thanks for the correction, Jay, and sorry for the mix-up, Solon.
>
> On Wed, Dec 10, 2014 at 10:41 AM, Jay Kreps <j...@confluent.io> wrote:
> > Hey Solon,
> >
> > The 10MB size is per-partition. The rationale for this is that the
> > per-partition fetch size is effectively a max message size. However,
> > with so many partitions on one machine this will lead to a very large
> > total fetch size. We don't do a great job of scheduling these to stay
> > under a memory bound today. Ideally the broker and consumer should do
> > something intelligent to stay under a fixed memory budget; this is
> > something we'd like to address as part of the new consumer.
> >
> > For now you need to either bump up your heap or decrease your fetch size.
> >
> > -jay
> >
> > On Wed, Dec 10, 2014 at 7:48 AM, Solon Gordon <so...@knewton.com> wrote:
> > >
> > > I just wanted to bump this issue to see if anyone has thoughts. Based
> > > on the error message it seems like the broker is attempting to consume
> > > nearly 2GB of data in a single fetch. Is this expected behavior?
> > >
> > > Please let us know if more details would be helpful or if it would be
> > > better for us to file a JIRA issue. We're using Kafka 0.8.1.1.
> > >
> > > Thanks,
> > > Solon
> > >
> > > On Thu, Dec 4, 2014 at 12:00 PM, Dmitriy Gromov <dmit...@knewton.com>
> > > wrote:
> > > >
> > > > Hi,
> > > >
> > > > We were recently trying to replace a broker instance and were getting
> > > > an OutOfMemoryException when the new node was coming up. The issue
> > > > happened during the log replication phase. We were able to circumvent
> > > > the issue by copying over all of the logs to the new node before
> > > > starting it.
> > > >
> > > > Details:
> > > >
> > > > - The heap size on the old and new node was 8GB.
> > > > - There was about 50GB of log data to transfer.
> > > > - There were 1548 partitions across 11 topics.
> > > > - We recently increased our num.replica.fetchers to solve the problem
> > > >   described here: https://issues.apache.org/jira/browse/KAFKA-1196.
> > > >   However, this process worked when we first changed that value.
> > > >
> > > > [2014-12-04 12:10:22,746] ERROR OOME with size 1867671283 (kafka.network.BoundedByteBufferReceive)
> > > > java.lang.OutOfMemoryError: Java heap space
> > > >   at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
> > > >   at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
> > > >   at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
> > > >   at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
> > > >   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > > >   at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > > >   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> > > >   at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
> > > >   at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > > >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > > >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> > > >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> > > >   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
> > > >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> > > >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> > > >   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > >   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
> > > >   at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
> > > >   at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> > > >   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > >
> > > > Thank you
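For the archives, Jay's two suggested mitigations map onto settings like the following (illustrative values only, assuming the 0.8.x config names; tune to your own cluster):

    # server.properties: lower the per-partition replica fetch size
    # (note it still needs to be at least as large as your biggest message)
    replica.fetch.max.bytes=1048576

    # or give the broker more heap; kafka-server-start.sh honors KAFKA_HEAP_OPTS
    export KAFKA_HEAP_OPTS="-Xmx12g -Xms12g"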