Apologies for the late response.

Thanks, Jason, for the suggestion. Yes, you are right: the Coordinator
connection is "tagged" with a different id, so we could retrieve it in
NetworkReceive to make the distinction.
However, coordinator connections are currently distinguished by using:
Integer.MAX_VALUE - groupCoordinatorResponse.node().id()
for the Node id.

So to identify Coordinator connections, we'd have to check that the
NetworkReceive source is a value near Integer.MAX_VALUE, which is a bit
hacky...
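
To make the concern concrete, here is a minimal sketch of what such a
check might look like. It is not the actual client code: the class name,
the helper methods and the ASSUMED_MAX_BROKER_ID cut-off are all
hypothetical, and it assumes the receive's source is simply the string
form of the connection id.

```java
// Sketch only: mirrors the id derivation described above, not the real client code.
final class CoordinatorIdSketch {
    // Hypothetical cut-off: broker ids are assumed to stay below this value.
    static final int ASSUMED_MAX_BROKER_ID = 1_000_000;

    // Connection id used for the coordinator, derived from the broker's node id.
    static int coordinatorConnectionId(int brokerId) {
        return Integer.MAX_VALUE - brokerId;
    }

    // The "hacky" check: any source id close to Integer.MAX_VALUE is
    // treated as a coordinator connection.
    static boolean looksLikeCoordinator(String source) {
        try {
            int id = Integer.parseInt(source);
            return id > Integer.MAX_VALUE - ASSUMED_MAX_BROKER_ID;
        } catch (NumberFormatException e) {
            // Not a numeric id, so it cannot be a derived coordinator id.
            return false;
        }
    }

    public static void main(String[] args) {
        String coordinatorSource = Integer.toString(coordinatorConnectionId(3));
        System.out.println(looksLikeCoordinator(coordinatorSource)); // true
        System.out.println(looksLikeCoordinator("3"));               // false
    }
}
```

The check only works as long as real broker ids stay below the assumed
cut-off, which is the fragile part.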

Maybe we could add a constructor to Node that accepts a
sourceId String. That way we could make all the coordinator
connections explicit (by setting it to "Coordinator-[ID]", for
example).
What do you think?
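
As a rough illustration of the proposal, the sketch below uses a
simplified stand-in for Node. NodeSketch, its sourceId field, and the
coordinator() and isCoordinator() helpers are hypothetical names for
discussion, not existing Kafka API.

```java
// Sketch only: a simplified stand-in for Node, not org.apache.kafka.common.Node.
final class NodeSketch {
    // Hypothetical prefix used to mark coordinator connections.
    static final String COORDINATOR_PREFIX = "Coordinator-";

    private final int id;
    private final String host;
    private final int port;
    private final String sourceId;

    // Existing-style constructor: the source id defaults to the node id.
    NodeSketch(int id, String host, int port) {
        this(id, host, port, Integer.toString(id));
    }

    // Proposed additional constructor taking an explicit source id.
    NodeSketch(int id, String host, int port, String sourceId) {
        this.id = id;
        this.host = host;
        this.port = port;
        this.sourceId = sourceId;
    }

    int id() { return id; }
    String host() { return host; }
    int port() { return port; }
    String sourceId() { return sourceId; }

    // Builds a node whose connection is explicitly tagged as a coordinator.
    static NodeSketch coordinator(int brokerId, String host, int port) {
        return new NodeSketch(brokerId, host, port, COORDINATOR_PREFIX + brokerId);
    }

    // The receiving side only needs a prefix check on the source.
    static boolean isCoordinator(String source) {
        return source.startsWith(COORDINATOR_PREFIX);
    }
}
```

With something like this, the receive path would no longer depend on
the numeric range of broker ids.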

On Tue, Jan 24, 2017 at 12:58 AM, Jason Gustafson <ja...@confluent.io> wrote:
> Good point. The consumer does use a separate connection to the coordinator,
> so perhaps the connection itself could be tagged for normal heap allocation?
>
> -Jason
>
> On Mon, Jan 23, 2017 at 10:26 AM, Onur Karaman <onurkaraman.apa...@gmail.com>
> wrote:
>
>> I only did a quick scan but I wanted to point out what I think is an
>> incorrect assumption in the KIP's caveats:
>> "
>> There is a risk using the MemoryPool that, after we fill up the memory with
>> fetch data, we can starve the coordinator's connection
>> ...
>> To alleviate this issue, only messages larger than 1Kb will be allocated in
>> the MemoryPool. Smaller messages will be allocated directly on the Heap
>> like before. This allows group/heartbeat messages to avoid being delayed if
>> the MemoryPool fills up.
>> "
>>
>> So it sounds like there's an incorrect assumption that responses from the
>> coordinator will always be small (< 1Kb as mentioned in the caveat). There
>> are now a handful of request types between clients and the coordinator:
>> {JoinGroup, SyncGroup, LeaveGroup, Heartbeat, OffsetCommit, OffsetFetch,
>> ListGroups, DescribeGroups}. While true (at least today) for
>> HeartbeatResponse and a few others, I don't think we can assume
>> JoinGroupResponse, SyncGroupResponse, DescribeGroupsResponse, and
>> OffsetFetchResponse will be small, as they are effectively bounded by the
>> max message size allowed by the broker for the __consumer_offsets topic
>> which by default is 1MB.
>>
>> On Mon, Jan 23, 2017 at 9:46 AM, Mickael Maison <mickael.mai...@gmail.com>
>> wrote:
>>
>> > I've updated the KIP to address all the comments raised here and from
>> > the "DISCUSS" thread.
>> > See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer
>> >
>> > Now, I'd like to restart the vote.
>> >
>> > On Tue, Dec 6, 2016 at 9:02 AM, Rajini Sivaram
>> > <rajinisiva...@googlemail.com> wrote:
>> > > Hi Mickael,
>> > >
>> > > I am +1 on the overall approach of this KIP, but have a couple of comments
>> > > (sorry, should have brought them up on the discuss thread earlier):
>> > >
>> > > 1. Perhaps it would be better to do this after KAFKA-4137
>> > > <https://issues.apache.org/jira/browse/KAFKA-4137> is implemented? At the
>> > > moment, coordinator shares the same NetworkClient (and hence the same
>> > > Selector) with consumer connections used for fetching records. Since
>> > > freeing of memory relies on consuming applications invoking poll() after
>> > > processing previous records and potentially after committing offsets, it
>> > > will be good to ensure that coordinator is not blocked for read by fetch
>> > > responses. This may be simpler once coordinator has its own Selector.
>> > >
>> > > 2. The KIP says: *Once messages are returned to the user, messages are
>> > > deleted from the MemoryPool so new messages can be stored.*
>> > > Can you expand that a bit? I am assuming that partial buffers never get
>> > > freed when some messages are returned to the user since the consumer is
>> > > still holding a reference to the buffer. Would buffers be freed when
>> > > fetches for all the partitions in a response are parsed, but perhaps not
>> > > yet returned to the user (i.e., is the memory freed when a reference to the
>> > > response buffer is no longer required)? It will be good to document the
>> > > (approximate) maximum memory requirement for the non-compressed case. There
>> > > is data read from the socket, cached in the Fetcher and (as Radai has
>> > > pointed out), the records still with the user application.
>> > >
>> > >
>> > > On Tue, Dec 6, 2016 at 2:04 AM, radai <radai.rosenbl...@gmail.com> wrote:
>> > >
>> > >> +1 (non-binding).
>> > >>
>> > >> small nitpick - just because you returned a response to the user doesn't mean
>> > >> the memory is no longer used. for some cases the actual "point of
>> > >> termination" may be the deserializer (really impl-dependent), but
>> > >> generally, wouldn't it be "nice" to have an explicit dispose() call on
>> > >> responses (with the addition that getting the next batch of data from a
>> > >> consumer automatically disposes the previous results)
>> > >>
>> > >> On Mon, Dec 5, 2016 at 6:53 AM, Edoardo Comar <eco...@uk.ibm.com> wrote:
>> > >>
>> > >> > +1 (non binding)
>> > >> > --------------------------------------------------
>> > >> > Edoardo Comar
>> > >> > IBM MessageHub
>> > >> > eco...@uk.ibm.com
>> > >> > IBM UK Ltd, Hursley Park, SO21 2JN
>> > >> >
>> > >> > IBM United Kingdom Limited Registered in England and Wales with number
>> > >> > 741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6
>> > >> > 3AU
>> > >> >
>> > >> >
>> > >> >
>> > >> > From:   Mickael Maison <mickael.mai...@gmail.com>
>> > >> > To:     dev@kafka.apache.org
>> > >> > Date:   05/12/2016 14:38
>> > >> > Subject:        [VOTE] KIP-81: Bound Fetch memory usage in the consumer
>> > >> >
>> > >> >
>> > >> >
>> > >> > Hi all,
>> > >> >
>> > >> > I'd like to start the vote for KIP-81:
>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer
>> > >> >
>> > >> >
>> > >> > Thank you
>> > >> >
>> > >> >
>> > >> >
>> > >> >
>> > >> >
>> > >>
>> > >
>> > >
>> > >
>> > > --
>> > > Regards,
>> > >
>> > > Rajini
>> >
>>
