Agreed that the docs could be better. Perhaps you'd like to open a JIRA (at
issues.apache.org) with this suggestion?
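As a back-of-envelope sketch of the (partitions * fetch size) math discussed in the thread quoted below — all numbers are illustrative figures taken from the thread itself, not measurements:

```python
# Back-of-envelope estimate of replica-fetcher memory use, based on the
# "partitions * fetch size" sizing behavior described in this thread.
# The numbers below are illustrative, taken from the thread.

def fetch_request_bytes(num_partitions: int, fetch_size_bytes: int) -> int:
    """A fetch response may buffer up to fetch_size_bytes per partition."""
    return num_partitions * fetch_size_bytes

# Figures from the thread: 1548 partitions, 10 MB per-partition fetch size.
partitions = 1548
fetch_size = 10 * 1024 * 1024  # 10 MiB

worst_case = fetch_request_bytes(partitions, fetch_size)
print(f"worst-case buffered bytes: {worst_case} ({worst_case / 2**30:.1f} GiB)")
```

Under these assumptions the worst case is around 15 GiB, which comfortably exceeds the 8 GB heap mentioned later in the thread — consistent with the OOME reported.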

On Wed, Dec 10, 2014 at 4:03 PM, Solon Gordon <so...@knewton.com> wrote:
> I see, thank you for the explanation. You might consider being more
> explicit about this in your documentation. We didn't realize we needed to
> take the (partitions * fetch size) calculation into account when choosing
> partition counts for our topics, so this is a bit of a rude surprise.
>
> On Wed, Dec 10, 2014 at 3:50 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
>
>> Ah, found where we actually size the request as partitions * fetch size.
>>
>> Thanks for the correction, Jay and sorry for the mix-up, Solon.
>>
>> On Wed, Dec 10, 2014 at 10:41 AM, Jay Kreps <j...@confluent.io> wrote:
>> > Hey Solon,
>> >
>> > The 10MB size is per-partition. The rationale for this is that the fetch
>> > size per-partition is effectively a max message size. However, with so
>> > many partitions on one machine this will lead to a very large fetch
>> > size. We don't do a great job of scheduling these to stay under a memory
>> > bound today. Ideally the broker and consumer should do something
>> > intelligent to stay under a fixed memory budget; this is something we'd
>> > like to address as part of the new consumer.
>> >
>> > For now you need to either bump up your heap or decrease your fetch size.
>> >
>> > -jay
>> >
>> > On Wed, Dec 10, 2014 at 7:48 AM, Solon Gordon <so...@knewton.com> wrote:
>> >
>> >> I just wanted to bump this issue to see if anyone has thoughts. Based on
>> >> the error message it seems like the broker is attempting to consume
>> >> nearly 2GB of data in a single fetch. Is this expected behavior?
>> >>
>> >> Please let us know if more details would be helpful or if it would be
>> >> better for us to file a JIRA issue. We're using Kafka 0.8.1.1.
>> >>
>> >> Thanks,
>> >> Solon
>> >>
>> >> On Thu, Dec 4, 2014 at 12:00 PM, Dmitriy Gromov <dmit...@knewton.com>
>> >> wrote:
>> >>
>> >> > Hi,
>> >> >
>> >> > We were recently trying to replace a broker instance and were getting
>> >> > an OutOfMemoryException when the new node was coming up. The issue
>> >> > happened during the log replication phase. We were able to circumvent
>> >> > this issue by copying over all of the logs to the new node before
>> >> > starting it.
>> >> >
>> >> > Details:
>> >> >
>> >> > - The heap size on the old and new node was 8GB.
>> >> > - There was about 50GB of log data to transfer.
>> >> > - There were 1548 partitions across 11 topics.
>> >> > - We recently increased num.replica.fetchers to solve the problem
>> >> > described here: https://issues.apache.org/jira/browse/KAFKA-1196.
>> >> > However, this process worked when we first changed that value.
>> >> >
>> >> > [2014-12-04 12:10:22,746] ERROR OOME with size 1867671283 (kafka.network.BoundedByteBufferReceive)
>> >> > java.lang.OutOfMemoryError: Java heap space
>> >> >   at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>> >> >   at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>> >> >   at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
>> >> >   at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
>> >> >   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>> >> >   at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>> >> >   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>> >> >   at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
>> >> >   at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>> >> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>> >> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>> >> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>> >> >   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> >> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
>> >> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>> >> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>> >> >   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> >> >   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>> >> >   at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
>> >> >   at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>> >> >   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>> >> >
>> >> > Thank you
>> >> >
>> >>
>>
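Jay's advice above ("bump up your heap or decrease your fetch size") would, on the broker side, look something like the sketch below. The property names are the standard broker configs for the replica fetchers; the values are illustrative only and should be tuned to your partition counts and heap:

```properties
# Illustrative values only -- tune to your partition counts and heap.
# Per-partition fetch size used by the replica fetcher threads.
replica.fetch.max.bytes=1048576
# Number of fetcher threads per source broker (discussed in this thread).
num.replica.fetchers=1
```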
