Ah, found where we actually size the request as partitions * fetch size.

Thanks for the correction, Jay and sorry for the mix-up, Solon.

On Wed, Dec 10, 2014 at 10:41 AM, Jay Kreps <j...@confluent.io> wrote:
> Hey Solon,
>
> The 10MB size is per-partition. The rationale for this is that the fetch
> size per-partition is effectively a max message size. However with so many
> partitions on one machine this will lead to a very large fetch size. We
> don't do a great job of scheduling these to stay under a memory bound
> today. Ideally the broker and consumer should do something intelligent to
> stay under a fixed memory budget, this is something we'd like to address as
> part of the new consumer.
>
> For now you need to either bump up your heap or decrease your fetch size.
>
> -jay
>
> On Wed, Dec 10, 2014 at 7:48 AM, Solon Gordon <so...@knewton.com> wrote:
>
>> I just wanted to bump this issue to see if anyone has thoughts. Based on
>> the error message it seems like the broker is attempting to consume nearly
>> 2GB of data in a single fetch. Is this expected behavior?
>>
>> Please let us know if more details would be helpful or if it would be
>> better for us to file a JIRA issue. We're using Kafka 0.8.1.1.
>>
>> Thanks,
>> Solon
>>
>> On Thu, Dec 4, 2014 at 12:00 PM, Dmitriy Gromov <dmit...@knewton.com>
>> wrote:
>>
>> > Hi,
>> >
>> > We were recently trying to replace a broker instance and were getting an
>> > OutOfMemoryException when the new node was coming up. The issue happened
>> > during the log replication phase. We were able to circumvent this issue
>> by
>> > copying over all of the logs to the new node before starting it.
>> >
>> > Details:
>> >
>> > - The heap size on the old and new node was 8GB.
>> > - There was about 50GB of log data to transfer.
>> > - There were 1548 partitions across 11 topics
>> > - We recently increased our num.replica.fetchers to solve the problem
>> > described here: https://issues.apache.org/jira/browse/KAFKA-1196.
>> However,
>> > this process worked when we first changed that value.
>> >
>> > [2014-12-04 12:10:22,746] ERROR OOME with size 1867671283 (kafka.network.
>> > BoundedByteBufferReceive)
>> > java.lang.OutOfMemoryError: Java heap space
>> >   at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>> >   at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>> >   at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(
>> > BoundedByteBufferReceive.scala:80)
>> >   at kafka.network.BoundedByteBufferReceive.readFrom(
>> > BoundedByteBufferReceive.scala:63)
>> >   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>> >   at kafka.network.BoundedByteBufferReceive.readCompletely(
>> > BoundedByteBufferReceive.scala:29)
>> >   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>> >   at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
>> >   at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$
>> > $sendRequest(SimpleConsumer.scala:71)
>> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
>> > apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
>> > apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
>> > apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>> >   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(
>> > SimpleConsumer.scala:108)
>> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
>> > SimpleConsumer.scala:108)
>> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
>> > SimpleConsumer.scala:108)
>> >   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> >   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>> >   at kafka.server.AbstractFetcherThread.processFetchRequest(
>> > AbstractFetcherThread.scala:96)
>> >   at
>> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:
>> > 88)
>> >   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>> >
>> > Thank you
>> >
>>

Reply via email to