Andrey - I’m not sure we quite have consensus on the Randomisation vs Round 
Robin issue, but it’s probably worth you just raising a KIP and putting one of 
the options as a rejected alternative. 

B
> On 29 Jul 2016, at 11:59, Ben Stopford <b...@confluent.io> wrote:
> 
> Thanks for kicking this one off, Andrey. Generally it looks great! 
> 
> I left a comment on the Jira regarding whether we should remove the existing 
> limitBytes, along with a potential alternative to doing randomisation. 
> 
> B
>> On 29 Jul 2016, at 09:17, Andrey L. Neporada <anepor...@yandex-team.ru> 
>> wrote:
>> 
>> Hi all!
>> 
>> I would like to get your feedback on the PR for bug KAFKA-2063.
>> Looks like a KIP is needed there, but it would be nice to get feedback first.
>> 
>> Thanks,
>> Andrey.
>> 
>> 
>>> On 22 Jul 2016, at 12:26, Andrey L. Neporada <anepor...@yandex-team.ru> 
>>> wrote:
>>> 
>>> Hi!
>>> 
>>> Thanks for the feedback - I agree that the proper way to fix this issue is to 
>>> provide a per-request data limit.
>>> Will try to do it.
>>> 
>>> Thanks,
>>> Andrey.
>>> 
>>> 
>>> 
>>>> On 21 Jul 2016, at 18:57, Jay Kreps <j...@confluent.io> wrote:
>>>> 
>>>> I think the memory usage for consumers can be improved a lot, but I think
>>>> there may be a better way than what you are proposing.
>>>> 
>>>> The problem is exactly what you describe: the bound the user sets is
>>>> per-partition, but the number of partitions may be quite high. The consumer
>>>> could provide a bound on the response size by only requesting a subset of
>>>> the partitions, but this would mean that if there was no data available on
>>>> those partitions the consumer wouldn't be checking other partitions, which
>>>> would add latency.
>>>> 
>>>> I think the solution is to add a new "max response size" parameter to the
>>>> fetch request so the server checks all partitions but doesn't send back
>>>> more than this amount in total. This has to be done carefully to ensure
>>>> fairness (i.e. if one partition has unbounded amounts of data it shouldn't
>>>> indefinitely starve other partitions).
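>>>> 
>>>> Roughly, the server-side allocation could look something like the sketch
>>>> below. This is just an illustration of the idea, not actual Kafka code: the
>>>> class, method names, and the shuffle-based fairness are my own assumptions.
>>>> 
>>>>     import java.util.*;
>>>> 
>>>>     class FetchResponseLimiter {
>>>>         // Decide how many bytes to return per partition so the total stays
>>>>         // under maxResponseBytes. Shuffling the partition order on every
>>>>         // request is one way to keep a single partition with a large
>>>>         // backlog from permanently starving the others.
>>>>         static Map<String, Integer> allocate(List<String> partitions,
>>>>                                              Map<String, Integer> availableBytes,
>>>>                                              int maxPartitionBytes,
>>>>                                              int maxResponseBytes) {
>>>>             List<String> order = new ArrayList<>(partitions);
>>>>             Collections.shuffle(order);
>>>>             Map<String, Integer> allocation = new LinkedHashMap<>();
>>>>             int remaining = maxResponseBytes;
>>>>             for (String tp : order) {
>>>>                 if (remaining <= 0) break;
>>>>                 int take = Math.min(maxPartitionBytes,
>>>>                         Math.min(availableBytes.getOrDefault(tp, 0), remaining));
>>>>                 if (take > 0) {
>>>>                     allocation.put(tp, take);
>>>>                     remaining -= take;
>>>>                 }
>>>>             }
>>>>             return allocation;
>>>>         }
>>>>     }
>>>> 
>>>> (Rotating the start index round-robin between requests would serve the same
>>>> purpose as the shuffle.)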
>>>> 
>>>> This will fix memory management both in the replicas and for consumers.
>>>> 
>>>> There is a JIRA for this: https://issues.apache.org/jira/browse/KAFKA-2063
>>>> 
>>>> I think it isn't too hard to do and would be a huge aid to the memory
>>>> profile of both the clients and server.
>>>> 
>>>> I also don't think there is much use in setting a max size that expands
>>>> dynamically since in any case you have to be able to support the maximum,
>>>> so you might as well always use that rather than expanding and contracting
>>>> dynamically. That is, if your max fetch response size is 64MB you need to
>>>> budget 64MB of free memory, so making it smaller some of the time doesn't
>>>> really help you.
>>>> 
>>>> -Jay
>>>> 
>>>> On Thu, Jul 21, 2016 at 2:49 AM, Andrey L. Neporada <
>>>> anepor...@yandex-team.ru> wrote:
>>>> 
>>>>> Hi all!
>>>>> 
>>>>> We noticed that our Kafka cluster uses a lot of memory for replication.
>>>>> Our Kafka usage pattern is as follows:
>>>>> 
>>>>> 1. Most messages are small (tens or hundreds of kilobytes at most), but some
>>>>> (rare) messages can be several megabytes. So, we have to set
>>>>> replica.fetch.max.bytes = max.message.bytes = 8MB.
>>>>> 2. Each Kafka broker handles several thousand partitions from multiple
>>>>> topics.
>>>>> 
>>>>> In this scenario, the total memory required for replication (i.e.
>>>>> replica.fetch.max.bytes * numOfPartitions) is unreasonably big.
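>>>>> For example (the partition count here is only illustrative): with
>>>>> replica.fetch.max.bytes = 8MB and ~2000 partitions per broker, that is
>>>>> 8MB * 2000 = ~16GB of fetch buffer space just for replication.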
>>>>> 
>>>>> So we would like to propose the following approach to fix this problem:
>>>>> 
>>>>> 1. Introduce a new config parameter, replica.fetch.base.bytes, which is the
>>>>> initial size of a replication data chunk. By default this parameter should be
>>>>> equal to replica.fetch.max.bytes, so the replication process will work as
>>>>> before.
>>>>> 
>>>>> 2. If the ReplicaFetcherThread fails when trying to replicate a message
>>>>> bigger than the current replication chunk, we increase the chunk twofold (or
>>>>> up to replica.fetch.max.bytes, whichever is smaller) and retry.
>>>>> 
>>>>> 3. If the chunk is replicated successfully, we decrease the size of the
>>>>> replication chunk back to replica.fetch.base.bytes (see the sketch below).
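>>>>> 
>>>>> In rough Java, the chunk-size logic is roughly the following. This is an
>>>>> untested sketch of the idea above, not the actual PR; the class and method
>>>>> names are made up.
>>>>> 
>>>>>     class AdaptiveFetchSize {
>>>>>         private final int baseBytes; // replica.fetch.base.bytes
>>>>>         private final int maxBytes;  // replica.fetch.max.bytes
>>>>>         private int currentBytes;
>>>>> 
>>>>>         AdaptiveFetchSize(int baseBytes, int maxBytes) {
>>>>>             this.baseBytes = baseBytes;
>>>>>             this.maxBytes = maxBytes;
>>>>>             this.currentBytes = baseBytes;
>>>>>         }
>>>>> 
>>>>>         // A message did not fit into the current chunk: double the chunk,
>>>>>         // capped at replica.fetch.max.bytes, and let the fetcher retry.
>>>>>         int onMessageTooLarge() {
>>>>>             currentBytes = Math.min(currentBytes * 2, maxBytes);
>>>>>             return currentBytes;
>>>>>         }
>>>>> 
>>>>>         // The fetch succeeded: fall back to the base size so the large
>>>>>         // buffer is only held while it is actually needed.
>>>>>         int onSuccess() {
>>>>>             currentBytes = baseBytes;
>>>>>             return currentBytes;
>>>>>         }
>>>>>     }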
>>>>> 
>>>>> 
>>>>> By choosing replica.fetch.base.bytes in an optimal way (in our case ~200K),
>>>>> we were able to significantly decrease memory usage without any noticeable
>>>>> impact on replication efficiency.
>>>>> 
>>>>> Here is JIRA ticket (with PR):
>>>>> https://issues.apache.org/jira/browse/KAFKA-3979
>>>>> 
>>>>> Your comments and feedback are highly appreciated!
>>>>> 
>>>>> 
>>>>> Thanks,
>>>>> Andrey.
>>> 
>> 
> 
