Hi Mickael,

As discussed, we could change the priority parameter to be an int rather than a boolean.
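A minimal sketch of how an int priority could generalise the boolean flag, combined with Jason's suggestion further down the thread to use the pool whenever it has room. Everything here is hypothetical: `PrioritySketch`, `ToyPool`, `PRIORITY_NORMAL`, `PRIORITY_HIGH` and `allocate` are illustrative names only, not part of KIP-81, KIP-72 or the Kafka codebase.

```java
import java.nio.ByteBuffer;

public class PrioritySketch {
    // Hypothetical levels: an int leaves room for more than two, unlike a boolean.
    static final int PRIORITY_NORMAL = 0;
    static final int PRIORITY_HIGH = 1;

    // Toy stand-in for a bounded memory pool (NOT the KIP-72 MemoryPool API).
    static final class ToyPool {
        private long available;

        ToyPool(long capacity) {
            this.available = capacity;
        }

        // Returns a buffer, or null when the pool cannot satisfy the request.
        synchronized ByteBuffer tryAllocate(int size) {
            if (size > available) {
                return null;
            }
            available -= size;
            return ByteBuffer.allocate(size);
        }

        // Only call this for buffers that were obtained from tryAllocate().
        synchronized void release(ByteBuffer buffer) {
            available += buffer.capacity();
        }

        synchronized long available() {
            return available;
        }
    }

    // Prefer the pool; only connections above normal priority may fall back
    // to a plain heap allocation when the pool is exhausted.
    static ByteBuffer allocate(ToyPool pool, int size, int priority) {
        ByteBuffer buffer = pool.tryAllocate(size);
        if (buffer == null && priority > PRIORITY_NORMAL) {
            buffer = ByteBuffer.allocate(size); // allocated outside the pool
        }
        return buffer; // null means the caller has to wait for memory
    }
}
```

With an int, further levels could be added later without changing the method signature, which is the extensibility argument made here.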
That's a bit more extensible.
--------------------------------------------------
Edoardo Comar
IBM MessageHub
eco...@uk.ibm.com
IBM UK Ltd, Hursley Park, SO21 2JN

IBM United Kingdom Limited Registered in England and Wales with number 741598
Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU

From: Guozhang Wang <wangg...@gmail.com>
To: "dev@kafka.apache.org" <dev@kafka.apache.org>
Date: 28/03/2017 19:02
Subject: Re: [VOTE] KIP-81: Bound Fetch memory usage in the consumer

1) Makes sense.

2) Makes sense. Thanks!

On Tue, Mar 28, 2017 at 10:11 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:

> Hi Guozhang,
>
> Thanks for the feedback.
>
> 1) By MemoryPool, I mean the implementation added in KIP-72. That will
> most likely be SimpleMemoryPool, but the PR for KIP-72 has not been
> merged yet.
> I've updated the KIP to make it more obvious.
>
> 2) I was thinking of passing in the priority when creating the
> Coordinator Node (in
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L582).
> Then, when calling Selector.connect() (in
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L643),
> retrieve it and pass it to the Selector so it uses it when building
> the Channel.
> The idea was to avoid having to deduce that the connection is for the
> Coordinator from the ID, and instead have it explicitly set by
> AbstractCoordinator (and passed all the way down to the Channel).
>
> On Tue, Mar 28, 2017 at 1:33 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > Mickael,
> >
> > Sorry for the late review of the KIP. I'm +1 on the proposed change as
> > well. Just a few minor comments on the wiki itself:
> >
> > 1. By the "MemoryPool" are you referring to a new class impl or to
> > reusing "org.apache.kafka.clients.producer.internals.BufferPool"?
> > I assume it was the latter case, and if yes, could you update the wiki
> > page to make it clear?
> >
> > 2. I think it is sufficient to add the priority to the KafkaChannel class,
> > but it is not needed in Node (though one may need to add this parameter to
> > Selector#connect). Could you point me to which usage of Node needs to
> > access the priority?
> >
> >
> > Guozhang
> >
> >
> > On Fri, Mar 10, 2017 at 9:52 AM, Mickael Maison <mickael.mai...@gmail.com>
> > wrote:
> >
> >> Thanks Jason for the feedback! Yes, it makes sense to always use the
> >> MemoryPool if we can. I've updated the KIP with the suggestion.
> >>
> >> On Fri, Mar 10, 2017 at 1:18 AM, Jason Gustafson <ja...@confluent.io>
> >> wrote:
> >> > Just a minor comment. The KIP suggests that coordinator responses are
> >> > always allocated outside of the memory pool, but maybe we can reserve
> >> > that capability for only when the pool does not have enough space? It
> >> > seems a little nicer to use the pool if we can. If that seems
> >> > reasonable, I'm +1 on the KIP. Thanks for the effort!
> >> >
> >> > -Jason
> >> >
> >> > On Tue, Feb 28, 2017 at 10:09 AM, Mickael Maison
> >> > <mickael.mai...@gmail.com> wrote:
> >> >
> >> >> Yes I agree, having a generic flag is more future proof.
> >> >> I'll update the KIP in the coming days.
> >> >>
> >> >> Thanks
> >> >>
> >> >> On Tue, Feb 28, 2017 at 5:08 AM, Jason Gustafson <ja...@confluent.io>
> >> >> wrote:
> >> >> > Hey Mickael,
> >> >> >
> >> >> > The suggestion to add something to Node makes sense. I could imagine,
> >> >> > for example, adding a flag to indicate that the connection has a higher
> >> >> > "priority," meaning that we can allocate outside of the memory pool if
> >> >> > necessary. That would still be generic even if the only use case is the
> >> >> > consumer coordinator. We might also face a similar problem when the
> >> >> > producer is sending requests to the transaction coordinator for KIP-98.
> >> >> > What do you think?
> >> >> >
> >> >> > Thanks,
> >> >> > Jason
> >> >> >
> >> >> > On Mon, Feb 27, 2017 at 9:09 AM, Mickael Maison
> >> >> > <mickael.mai...@gmail.com> wrote:
> >> >> >
> >> >> >> Apologies for the late response.
> >> >> >>
> >> >> >> Thanks Jason for the suggestion. Yes you are right, the Coordinator
> >> >> >> connection is "tagged" with a different id, so we could retrieve it in
> >> >> >> NetworkReceive to make the distinction.
> >> >> >> However, currently the coordinator connections are made different by
> >> >> >> using:
> >> >> >> Integer.MAX_VALUE - groupCoordinatorResponse.node().id()
> >> >> >> for the Node id.
> >> >> >>
> >> >> >> So to identify Coordinator connections, we'd have to check that the
> >> >> >> NetworkReceive source is a value near Integer.MAX_VALUE, which is a bit
> >> >> >> hacky ...
> >> >> >>
> >> >> >> Maybe we could add a constructor to Node that allows passing in a
> >> >> >> sourceId String. That way we could make all the coordinator
> >> >> >> connections explicit (by setting it to "Coordinator-[ID]" for
> >> >> >> example).
> >> >> >> What do you think?
> >> >> >>
> >> >> >> On Tue, Jan 24, 2017 at 12:58 AM, Jason Gustafson <ja...@confluent.io>
> >> >> >> wrote:
> >> >> >> > Good point. The consumer does use a separate connection to the
> >> >> >> > coordinator, so perhaps the connection itself could be tagged for
> >> >> >> > normal heap allocation?
> >> >> >> >
> >> >> >> > -Jason
> >> >> >> >
> >> >> >> > On Mon, Jan 23, 2017 at 10:26 AM, Onur Karaman
> >> >> >> > <onurkaraman.apa...@gmail.com> wrote:
> >> >> >> >
> >> >> >> >> I only did a quick scan but I wanted to point out what I think is an
> >> >> >> >> incorrect assumption in the KIP's caveats:
> >> >> >> >> "
> >> >> >> >> There is a risk using the MemoryPool that, after we fill up the
> >> >> >> >> memory with fetch data, we can starve the coordinator's connection
> >> >> >> >> ...
> >> >> >> >> To alleviate this issue, only messages larger than 1Kb will be
> >> >> >> >> allocated in the MemoryPool. Smaller messages will be allocated
> >> >> >> >> directly on the Heap like before. This allows group/heartbeat
> >> >> >> >> messages to avoid being delayed if the MemoryPool fills up.
> >> >> >> >> "
> >> >> >> >>
> >> >> >> >> So it sounds like there's an incorrect assumption that responses from
> >> >> >> >> the coordinator will always be small (< 1Kb as mentioned in the
> >> >> >> >> caveat). There are now a handful of request types between clients and
> >> >> >> >> the coordinator: {JoinGroup, SyncGroup, LeaveGroup, Heartbeat,
> >> >> >> >> OffsetCommit, OffsetFetch, ListGroups, DescribeGroups}. While true (at
> >> >> >> >> least today) for HeartbeatResponse and a few others, I don't think we
> >> >> >> >> can assume JoinGroupResponse, SyncGroupResponse,
> >> >> >> >> DescribeGroupsResponse, and OffsetFetchResponse will be small, as they
> >> >> >> >> are effectively bounded by the max message size allowed by the broker
> >> >> >> >> for the __consumer_offsets topic which by default is 1MB.
> >> >> >> >>
> >> >> >> >> On Mon, Jan 23, 2017 at 9:46 AM, Mickael Maison
> >> >> >> >> <mickael.mai...@gmail.com> wrote:
> >> >> >> >>
> >> >> >> >> > I've updated the KIP to address all the comments raised here and from
> >> >> >> >> > the "DISCUSS" thread.
> >> >> >> >> > See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer
> >> >> >> >> >
> >> >> >> >> > Now, I'd like to restart the vote.
> >> >> >> >> >
> >> >> >> >> > On Tue, Dec 6, 2016 at 9:02 AM, Rajini Sivaram
> >> >> >> >> > <rajinisiva...@googlemail.com> wrote:
> >> >> >> >> > > Hi Mickael,
> >> >> >> >> > >
> >> >> >> >> > > I am +1 on the overall approach of this KIP, but have a couple of
> >> >> >> >> > > comments (sorry, should have brought them up on the discuss thread
> >> >> >> >> > > earlier):
> >> >> >> >> > >
> >> >> >> >> > > 1. Perhaps it would be better to do this after KAFKA-4137
> >> >> >> >> > > <https://issues.apache.org/jira/browse/KAFKA-4137> is implemented? At
> >> >> >> >> > > the moment, coordinator shares the same NetworkClient (and hence the
> >> >> >> >> > > same Selector) with consumer connections used for fetching records.
> >> >> >> >> > > Since freeing of memory relies on consuming applications invoking
> >> >> >> >> > > poll() after processing previous records and potentially after
> >> >> >> >> > > committing offsets, it will be good to ensure that coordinator is not
> >> >> >> >> > > blocked for read by fetch responses. This may be simpler once
> >> >> >> >> > > coordinator has its own Selector.
> >> >> >> >> > >
> >> >> >> >> > > 2. The KIP says: *Once messages are returned to the user, messages are
> >> >> >> >> > > deleted from the MemoryPool so new messages can be stored.*
> >> >> >> >> > > Can you expand that a bit?
> >> >> >> >> > > I am assuming that partial buffers never get freed when some messages
> >> >> >> >> > > are returned to the user since the consumer is still holding a
> >> >> >> >> > > reference to the buffer. Would buffers be freed when fetches for all
> >> >> >> >> > > the partitions in a response are parsed, but perhaps not yet returned
> >> >> >> >> > > to the user (i.e., is the memory freed when a reference to the
> >> >> >> >> > > response buffer is no longer required)? It will be good to document
> >> >> >> >> > > the (approximate) maximum memory requirement for the non-compressed
> >> >> >> >> > > case. There is data read from the socket, cached in the Fetcher and
> >> >> >> >> > > (as Radai has pointed out), the records still with the user
> >> >> >> >> > > application.
> >> >> >> >> > >
> >> >> >> >> > >
> >> >> >> >> > > On Tue, Dec 6, 2016 at 2:04 AM, radai <radai.rosenbl...@gmail.com>
> >> >> >> >> > > wrote:
> >> >> >> >> > >
> >> >> >> >> > >> +1 (non-binding).
> >> >> >> >> > >>
> >> >> >> >> > >> small nit pick - just because you returned a response to user doesn't
> >> >> >> >> > >> mean the memory is no longer used.
> >> >> >> >> > >> for some cases the actual "point of termination" may be the
> >> >> >> >> > >> deserializer (really impl-dependent), but generally, wouldn't it be
> >> >> >> >> > >> "nice" to have an explicit dispose() call on responses (with the
> >> >> >> >> > >> addition that getting the next batch of data from a consumer
> >> >> >> >> > >> automatically disposes the previous results)
> >> >> >> >> > >>
> >> >> >> >> > >> On Mon, Dec 5, 2016 at 6:53 AM, Edoardo Comar <eco...@uk.ibm.com>
> >> >> >> >> > >> wrote:
> >> >> >> >> > >>
> >> >> >> >> > >> > +1 (non binding)
> >> >> >> >> > >> > --------------------------------------------------
> >> >> >> >> > >> > Edoardo Comar
> >> >> >> >> > >> > IBM MessageHub
> >> >> >> >> > >> > eco...@uk.ibm.com
> >> >> >> >> > >> > IBM UK Ltd, Hursley Park, SO21 2JN
> >> >> >> >> > >> >
> >> >> >> >> > >> > From: Mickael Maison <mickael.mai...@gmail.com>
> >> >> >> >> > >> > To: dev@kafka.apache.org
> >> >> >> >> > >> > Date: 05/12/2016 14:38
> >> >> >> >> > >> > Subject: [VOTE] KIP-81: Bound Fetch memory usage in the consumer
> >> >> >> >> > >> >
> >> >> >> >> > >> > Hi all,
> >> >> >> >> > >> >
> >> >> >> >> > >> > I'd like to start the vote for KIP-81:
> >> >> >> >> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer
> >> >> >> >> > >> >
> >> >> >> >> > >> > Thank you
> >> >> >> >> > >> >
> >> >> >> >> > >
> >> >> >> >> > > --
> >> >> >> >> > > Regards,
> >> >> >> >> > >
> >> >> >> >> > > Rajini
> >
> > --
> > -- Guozhang
>
--
-- Guozhang