Hi there,

From my understanding of the protocol (and from digging through the source code a fair bit), I can't see anywhere that Kafka overallocates memory based on the fetch request's max bytes, but maybe I have missed something. If there is such a place, then I'd recommend fixing that issue instead: it seems more pressing, and fixing it would alleviate your problem (unless I'm misunderstanding something and we *have* to overallocate somewhere).

I looked up and down the fetch request path on the leader, tracing from KafkaApis -> ReplicaManager -> Log -> LogSegment, then on to FetchResponse and FetchResponseSend (in case you want some pointers into the code).

I may be missing something, but there seems to be a deeper issue here.

Tom Crayford
Heroku Kafka

On Thu, Jul 21, 2016 at 10:49 AM, Andrey L. Neporada <anepor...@yandex-team.ru> wrote:

> Hi all!
>
> We noticed that our Kafka cluster uses a lot of memory for replication.
> Our Kafka usage pattern is as follows:
>
> 1. Most messages are small (tens or hundreds of kilobytes at most), but some
> (rare) messages can be several megabytes. So, we have to set
> replica.fetch.max.bytes = max.message.bytes = 8MB
> 2. Each Kafka broker handles several thousand partitions from multiple
> topics.
>
> In this scenario the total memory required for replication (i.e.
> replica.fetch.max.bytes * numOfPartitions) is unreasonably large.
>
> So we would like to propose the following approach to fix this problem:
>
> 1. Introduce a new config parameter, replica.fetch.base.bytes, which is the
> initial size of the replication data chunk. By default this parameter should
> be equal to replica.fetch.max.bytes so the replication process will work as
> before.
>
> 2. If the ReplicaFetcherThread fails when trying to replicate a message
> bigger than the current replication chunk, we double the chunk size (or raise
> it to replica.fetch.max.bytes, whichever is smaller) and retry.
>
> 3. If the chunk is replicated successfully, we try to decrease the size of
> the replication chunk back to replica.fetch.base.bytes.
>
>
> By choosing replica.fetch.base.bytes in an optimal way (in our case ~200K),
> we were able to significantly decrease memory usage without any noticeable
> impact on replication efficiency.
>
> Here is the JIRA ticket (with PR):
> https://issues.apache.org/jira/browse/KAFKA-3979
>
> Your comments and feedback are highly appreciated!
>
>
> Thanks,
> Andrey.
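
For reference, the three-step scheme in the quoted proposal can be sketched roughly as below. This is only an illustration, not the code in the KAFKA-3979 PR: the function names are hypothetical, the simulation stands in for a real ReplicaFetcherThread, step 3 is assumed to reset straight to the base size, and the 4000-partition figure is a made-up stand-in for "several thousand". The last helper only computes the replica.fetch.max.bytes * numOfPartitions product from the proposal; whether the broker really allocates that much is exactly what is in question above.

```python
KB = 1024
MB = 1024 * KB


def next_fetch_size(current, base, maximum, succeeded):
    """Pick the chunk size for the next fetch (hypothetical helper)."""
    if succeeded:
        # Step 3 (assumed): after a successful fetch, go back to the base size.
        return base
    # Step 2: the message did not fit, so double the size, capped at the maximum.
    return min(current * 2, maximum)


def sizes_tried(message_size, base, maximum):
    """Simulate fetching one message; return every chunk size attempted."""
    size = base  # Step 1: start from replica.fetch.base.bytes
    tried = [size]
    while message_size > size:
        if size >= maximum:
            raise ValueError("message exceeds replica.fetch.max.bytes")
        size = next_fetch_size(size, base, maximum, succeeded=False)
        tried.append(size)
    return tried


def upper_bound_bytes(fetch_size, num_partitions):
    """The product from the proposal: fetch size times partition count."""
    return fetch_size * num_partitions


if __name__ == "__main__":
    base, maximum = 200 * KB, 8 * MB
    print([s // KB for s in sizes_tried(3 * MB, base, maximum)])
    # 4000 partitions is an assumed example value, not a figure from the thread.
    print(upper_bound_bytes(maximum, 4000) // MB, "MB at the maximum size")
    print(upper_bound_bytes(base, 4000) // MB, "MB at the base size")
```

With a base of 200K and a maximum of 8MB, a 3MB message in this sketch is fetched on the fifth attempt (200K, 400K, 800K, 1600K, 3200K), after which the size drops back to the base.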