Thanks for the suggestion. Currently, I can't think of a scenario where we would need multiple priority "levels". If it makes sense to have some in the future, I think we could just make the change without a new KIP, as these APIs are not public. So I'd be more inclined to keep the boolean for now.
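To make the boolean concrete, here is a rough sketch of the allocation rule discussed in this thread: use the pool whenever it has room, and let only "priority" connections fall back to plain heap allocation when it does not. This is not the KIP-81 code. `BoundedPool`, `ReceiveAllocator` and their methods are made-up names for illustration, and releasing buffers back to the pool is left out.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for a bounded memory pool (not Kafka's MemoryPool).
final class BoundedPool {
    private long available;

    BoundedPool(long capacityBytes) {
        this.available = capacityBytes;
    }

    // Returns a buffer if the pool has room for it, otherwise null.
    synchronized ByteBuffer tryAllocate(int size) {
        if (size > available) {
            return null;
        }
        available -= size;
        return ByteBuffer.allocate(size);
    }

    synchronized long available() {
        return available;
    }
}

// Hypothetical helper deciding where a receive buffer comes from.
final class ReceiveAllocator {
    private final BoundedPool pool;

    ReceiveAllocator(BoundedPool pool) {
        this.pool = pool;
    }

    // Pool first. If the pool is exhausted, a priority connection gets a
    // plain heap buffer; a non-priority one gets null and must retry later.
    ByteBuffer allocate(int size, boolean priority) {
        ByteBuffer pooled = pool.tryAllocate(size);
        if (pooled != null) {
            return pooled;
        }
        return priority ? ByteBuffer.allocate(size) : null;
    }
}
```

With a single flag the fallback is a one-line decision. An int would only start to matter if different levels needed different fallback behaviour.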
On Wed, Mar 29, 2017 at 6:13 PM, Edoardo Comar <eco...@uk.ibm.com> wrote:
> Hi Mickael,
> as discussed we could change the priority parameter to be an int rather
> than a boolean.
>
> That's a bit more extensible
> --------------------------------------------------
> Edoardo Comar
> IBM MessageHub
> eco...@uk.ibm.com
> IBM UK Ltd, Hursley Park, SO21 2JN
>
> IBM United Kingdom Limited Registered in England and Wales with number
> 741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6
> 3AU
>
>
> From: Guozhang Wang <wangg...@gmail.com>
> To: "dev@kafka.apache.org" <dev@kafka.apache.org>
> Date: 28/03/2017 19:02
> Subject: Re: [VOTE] KIP-81: Bound Fetch memory usage in the consumer
>
>
> 1) Makes sense.
> 2) Makes sense. Thanks!
>
> On Tue, Mar 28, 2017 at 10:11 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:
>
>> Hi Guozhang,
>>
>> Thanks for the feedback.
>>
>> 1) By MemoryPool, I mean the implementation added in KIP-72. That will
>> most likely be SimpleMemoryPool, but the PR for KIP-72 has not been
>> merged yet.
>> I've updated the KIP to make it more obvious.
>>
>> 2) I was thinking to pass in the priority when creating the
>> Coordinator Node (in
>> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L582)
>> Then when calling Selector.connect() (in
>> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L643)
>> retrieve it and pass it in the Selector so it uses it when building
>> the Channel.
>> The idea was to avoid having to deduce the connection is for the
>> Coordinator from the ID but instead have it explicitly set by
>> AbstractCoordinator (and pass it all the way down to the Channel)
>>
>> On Tue, Mar 28, 2017 at 1:33 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>> > Mickael,
>> >
>> > Sorry for the late review of the KIP. I'm +1 on the proposed change as
>> > well.
>> > Just a few minor comments on the wiki itself:
>> >
>> > 1. By the "MemoryPool" are you referring to a new class impl or to
>> > reusing "org.apache.kafka.clients.producer.internals.BufferPool"? I
>> > assume it was the latter case, and if yes, could you update the wiki
>> > page to make it clear?
>> >
>> > 2. I think it is sufficient to add the priority to KafkaChannel class,
>> > but not needed in Node (but one may need to add this parameter to
>> > Selector#connect). Could you point me to which usage of Node needs to
>> > access the priority?
>> >
>> >
>> > Guozhang
>> >
>> >
>> > On Fri, Mar 10, 2017 at 9:52 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:
>> >
>> >> Thanks Jason for the feedback! Yes it makes sense to always use the
>> >> MemoryPool if we can. I've updated the KIP with the suggestion
>> >>
>> >> On Fri, Mar 10, 2017 at 1:18 AM, Jason Gustafson <ja...@confluent.io> wrote:
>> >> > Just a minor comment. The KIP suggests that coordinator responses are
>> >> > always allocated outside of the memory pool, but maybe we can reserve
>> >> > that capability for only when the pool does not have enough space? It
>> >> > seems a little nicer to use the pool if we can. If that seems
>> >> > reasonable, I'm +1 on the KIP. Thanks for the effort!
>> >> >
>> >> > -Jason
>> >> >
>> >> > On Tue, Feb 28, 2017 at 10:09 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:
>> >> >
>> >> >> Yes I agree, having a generic flag is more future proof.
>> >> >> I'll update the KIP in the coming days.
>> >> >>
>> >> >> Thanks
>> >> >>
>> >> >> On Tue, Feb 28, 2017 at 5:08 AM, Jason Gustafson <ja...@confluent.io> wrote:
>> >> >> > Hey Mickael,
>> >> >> >
>> >> >> > The suggestion to add something to Node makes sense.
>> >> >> > I could imagine for example adding a flag to indicate that the
>> >> >> > connection has a higher "priority," meaning that we can allocate
>> >> >> > outside of the memory pool if necessary. That would still be generic
>> >> >> > even if the only use case is the consumer coordinator. We might also
>> >> >> > face a similar problem when the producer is sending requests to the
>> >> >> > transaction coordinator for KIP-98.
>> >> >> > What do you think?
>> >> >> >
>> >> >> > Thanks,
>> >> >> > Jason
>> >> >> >
>> >> >> > On Mon, Feb 27, 2017 at 9:09 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:
>> >> >> >
>> >> >> >> Apologies for the late response.
>> >> >> >>
>> >> >> >> Thanks Jason for the suggestion. Yes you are right, the Coordinator
>> >> >> >> connection is "tagged" with a different id, so we could retrieve it in
>> >> >> >> NetworkReceive to make the distinction.
>> >> >> >> However, currently the coordinator connections are made different by using:
>> >> >> >> Integer.MAX_VALUE - groupCoordinatorResponse.node().id()
>> >> >> >> for the Node id.
>> >> >> >>
>> >> >> >> So to identify Coordinator connections, we'd have to check that the
>> >> >> >> NetworkReceive source is a value near Integer.MAX_VALUE which is a bit
>> >> >> >> hacky ...
>> >> >> >>
>> >> >> >> Maybe we could add a constructor to Node that allows passing in a
>> >> >> >> sourceId String. That way we could make all the coordinator
>> >> >> >> connections explicit (by setting it to "Coordinator-[ID]" for
>> >> >> >> example).
>> >> >> >> What do you think ?
>> >> >> >>
>> >> >> >> On Tue, Jan 24, 2017 at 12:58 AM, Jason Gustafson <ja...@confluent.io> wrote:
>> >> >> >> > Good point. The consumer does use a separate connection to the
>> >> >> >> > coordinator, so perhaps the connection itself could be tagged for
>> >> >> >> > normal heap allocation?
>> >> >> >> >
>> >> >> >> > -Jason
>> >> >> >> >
>> >> >> >> > On Mon, Jan 23, 2017 at 10:26 AM, Onur Karaman <onurkaraman.apa...@gmail.com> wrote:
>> >> >> >> >
>> >> >> >> >> I only did a quick scan but I wanted to point out what I think is an
>> >> >> >> >> incorrect assumption in the KIP's caveats:
>> >> >> >> >> "
>> >> >> >> >> There is a risk using the MemoryPool that, after we fill up the
>> >> >> >> >> memory with fetch data, we can starve the coordinator's connection
>> >> >> >> >> ...
>> >> >> >> >> To alleviate this issue, only messages larger than 1Kb will be
>> >> >> >> >> allocated in the MemoryPool. Smaller messages will be allocated
>> >> >> >> >> directly on the Heap like before. This allows group/heartbeat
>> >> >> >> >> messages to avoid being delayed if the MemoryPool fills up.
>> >> >> >> >> "
>> >> >> >> >>
>> >> >> >> >> So it sounds like there's an incorrect assumption that responses from
>> >> >> >> >> the coordinator will always be small (< 1Kb as mentioned in the
>> >> >> >> >> caveat). There are now a handful of request types between clients and
>> >> >> >> >> the coordinator: {JoinGroup, SyncGroup, LeaveGroup, Heartbeat,
>> >> >> >> >> OffsetCommit, OffsetFetch, ListGroups, DescribeGroups}. While true (at
>> >> >> >> >> least today) for HeartbeatResponse and a few others, I don't think we
>> >> >> >> >> can assume JoinGroupResponse, SyncGroupResponse,
>> >> >> >> >> DescribeGroupsResponse, and OffsetFetchResponse will be small, as they
>> >> >> >> >> are effectively bounded by the max message size allowed by the broker
>> >> >> >> >> for the __consumer_offsets topic which by default is 1MB.
>> >> >> >> >>
>> >> >> >> >> On Mon, Jan 23, 2017 at 9:46 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:
>> >> >> >> >>
>> >> >> >> >> > I've updated the KIP to address all the comments raised here and
>> >> >> >> >> > from the "DISCUSS" thread.
>> >> >> >> >> > See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer
>> >> >> >> >> >
>> >> >> >> >> > Now, I'd like to restart the vote.
>> >> >> >> >> >
>> >> >> >> >> > On Tue, Dec 6, 2016 at 9:02 AM, Rajini Sivaram <rajinisiva...@googlemail.com> wrote:
>> >> >> >> >> > > Hi Mickael,
>> >> >> >> >> > >
>> >> >> >> >> > > I am +1 on the overall approach of this KIP, but have a couple of
>> >> >> >> >> > > comments (sorry, should have brought them up on the discuss thread
>> >> >> >> >> > > earlier):
>> >> >> >> >> > >
>> >> >> >> >> > > 1. Perhaps it would be better to do this after KAFKA-4137
>> >> >> >> >> > > <https://issues.apache.org/jira/browse/KAFKA-4137> is implemented?
>> >> >> >> >> > > At the moment, coordinator shares the same NetworkClient (and hence
>> >> >> >> >> > > the same Selector) with consumer connections used for fetching
>> >> >> >> >> > > records. Since freeing of memory relies on consuming applications
>> >> >> >> >> > > invoking poll() after processing previous records and potentially
>> >> >> >> >> > > after committing offsets, it will be good to ensure that coordinator
>> >> >> >> >> > > is not blocked for read by fetch responses. This may be simpler once
>> >> >> >> >> > > coordinator has its own Selector.
>> >> >> >> >> > >
>> >> >> >> >> > > 2. The KIP says: *Once messages are returned to the user, messages
>> >> >> >> >> > > are deleted from the MemoryPool so new messages can be stored.*
>> >> >> >> >> > > Can you expand that a bit? I am assuming that partial buffers never
>> >> >> >> >> > > get freed when some messages are returned to the user since the
>> >> >> >> >> > > consumer is still holding a reference to the buffer. Would buffers be
>> >> >> >> >> > > freed when fetches for all the partitions in a response are parsed,
>> >> >> >> >> > > but perhaps not yet returned to the user (i.e., is the memory freed
>> >> >> >> >> > > when a reference to the response buffer is no longer required)? It
>> >> >> >> >> > > will be good to document the (approximate) maximum memory requirement
>> >> >> >> >> > > for the non-compressed case. There is data read from the socket,
>> >> >> >> >> > > cached in the Fetcher and (as Radai has pointed out), the records
>> >> >> >> >> > > still with the user application.
>> >> >> >> >> > >
>> >> >> >> >> > >
>> >> >> >> >> > > On Tue, Dec 6, 2016 at 2:04 AM, radai <radai.rosenbl...@gmail.com> wrote:
>> >> >> >> >> > >
>> >> >> >> >> > >> +1 (non-binding).
>> >> >> >> >> > >>
>> >> >> >> >> > >> small nit pick - just because you returned a response to user doesn't
>> >> >> >> >> > >> mean the memory is no longer used.
>> >> >> >> >> > >> for some cases the actual "point of termination" may be the
>> >> >> >> >> > >> deserializer (really impl-dependent), but generally, wouldn't it be
>> >> >> >> >> > >> "nice" to have an explicit dispose() call on responses (with the
>> >> >> >> >> > >> addition that getting the next batch of data from a consumer
>> >> >> >> >> > >> automatically disposes the previous results)
>> >> >> >> >> > >>
>> >> >> >> >> > >> On Mon, Dec 5, 2016 at 6:53 AM, Edoardo Comar <eco...@uk.ibm.com> wrote:
>> >> >> >> >> > >>
>> >> >> >> >> > >> > +1 (non binding)
>> >> >> >> >> > >> > --------------------------------------------------
>> >> >> >> >> > >> > Edoardo Comar
>> >> >> >> >> > >> > IBM MessageHub
>> >> >> >> >> > >> > eco...@uk.ibm.com
>> >> >> >> >> > >> > IBM UK Ltd, Hursley Park, SO21 2JN
>> >> >> >> >> > >> >
>> >> >> >> >> > >> >
>> >> >> >> >> > >> > From: Mickael Maison <mickael.mai...@gmail.com>
>> >> >> >> >> > >> > To: dev@kafka.apache.org
>> >> >> >> >> > >> > Date: 05/12/2016 14:38
>> >> >> >> >> > >> > Subject: [VOTE] KIP-81: Bound Fetch memory usage in the consumer
>> >> >> >> >> > >> >
>> >> >> >> >> > >> >
>> >> >> >> >> > >> > Hi all,
>> >> >> >> >> > >> >
>> >> >> >> >> > >> > I'd like to start the vote for KIP-81:
>> >> >> >> >> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer
>> >> >> >> >> > >> >
>> >> >> >> >> > >> >
>> >> >> >> >> > >> > Thank you
>> >> >> >> >> > >> >
>> >> >> >> >> > >
>> >> >> >> >> > > --
>> >> >> >> >> > > Regards,
>> >> >> >> >> > >
>> >> >> >> >> > > Rajini
>> >> >> >> >> >
>> >
>> > --
>> > -- Guozhang
>
> --
> -- Guozhang