I see, thank you for the explanation. You might consider being more
explicit about this in your documentation. We didn't realize we needed to
take the (partitions * fetch size) calculation into account when choosing
partition counts for our topics, so this is a bit of a rude surprise.

On Wed, Dec 10, 2014 at 3:50 PM, Gwen Shapira <gshap...@cloudera.com> wrote:

> Ah, found where we actually size the request as partitions * fetch size.
>
> Thanks for the correction, Jay and sorry for the mix-up, Solon.
>
> On Wed, Dec 10, 2014 at 10:41 AM, Jay Kreps <j...@confluent.io> wrote:
> > Hey Solon,
> >
> > The 10MB size is per-partition. The rationale for this is that the fetch
> > size per-partition is effectively a max message size. However with so
> many
> > partitions on one machine this will lead to a very large fetch size. We
> > don't do a great job of scheduling these to stay under a memory bound
> > today. Ideally the broker and consumer should do something intelligent to
> > stay under a fixed memory budget, this is something we'd like to address
> as
> > part of the new consumer.
> >
> > For now you need to either bump up your heap or decrease your fetch size.
> >
> > -jay
> >
> > On Wed, Dec 10, 2014 at 7:48 AM, Solon Gordon <so...@knewton.com> wrote:
> >
> >> I just wanted to bump this issue to see if anyone has thoughts. Based on
> >> the error message it seems like the broker is attempting to consume
> nearly
> >> 2GB of data in a single fetch. Is this expected behavior?
> >>
> >> Please let us know if more details would be helpful or if it would be
> >> better for us to file a JIRA issue. We're using Kafka 0.8.1.1.
> >>
> >> Thanks,
> >> Solon
> >>
> >> On Thu, Dec 4, 2014 at 12:00 PM, Dmitriy Gromov <dmit...@knewton.com>
> >> wrote:
> >>
> >> > Hi,
> >> >
> >> > We were recently trying to replace a broker instance and were getting
> an
> >> > OutOfMemoryException when the new node was coming up. The issue
> happened
> >> > during the log replication phase. We were able to circumvent this
> issue
> >> by
> >> > copying over all of the logs to the new node before starting it.
> >> >
> >> > Details:
> >> >
> >> > - The heap size on the old and new node was 8GB.
> >> > - There was about 50GB of log data to transfer.
> >> > - There were 1548 partitions across 11 topics
> >> > - We recently increased our num.replica.fetchers to solve the problem
> >> > described here: https://issues.apache.org/jira/browse/KAFKA-1196.
> >> However,
> >> > this process worked when we first changed that value.
> >> >
> >> > [2014-12-04 12:10:22,746] ERROR OOME with size 1867671283
> (kafka.network.
> >> > BoundedByteBufferReceive)
> >> > java.lang.OutOfMemoryError: Java heap space
> >> >   at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
> >> >   at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
> >> >   at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(
> >> > BoundedByteBufferReceive.scala:80)
> >> >   at kafka.network.BoundedByteBufferReceive.readFrom(
> >> > BoundedByteBufferReceive.scala:63)
> >> >   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> >> >   at kafka.network.BoundedByteBufferReceive.readCompletely(
> >> > BoundedByteBufferReceive.scala:29)
> >> >   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> >> >   at
> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
> >> >   at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$
> >> > $sendRequest(SimpleConsumer.scala:71)
> >> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
> >> > apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
> >> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
> >> > apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> >> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
> >> > apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> >> >   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(
> >> > SimpleConsumer.scala:108)
> >> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
> >> > SimpleConsumer.scala:108)
> >> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
> >> > SimpleConsumer.scala:108)
> >> >   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >> >   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
> >> >   at kafka.server.AbstractFetcherThread.processFetchRequest(
> >> > AbstractFetcherThread.scala:96)
> >> >   at
> >> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:
> >> > 88)
> >> >   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> >> >
> >> > Thank you
> >> >
> >>
>

Reply via email to