Apologies for the late response, and thanks Jason for the suggestion. Yes, you are right: the coordinator connection is "tagged" with a different id, so we could retrieve it in NetworkReceive to make the distinction. However, the coordinator connection is currently distinguished only by using Integer.MAX_VALUE - groupCoordinatorResponse.node().id() as the Node id.
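To make the current scheme concrete, here is a rough Java sketch of the id arithmetic and of the kind of check the receive side would need. The helpers are hypothetical and are not Kafka code. The sketch assumes the connection id is the decimal string of the Node id, and the broker-id cut-off is an arbitrary guess.

```java
// Hypothetical sketch: none of these helpers exist in Kafka. They only mirror
// the "Integer.MAX_VALUE - node id" arithmetic described above.
public final class CoordinatorIdSketch {

    // Connection id used for the coordinator today: Integer.MAX_VALUE minus the
    // broker's node id, rendered as a decimal String (assumed representation).
    public static String coordinatorConnectionId(int brokerNodeId) {
        return Integer.toString(Integer.MAX_VALUE - brokerNodeId);
    }

    // Arbitrary assumption: real broker ids are expected to stay below this.
    // Nothing in the protocol guarantees it, which is what makes the check hacky.
    public static final int ASSUMED_MAX_BROKER_ID = 1_000_000;

    // Hypothetical receive-side check: treat any numeric id "near"
    // Integer.MAX_VALUE as a coordinator connection.
    public static boolean looksLikeCoordinator(String source) {
        try {
            int id = Integer.parseInt(source);
            return id > Integer.MAX_VALUE - ASSUMED_MAX_BROKER_ID;
        } catch (NumberFormatException e) {
            // Non-numeric ids are not produced by the current scheme.
            return false;
        }
    }

    public static void main(String[] args) {
        String coordinator = coordinatorConnectionId(1);
        System.out.println(coordinator);                       // 2147483646
        System.out.println(looksLikeCoordinator(coordinator)); // true
        System.out.println(looksLikeCoordinator("1"));         // false
    }
}
```

Nothing ties ASSUMED_MAX_BROKER_ID to the ids brokers actually use, so any threshold we pick is a guess.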
So to identify coordinator connections, we'd have to check that the NetworkReceive source is a value near Integer.MAX_VALUE, which is a bit hacky. Maybe we could add a constructor to Node that accepts a sourceId String. That way we could make all the coordinator connections explicit (by setting the id to "Coordinator-[ID]", for example). What do you think?

On Tue, Jan 24, 2017 at 12:58 AM, Jason Gustafson <ja...@confluent.io> wrote:

> Good point. The consumer does use a separate connection to the coordinator,
> so perhaps the connection itself could be tagged for normal heap allocation?
>
> -Jason
>
> On Mon, Jan 23, 2017 at 10:26 AM, Onur Karaman <onurkaraman.apa...@gmail.com> wrote:
>
>> I only did a quick scan but I wanted to point out what I think is an
>> incorrect assumption in the KIP's caveats:
>> "
>> There is a risk using the MemoryPool that, after we fill up the memory with
>> fetch data, we can starve the coordinator's connection
>> ...
>> To alleviate this issue, only messages larger than 1Kb will be allocated in
>> the MemoryPool. Smaller messages will be allocated directly on the Heap
>> like before. This allows group/heartbeat messages to avoid being delayed if
>> the MemoryPool fills up.
>> "
>>
>> So it sounds like there's an incorrect assumption that responses from the
>> coordinator will always be small (< 1Kb as mentioned in the caveat). There
>> are now a handful of request types between clients and the coordinator:
>> {JoinGroup, SyncGroup, LeaveGroup, Heartbeat, OffsetCommit, OffsetFetch,
>> ListGroups, DescribeGroups}. While true (at least today) for
>> HeartbeatResponse and a few others, I don't think we can assume
>> JoinGroupResponse, SyncGroupResponse, DescribeGroupsResponse, and
>> OffsetFetchResponse will be small, as they are effectively bounded by the
>> max message size allowed by the broker for the __consumer_offsets topic
>> which by default is 1MB.
>>
>> On Mon, Jan 23, 2017 at 9:46 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:
>>
>> > I've updated the KIP to address all the comments raised here and from
>> > the "DISCUSS" thread.
>> > See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer
>> >
>> > Now, I'd like to restart the vote.
>> >
>> > On Tue, Dec 6, 2016 at 9:02 AM, Rajini Sivaram <rajinisiva...@googlemail.com> wrote:
>> > > Hi Mickael,
>> > >
>> > > I am +1 on the overall approach of this KIP, but have a couple of comments
>> > > (sorry, should have brought them up on the discuss thread earlier):
>> > >
>> > > 1. Perhaps it would be better to do this after KAFKA-4137
>> > > <https://issues.apache.org/jira/browse/KAFKA-4137> is implemented? At the
>> > > moment, coordinator shares the same NetworkClient (and hence the same
>> > > Selector) with consumer connections used for fetching records. Since
>> > > freeing of memory relies on consuming applications invoking poll() after
>> > > processing previous records and potentially after committing offsets, it
>> > > will be good to ensure that coordinator is not blocked for read by fetch
>> > > responses. This may be simpler once coordinator has its own Selector.
>> > >
>> > > 2. The KIP says: *Once messages are returned to the user, messages are
>> > > deleted from the MemoryPool so new messages can be stored.*
>> > > Can you expand that a bit? I am assuming that partial buffers never get
>> > > freed when some messages are returned to the user since the consumer is
>> > > still holding a reference to the buffer. Would buffers be freed when
>> > > fetches for all the partitions in a response are parsed, but perhaps not
>> > > yet returned to the user (i.e., is the memory freed when a reference to the
>> > > response buffer is no longer required)? It will be good to document the
>> > > (approximate) maximum memory requirement for the non-compressed case.
>> > > There is data read from the socket, cached in the Fetcher and (as Radai has
>> > > pointed out) the records still held by the user application.
>> > >
>> > >
>> > > On Tue, Dec 6, 2016 at 2:04 AM, radai <radai.rosenbl...@gmail.com> wrote:
>> > >
>> > >> +1 (non-binding).
>> > >>
>> > >> Small nitpick: just because you returned a response to the user doesn't
>> > >> mean the memory is no longer used. In some cases the actual "point of
>> > >> termination" may be the deserializer (really impl-dependent), but
>> > >> generally, wouldn't it be "nice" to have an explicit dispose() call on
>> > >> responses (with the addition that getting the next batch of data from a
>> > >> consumer automatically disposes the previous results)?
>> > >>
>> > >> On Mon, Dec 5, 2016 at 6:53 AM, Edoardo Comar <eco...@uk.ibm.com> wrote:
>> > >>
>> > >> > +1 (non binding)
>> > >> > --------------------------------------------------
>> > >> > Edoardo Comar
>> > >> > IBM MessageHub
>> > >> > eco...@uk.ibm.com
>> > >> > IBM UK Ltd, Hursley Park, SO21 2JN
>> > >> >
>> > >> > IBM United Kingdom Limited Registered in England and Wales with number
>> > >> > 741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
>> > >> >
>> > >> >
>> > >> > From: Mickael Maison <mickael.mai...@gmail.com>
>> > >> > To: dev@kafka.apache.org
>> > >> > Date: 05/12/2016 14:38
>> > >> > Subject: [VOTE] KIP-81: Bound Fetch memory usage in the consumer
>> > >> >
>> > >> > Hi all,
>> > >> >
>> > >> > I'd like to start the vote for KIP-81:
>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer
>> > >> >
>> > >> > Thank you
>> > >
>> > >
>> > >
>> > > --
>> > > Regards,
>> > >
>> > > Rajini
>> >
>>