Hi all! I would like to get your feedback on the PR for bug KAFKA-2063. Looks like a KIP is needed there, but it would be nice to get feedback first.
Thanks,
Andrey.

> On 22 Jul 2016, at 12:26, Andrey L. Neporada <anepor...@yandex-team.ru> wrote:
>
> Hi!
>
> Thanks for the feedback - I agree that the proper way to fix this issue is to
> provide a per-request data limit.
> Will try to do it.
>
> Thanks,
> Andrey.
>
>
>> On 21 Jul 2016, at 18:57, Jay Kreps <j...@confluent.io> wrote:
>>
>> I think the memory usage for consumers can be improved a lot, but I think
>> there may be a better way than what you are proposing.
>>
>> The problem is exactly what you describe: the bound the user sets is
>> per-partition, but the number of partitions may be quite high. The consumer
>> could provide a bound on the response size by only requesting a subset of
>> the partitions, but this would mean that if there was no data available on
>> those partitions the consumer wouldn't be checking other partitions, which
>> would add latency.
>>
>> I think the solution is to add a new "max response size" parameter to the
>> fetch request so the server checks all partitions but doesn't send back
>> more than this amount in total. This has to be done carefully to ensure
>> fairness (i.e. if one partition has unbounded amounts of data it shouldn't
>> indefinitely starve other partitions).
>>
>> This will fix memory management both in the replicas and for consumers.
>>
>> There is a JIRA for this: https://issues.apache.org/jira/browse/KAFKA-2063
>>
>> I think it isn't too hard to do and would be a huge aid to the memory
>> profile of both the clients and the server.
>>
>> I also don't think there is much use in setting a max size that expands
>> dynamically, since in any case you have to be able to support the maximum,
>> so you might as well always use that rather than expanding and contracting
>> dynamically. That is, if your max fetch response size is 64MB you need to
>> budget 64MB of free memory, so making it smaller some of the time doesn't
>> really help you.
>>
>> -Jay
>>
>> On Thu, Jul 21, 2016 at 2:49 AM, Andrey L. Neporada <
>> anepor...@yandex-team.ru> wrote:
>>
>>> Hi all!
>>>
>>> We noticed that our Kafka cluster uses a lot of memory for replication.
>>> Our Kafka usage pattern is as follows:
>>>
>>> 1. Most messages are small (tens or hundreds of kilobytes at most), but
>>> some (rare) messages can be several megabytes. So, we have to set
>>> replica.fetch.max.bytes = max.message.bytes = 8MB.
>>> 2. Each Kafka broker handles several thousand partitions from multiple
>>> topics.
>>>
>>> In this scenario the total memory required for replication (i.e.
>>> replica.fetch.max.bytes * numOfPartitions) is unreasonably large.
>>>
>>> So we would like to propose the following approach to fix this problem:
>>>
>>> 1. Introduce a new config parameter, replica.fetch.base.bytes, which is
>>> the initial size of the replication data chunk. By default this parameter
>>> is equal to replica.fetch.max.bytes, so the replication process works as
>>> before.
>>>
>>> 2. If the ReplicaFetcherThread fails when trying to replicate a message
>>> bigger than the current replication chunk, we double the chunk size (up
>>> to replica.fetch.max.bytes at most) and retry.
>>>
>>> 3. If the chunk is replicated successfully, we decrease the size of the
>>> replication chunk back to replica.fetch.base.bytes.
>>>
>>>
>>> By choosing replica.fetch.base.bytes optimally (in our case ~200K), we
>>> were able to significantly decrease memory usage without any noticeable
>>> impact on replication efficiency.
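
[Editor's note: a minimal Java sketch of the adaptive chunk-size scheme quoted above, for illustration only. The class and method names here are hypothetical and not part of Kafka; only the two configuration names, replica.fetch.base.bytes and replica.fetch.max.bytes, come from the proposal.]

    // Hypothetical helper tracking the replication fetch chunk size.
    // Mirrors the proposed scheme: start at the base size, double on
    // "message too large" failures (capped at the max), reset on success.
    public class AdaptiveFetchSize {
        private final int baseBytes;   // replica.fetch.base.bytes
        private final int maxBytes;    // replica.fetch.max.bytes
        private int currentBytes;

        public AdaptiveFetchSize(int baseBytes, int maxBytes) {
            this.baseBytes = baseBytes;
            this.maxBytes = maxBytes;
            this.currentBytes = baseBytes;
        }

        // Chunk size to use for the next fetch request.
        public int current() {
            return currentBytes;
        }

        // A fetch failed because a message exceeded the current chunk:
        // double the chunk, capped at replica.fetch.max.bytes, then retry.
        public void onMessageTooLarge() {
            currentBytes = Math.min(currentBytes * 2, maxBytes);
        }

        // A fetch succeeded: shrink back to the base size.
        public void onSuccess() {
            currentBytes = baseBytes;
        }
    }

The point of the scheme is that the steady-state buffer per partition is replica.fetch.base.bytes; the full replica.fetch.max.bytes is only reached transiently while an oversized message is being fetched.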
>>>
>>> Here is the JIRA ticket (with PR):
>>> https://issues.apache.org/jira/browse/KAFKA-3979
>>>
>>> Your comments and feedback are highly appreciated!
>>>
>>>
>>> Thanks,
>>> Andrey.
>
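
[Editor's note: for comparison, a hypothetical Java sketch of Jay's "max response size" suggestion from the quoted thread. The server walks all requested partitions but stops adding data once a per-request byte budget is exhausted, and rotates the starting partition between requests so a partition with unbounded data cannot starve the others. The class name and the readBytes placeholder are illustrative and not actual Kafka broker APIs.]

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical sketch of a fetch bounded by a total response size.
    public class BoundedFetchSketch {
        private int startIndex = 0; // rotates between requests for fairness

        public Map<String, byte[]> fetch(List<String> partitions, int maxResponseBytes) {
            Map<String, byte[]> response = new LinkedHashMap<>();
            int remaining = maxResponseBytes;
            int n = partitions.size();
            // Start from a different partition each time so one partition
            // with lots of data cannot indefinitely crowd out the others.
            for (int i = 0; i < n && remaining > 0; i++) {
                String partition = partitions.get((startIndex + i) % n);
                byte[] data = readBytes(partition, remaining); // placeholder log read
                if (data.length > 0) {
                    response.put(partition, data);
                    remaining -= data.length;
                }
            }
            startIndex = (startIndex + 1) % Math.max(n, 1);
            return response;
        }

        // Placeholder: read at most 'limit' bytes from the partition's log.
        private byte[] readBytes(String partition, int limit) {
            return new byte[0];
        }
    }

Under this approach the total response is bounded regardless of the number of partitions, which is what bounds memory on both follower replicas and consumers.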