I think the memory usage for consumers can be improved a lot, but there may be a better way than what you are proposing.
The problem is exactly what you describe: the bound the user sets is per-partition, but the number of partitions may be quite high. The consumer could bound the response size by only requesting a subset of the partitions, but then, if there was no data available on those partitions, the consumer wouldn't be checking the other partitions, which would add latency.

I think the solution is to add a new "max response size" parameter to the fetch request so that the server checks all partitions but doesn't send back more than this amount in total. This has to be done carefully to ensure fairness (i.e. if one partition has unbounded amounts of data it shouldn't indefinitely starve other partitions). This will fix memory management both in the replicas and for consumers.

There is a JIRA for this: https://issues.apache.org/jira/browse/KAFKA-2063. I think it isn't too hard to do and would be a huge aid to the memory profile of both the clients and the server.

I also don't think there is much use in setting a max size that expands dynamically, since in any case you have to be able to support the maximum, so you might as well always use that rather than expanding and contracting dynamically. That is, if your max fetch response size is 64MB you need to budget 64MB of free memory, so making it smaller some of the time doesn't really help you.

-Jay

On Thu, Jul 21, 2016 at 2:49 AM, Andrey L. Neporada < anepor...@yandex-team.ru> wrote:

> Hi all!
>
> We noticed that our Kafka cluster uses a lot of memory for replication.
> Our Kafka usage pattern is as follows:
>
> 1. Most messages are small (tens or hundreds of kilobytes at most), but some
> (rare) messages can be several megabytes. So, we have to set
> replica.fetch.max.bytes = max.message.bytes = 8MB
> 2. Each Kafka broker handles several thousands of partitions from multiple
> topics.
>
> In this scenario, the total memory required for replication (i.e.
> replica.fetch.max.bytes * numOfPartitions) is unreasonably big.
>
> So we would like to propose the following approach to fix this problem:
>
> 1. Introduce a new config parameter replica.fetch.base.bytes, which is the
> initial size of the replication data chunk. By default this parameter should
> be equal to replica.fetch.max.bytes, so the replication process will work as
> before.
>
> 2. If the ReplicaFetcherThread fails when trying to replicate a message
> bigger than the current replication chunk, we increase the chunk twofold
> (or up to replica.fetch.max.bytes, whichever is smaller) and retry.
>
> 3. If the chunk is replicated successfully, we try to decrease the size of
> the replication chunk back to replica.fetch.base.bytes.
>
> By choosing replica.fetch.base.bytes in an optimal way (in our case ~200K),
> we were able to significantly decrease memory usage without any noticeable
> impact on replication efficiency.
>
> Here is the JIRA ticket (with PR):
> https://issues.apache.org/jira/browse/KAFKA-3979
>
> Your comments and feedback are highly appreciated!
>
> Thanks,
> Andrey.
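
To make the "max response size" idea above a bit more concrete, here is a minimal sketch, not Kafka's actual fetch path, of how a broker could fill a response under a global byte budget while rotating the starting partition so that a partition with an unbounded backlog cannot indefinitely starve the others. All class, field, and parameter names here (BoundedFetchSketch, maxResponseBytes, maxPartitionBytes, etc.) are hypothetical.

    import java.util.ArrayList;
    import java.util.List;

    /**
     * Hypothetical sketch of a broker-side fetch response builder that enforces
     * a global "max response size" across many partitions. Fairness comes from
     * scanning partitions starting at a rotating index, so the partitions that
     * went first on this request go last on the next one.
     */
    public class BoundedFetchSketch {

        /** Bytes currently available for one partition (illustrative only). */
        public static class PartitionData {
            final String topicPartition;
            final int availableBytes;
            PartitionData(String topicPartition, int availableBytes) {
                this.topicPartition = topicPartition;
                this.availableBytes = availableBytes;
            }
        }

        /** What the broker would include for one partition in the response. */
        public static class PartitionResponse {
            final String topicPartition;
            final int bytesToSend;
            PartitionResponse(String topicPartition, int bytesToSend) {
                this.topicPartition = topicPartition;
                this.bytesToSend = bytesToSend;
            }
        }

        private int startIndex = 0; // rotated between requests for fairness

        public List<PartitionResponse> buildResponse(List<PartitionData> partitions,
                                                     int maxResponseBytes,
                                                     int maxPartitionBytes) {
            List<PartitionResponse> response = new ArrayList<>();
            int remaining = maxResponseBytes; // global budget for this response
            int n = partitions.size();
            for (int i = 0; i < n && remaining > 0; i++) {
                PartitionData p = partitions.get((startIndex + i) % n);
                // The per-partition cap still applies, but the global budget dominates.
                int take = Math.min(Math.min(p.availableBytes, maxPartitionBytes), remaining);
                if (take > 0) {
                    response.add(new PartitionResponse(p.topicPartition, take));
                    remaining -= take;
                }
            }
            // Start the next request at a different partition.
            startIndex = (startIndex + 1) % Math.max(n, 1);
            return response;
        }
    }

With this shape, a consumer or replica fetcher only ever needs to budget maxResponseBytes of memory per in-flight fetch, regardless of how many partitions it subscribes to, which is the point of bounding the response as a whole rather than per partition.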