Hey Mickael,

Thanks for picking this up and sorry for the late comment.

In the proposed changes section, you have the following:

> Update Fetcher.java to check the number of existing in-flight fetches (this
> is already tracked by numInFlightFetches) before initiating new fetch
> requests in createFetches(). Dividing buffer.memory by max.fetch.bytes
> should tell us how many concurrent fetches the consumer can do.

So if I use a 10 MB buffer and fetch.max.bytes is also 10 MB, then the consumer will never have more than one in-flight fetch. Is that right? I'm wondering if that approach would tend to underutilize the available memory and degrade throughput. In particular, I would expect the average fetch size to be quite a bit lower than the maximum, so much of the buffer would be wasted, which could cause unnecessary fetch latency.

Perhaps another approach would be to send the fetches without consideration for the buffer size and just avoid reading them off the socket if the buffer fills up?

One other minor question. Would buffer.memory only take into account the raw size of the received messages? I think we currently delay decompression until the user is ready to receive a record set, but because of max.poll.records, we may only return a subset of the decompressed message set, which means we need to keep the remainder in memory. So if we don't allow some extra space for the decompressed data, we might exceed the expected memory. Not sure it's a big problem, but might be worth thinking about.

Thanks,
Jason

On Thu, Oct 13, 2016 at 8:51 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:

> I've now updated the KIP.
>
> New link as I've updated the title:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer
>
> Any further feedback welcome !
>
> On Tue, Oct 11, 2016 at 6:00 PM, Mickael Maison
> <mickael.mai...@gmail.com> wrote:
> > Thanks for the feedback.
> >
> > Regarding the config name, I agree it's probably best to reuse the
> > same name as the producer (buffer.memory) whichever implementation we
> > decide to use.
> >
> > At first, I opted for limiting the max number of concurrent fetches as
> > it felt more natural in the Fetcher code. Whereas in the producer we
> > keep track of the size of the buffer with RecordAccumulator, the
> > consumer simply stores the completed fetches in a list so we don't
> > have the used memory size immediately. Also the number of inflight
> > fetches was already tracked by Fetcher.
> > That said, it shouldn't be too hard to keep track of the used memory
> > by the completed fetches collection if we decide to, either way should
> > work.
> >
> > On Mon, Oct 10, 2016 at 3:40 PM, Ismael Juma <ism...@juma.me.uk> wrote:
> >> Hi Mickael,
> >>
> >> Thanks for the KIP. A quick comment on the rejected alternative of using a
> >> bounded memory pool:
> >>
> >> "While this might be the best way to handle that on the server side it's
> >> unclear if this would suit the client well. Usually the client has a rough
> >> idea about how many partitions it will be subscribed to so it's easier to
> >> size the maximum number of concurrent fetches."
> >>
> >> I think this should be discussed in more detail. The producer (a client)
> >> uses a `BufferPool`, so we should also explain why the consumer should
> >> follow a different approach. Also, with KIP-74, the number of partitions is
> >> less relevant than the number of brokers with partitions that a consumer is
> >> subscribed to (which can change as the Kafka cluster size changes).
> >>
> >> I think it's also worth separating implementation from the config options.
> >> For example, one could configure a memory limit with an implementation that
> >> limits the number of concurrent fetches or uses a bounded memory pool. Are
> >> there other good reasons to have an explicit concurrent fetches limit per
> >> consumer config? If so, it would be good to mention them in the KIP.
> >>
> >> Ismael
> >>
> >> On Mon, Oct 10, 2016 at 2:41 PM, Mickael Maison <mickael.mai...@gmail.com>
> >> wrote:
> >>
> >>> Hi all,
> >>>
> >>> I would like to discuss the following KIP proposal:
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Max+in-flight+fetches
> >>>
> >>>
> >>> Feedback and comments are welcome.
> >>> Thanks !
> >>>
> >>> Mickael
> >>>
>
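
A rough sketch of the arithmetic behind the 10 MB example in the top-level comment of this thread, assuming the proposal caps in-flight fetches at buffer.memory divided by the maximum fetch size. The class and method names are hypothetical and are not part of the Kafka client, and the 2 MB average fetch size is an assumed figure chosen only for illustration.

```java
// Hypothetical helper for illustration only; not part of the Kafka client.
final class FetchBudget {
    private FetchBudget() {}

    // Upper bound on concurrent fetches if every in-flight fetch must
    // reserve the full maximum fetch size out of the buffer.
    static long maxInFlightFetches(long bufferMemoryBytes, long fetchMaxBytes) {
        if (bufferMemoryBytes <= 0 || fetchMaxBytes <= 0) {
            throw new IllegalArgumentException("sizes must be positive");
        }
        return bufferMemoryBytes / fetchMaxBytes;
    }

    // Fraction of the buffer actually occupied when responses average
    // avgFetchBytes instead of the maximum (avgFetchBytes is an assumption).
    static double expectedUtilization(long bufferMemoryBytes, long fetchMaxBytes, long avgFetchBytes) {
        long inFlight = maxInFlightFetches(bufferMemoryBytes, fetchMaxBytes);
        return (double) (inFlight * avgFetchBytes) / bufferMemoryBytes;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // 10 MB buffer with a 10 MB maximum fetch size: a single in-flight fetch.
        System.out.println(maxInFlightFetches(10 * mb, 10 * mb));
        // With an assumed 2 MB average response, most of the buffer sits idle.
        System.out.println(expectedUtilization(10 * mb, 10 * mb, 2 * mb));
    }
}
```

Under these assumed numbers the single permitted fetch would occupy about a fifth of the buffer, which is the underutilization concern raised above. Accounting for the bytes actually held, rather than reserving the maximum per fetch, would avoid that gap.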