+1 (non-binding)

On Fri, Mar 31, 2017 at 5:36 PM, radai <radai.rosenbl...@gmail.com> wrote:
> possible priorities:
>
> 1. keepalives/coordination
> 2. inter-broker-traffic
> 3. produce traffic
> 4. consume traffic
>
> (don't want to start a debate, just to illustrate there may be >2 of them so int is better than bool)
>
> On Fri, Mar 31, 2017 at 9:10 AM, Ismael Juma <ism...@juma.me.uk> wrote:
>
> > +1 from me too, thanks for the KIP.
> >
> > Ismael
> >
> > On Fri, Mar 31, 2017 at 5:06 PM, Jun Rao <j...@confluent.io> wrote:
> >
> > > Hi, Mickael,
> > >
> > > Thanks for the KIP. +1 from me too.
> > >
> > > Jun
> > >
> > > On Thu, Mar 30, 2017 at 4:40 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:
> > >
> > > > Thanks for the suggestion.
> > > >
> > > > Currently, I can't think of a scenario when we would need multiple priority "levels". If in the future it makes sense to have some, I think we could just make the change without a new KIP as these APIs are not public.
> > > > So I'd be more inclined to keep the boolean for now.
> > > >
> > > > On Wed, Mar 29, 2017 at 6:13 PM, Edoardo Comar <eco...@uk.ibm.com> wrote:
> > > > > Hi Mickael,
> > > > > as discussed we could change the priority parameter to be an int rather than a boolean.
> > > > >
> > > > > That's a bit more extensible
> > > > > --------------------------------------------------
> > > > > Edoardo Comar
> > > > > IBM MessageHub
> > > > > eco...@uk.ibm.com
> > > > > IBM UK Ltd, Hursley Park, SO21 2JN
> > > > >
> > > > > IBM United Kingdom Limited Registered in England and Wales with number 741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
> > > > >
> > > > > From: Guozhang Wang <wangg...@gmail.com>
> > > > > To: "dev@kafka.apache.org" <dev@kafka.apache.org>
> > > > > Date: 28/03/2017 19:02
> > > > > Subject: Re: [VOTE] KIP-81: Bound Fetch memory usage in the consumer
> > > > >
> > > > > 1) Makes sense.
> > > > > 2) Makes sense. Thanks!
> > > > >
> > > > > On Tue, Mar 28, 2017 at 10:11 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:
> > > > >
> > > > >> Hi Guozhang,
> > > > >>
> > > > >> Thanks for the feedback.
> > > > >>
> > > > >> 1) By MemoryPool, I mean the implementation added in KIP-72. That will most likely be SimpleMemoryPool, but the PR for KIP-72 has not been merged yet.
> > > > >> I've updated the KIP to make it more obvious.
> > > > >>
> > > > >> 2) I was thinking to pass in the priority when creating the Coordinator Node (in https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L582)
> > > > >> Then when calling Selector.connect() (in https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L643) retrieve it and pass it in the Selector so it uses it when building the Channel.
> > > > >> The idea was to avoid having to deduce the connection is for the Coordinator from the ID but instead have it explicitly set by AbstractCoordinator (and pass it all the way down to the Channel)
> > > > >>
> > > > >> On Tue, Mar 28, 2017 at 1:33 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > >> > Mickael,
> > > > >> >
> > > > >> > Sorry for the late review of the KIP. I'm +1 on the proposed change as well. Just a few minor comments on the wiki itself:
> > > > >> >
> > > > >> > 1. By the "MemoryPool" are you referring to a new class impl or to reusing "org.apache.kafka.clients.producer.internals.BufferPool"? I assume it was the latter case, and if yes, could you update the wiki page to make it clear?
> > > > >> >
> > > > >> > 2. I think it is sufficient to add the priority to KafkaChannel class, but not needed in Node (but one may need to add this parameter to Selector#connect). Could you point me to which usage of Node needs to access the priority?
> > > > >> >
> > > > >> > Guozhang
> > > > >> >
> > > > >> > On Fri, Mar 10, 2017 at 9:52 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:
> > > > >> >
> > > > >> >> Thanks Jason for the feedback! Yes it makes sense to always use the MemoryPool if we can. I've updated the KIP with the suggestion
> > > > >> >>
> > > > >> >> On Fri, Mar 10, 2017 at 1:18 AM, Jason Gustafson <ja...@confluent.io> wrote:
> > > > >> >> > Just a minor comment. The KIP suggests that coordinator responses are always allocated outside of the memory pool, but maybe we can reserve that capability for only when the pool does not have enough space? It seems a little nicer to use the pool if we can. If that seems reasonable, I'm +1 on the KIP. Thanks for the effort!
> > > > >> >> >
> > > > >> >> > -Jason
> > > > >> >> >
> > > > >> >> > On Tue, Feb 28, 2017 at 10:09 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:
> > > > >> >> >
> > > > >> >> >> Yes I agree, having a generic flag is more future proof.
> > > > >> >> >> I'll update the KIP in the coming days.
> > > > >> >> >>
> > > > >> >> >> Thanks
> > > > >> >> >>
> > > > >> >> >> On Tue, Feb 28, 2017 at 5:08 AM, Jason Gustafson <ja...@confluent.io> wrote:
> > > > >> >> >> > Hey Mickael,
> > > > >> >> >> >
> > > > >> >> >> > The suggestion to add something to Node makes sense. I could imagine for example adding a flag to indicate that the connection has a higher "priority," meaning that we can allocate outside of the memory pool if necessary. That would still be generic even if the only use case is the consumer coordinator. We might also face a similar problem when the producer is sending requests to the transaction coordinator for KIP-98. What do you think?
> > > > >> >> >> >
> > > > >> >> >> > Thanks,
> > > > >> >> >> > Jason
> > > > >> >> >> >
> > > > >> >> >> > On Mon, Feb 27, 2017 at 9:09 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:
> > > > >> >> >> >
> > > > >> >> >> >> Apologies for the late response.
> > > > >> >> >> >>
> > > > >> >> >> >> Thanks Jason for the suggestion. Yes you are right, the Coordinator connection is "tagged" with a different id, so we could retrieve it in NetworkReceive to make the distinction.
> > > > >> >> >> >> However, currently the coordinator connections are made different by using:
> > > > >> >> >> >> Integer.MAX_VALUE - groupCoordinatorResponse.node().id()
> > > > >> >> >> >> for the Node id.
> > > > >> >> >> >>
> > > > >> >> >> >> So to identify Coordinator connections, we'd have to check that the NetworkReceive source is a value near Integer.MAX_VALUE which is a bit hacky ...
> > > > >> >> >> >>
> > > > >> >> >> >> Maybe we could add a constructor to Node that allows to pass in a sourceId String. That way we could make all the coordinator connections explicit (by setting it to "Coordinator-[ID]" for example).
> > > > >> >> >> >> What do you think?
> > > > >> >> >> >>
> > > > >> >> >> >> On Tue, Jan 24, 2017 at 12:58 AM, Jason Gustafson <ja...@confluent.io> wrote:
> > > > >> >> >> >> > Good point. The consumer does use a separate connection to the coordinator, so perhaps the connection itself could be tagged for normal heap allocation?
> > > > >> >> >> >> >
> > > > >> >> >> >> > -Jason
> > > > >> >> >> >> >
> > > > >> >> >> >> > On Mon, Jan 23, 2017 at 10:26 AM, Onur Karaman <onurkaraman.apa...@gmail.com> wrote:
> > > > >> >> >> >> >
> > > > >> >> >> >> >> I only did a quick scan but I wanted to point out what I think is an incorrect assumption in the KIP's caveats:
> > > > >> >> >> >> >> "
> > > > >> >> >> >> >> There is a risk using the MemoryPool that, after we fill up the memory with fetch data, we can starve the coordinator's connection
> > > > >> >> >> >> >> ...
> > > > >> >> >> >> >> To alleviate this issue, only messages larger than 1Kb will be allocated in the MemoryPool. Smaller messages will be allocated directly on the Heap like before. This allows group/heartbeat messages to avoid being delayed if the MemoryPool fills up.
> > > > >> >> >> >> >> "
> > > > >> >> >> >> >>
> > > > >> >> >> >> >> So it sounds like there's an incorrect assumption that responses from the coordinator will always be small (< 1Kb as mentioned in the caveat). There are now a handful of request types between clients and the coordinator: {JoinGroup, SyncGroup, LeaveGroup, Heartbeat, OffsetCommit, OffsetFetch, ListGroups, DescribeGroups}. While true (at least today) for HeartbeatResponse and a few others, I don't think we can assume JoinGroupResponse, SyncGroupResponse, DescribeGroupsResponse, and OffsetFetchResponse will be small, as they are effectively bounded by the max message size allowed by the broker for the __consumer_offsets topic which by default is 1MB.
> > > > >> >> >> >> >>
> > > > >> >> >> >> >> On Mon, Jan 23, 2017 at 9:46 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:
> > > > >> >> >> >> >>
> > > > >> >> >> >> >> > I've updated the KIP to address all the comments raised here and from the "DISCUSS" thread.
> > > > >> >> >> >> >> > See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer
> > > > >> >> >> >> >> >
> > > > >> >> >> >> >> > Now, I'd like to restart the vote.
> > > > >> >> >> >> >> >
> > > > >> >> >> >> >> > On Tue, Dec 6, 2016 at 9:02 AM, Rajini Sivaram <rajinisiva...@googlemail.com> wrote:
> > > > >> >> >> >> >> > > Hi Mickael,
> > > > >> >> >> >> >> > >
> > > > >> >> >> >> >> > > I am +1 on the overall approach of this KIP, but have a couple of comments (sorry, should have brought them up on the discuss thread earlier):
> > > > >> >> >> >> >> > >
> > > > >> >> >> >> >> > > 1. Perhaps it would be better to do this after KAFKA-4137 <https://issues.apache.org/jira/browse/KAFKA-4137> is implemented? At the moment, coordinator shares the same NetworkClient (and hence the same Selector) with consumer connections used for fetching records. Since freeing of memory relies on consuming applications invoking poll() after processing previous records and potentially after committing offsets, it will be good to ensure that coordinator is not blocked for read by fetch responses. This may be simpler once coordinator has its own Selector.
> > > > >> >> >> >> >> > >
> > > > >> >> >> >> >> > > 2. The KIP says: *Once messages are returned to the user, messages are deleted from the MemoryPool so new messages can be stored.*
> > > > >> >> >> >> >> > > Can you expand that a bit? I am assuming that partial buffers never get freed when some messages are returned to the user since the consumer is still holding a reference to the buffer. Would buffers be freed when fetches for all the partitions in a response are parsed, but perhaps not yet returned to the user (i.e., is the memory freed when a reference to the response buffer is no longer required)? It will be good to document the (approximate) maximum memory requirement for the non-compressed case. There is data read from the socket, cached in the Fetcher and (as Radai has pointed out), the records still with the user application.
> > > > >> >> >> >> >> > >
> > > > >> >> >> >> >> > > On Tue, Dec 6, 2016 at 2:04 AM, radai <radai.rosenbl...@gmail.com> wrote:
> > > > >> >> >> >> >> > >
> > > > >> >> >> >> >> > >> +1 (non-binding).
> > > > >> >> >> >> >> > >>
> > > > >> >> >> >> >> > >> small nit pick - just because you returned a response to user doesn't mean the memory is no longer used. for some cases the actual "point of termination" may be the deserializer (really impl-dependent), but generally, wouldn't it be "nice" to have an explicit dispose() call on responses (with the addition that getting the next batch of data from a consumer automatically disposes the previous results)
> > > > >> >> >> >> >> > >>
> > > > >> >> >> >> >> > >> On Mon, Dec 5, 2016 at 6:53 AM, Edoardo Comar <eco...@uk.ibm.com> wrote:
> > > > >> >> >> >> >> > >>
> > > > >> >> >> >> >> > >> > +1 (non binding)
> > > > >> >> >> >> >> > >> > --------------------------------------------------
> > > > >> >> >> >> >> > >> > Edoardo Comar
> > > > >> >> >> >> >> > >> > IBM MessageHub
> > > > >> >> >> >> >> > >> > eco...@uk.ibm.com
> > > > >> >> >> >> >> > >> > IBM UK Ltd, Hursley Park, SO21 2JN
> > > > >> >> >> >> >> > >> >
> > > > >> >> >> >> >> > >> > From: Mickael Maison <mickael.mai...@gmail.com>
> > > > >> >> >> >> >> > >> > To: dev@kafka.apache.org
> > > > >> >> >> >> >> > >> > Date: 05/12/2016 14:38
> > > > >> >> >> >> >> > >> > Subject: [VOTE] KIP-81: Bound Fetch memory usage in the consumer
> > > > >> >> >> >> >> > >> >
> > > > >> >> >> >> >> > >> > Hi all,
> > > > >> >> >> >> >> > >> >
> > > > >> >> >> >> >> > >> > I'd like to start the vote for KIP-81:
> > > > >> >> >> >> >> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer
> > > > >> >> >> >> >> > >> >
> > > > >> >> >> >> >> > >> > Thank you
> > > > >> >> >> >> >> > >
> > > > >> >> >> >> >> > > --
> > > > >> >> >> >> >> > > Regards,
> > > > >> >> >> >> >> > >
> > > > >> >> >> >> >> > > Rajini
> > > > >> >
> > > > >> > --
> > > > >> > -- Guozhang
> > > > >
> > > > > --
> > > > > -- Guozhang
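
For readers following the thread, here is a minimal sketch of the allocation policy the discussion converged on: always try the memory pool first, and let only a connection flagged as "priority" (for example the coordinator connection) fall back to a plain heap allocation when the pool has no space. Everything named here (`PriorityAllocationSketch`, `BoundedPool`, `tryAllocate`, `allocate`, `coordinatorConnectionId`) is hypothetical and is not Kafka's actual API or the KIP-72 `MemoryPool`; only the `Integer.MAX_VALUE - id` derivation is taken from the messages above.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the policy discussed in the thread.
// None of these types or methods are Kafka's real classes.
class PriorityAllocationSketch {

    /** Minimal stand-in for a bounded memory pool (not the KIP-72 MemoryPool). */
    static final class BoundedPool {
        private long available;

        BoundedPool(long capacityBytes) {
            this.available = capacityBytes;
        }

        /** Returns a buffer if enough pool memory is left, otherwise null. */
        synchronized ByteBuffer tryAllocate(int size) {
            if (size > available) {
                return null;
            }
            available -= size;
            return ByteBuffer.allocate(size);
        }

        /** Gives memory back; call only for buffers obtained from tryAllocate. */
        synchronized void release(ByteBuffer buffer) {
            available += buffer.capacity();
        }

        synchronized long available() {
            return available;
        }
    }

    /**
     * Try the pool first. If the pool is exhausted, only a priority
     * connection may allocate outside the pool; a non-priority caller
     * gets null and is expected to retry once memory has been released.
     */
    static ByteBuffer allocate(BoundedPool pool, int size, boolean priority) {
        ByteBuffer pooled = pool.tryAllocate(size);
        if (pooled != null) {
            return pooled;
        }
        return priority ? ByteBuffer.allocate(size) : null;
    }

    /** The id derivation quoted in the thread for coordinator connections. */
    static int coordinatorConnectionId(int brokerId) {
        return Integer.MAX_VALUE - brokerId;
    }
}
```

Passing the flag explicitly, as proposed in the thread, avoids having to infer "this is the coordinator" from an id close to `Integer.MAX_VALUE`. Replacing the `boolean` with an `int` level, as also suggested, would change only the fallback condition in `allocate`.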