1) Makes sense.
2) Makes sense. Thanks!
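
To make the behaviour agreed on in this thread concrete (use the pool
whenever it has room, and let a high-priority connection such as the
coordinator's allocate outside it only when the pool is exhausted), here
is a rough sketch. BoundedPool, PrioritizedChannel and bufferFor are
hypothetical names made up for illustration; they are not the KIP-72
MemoryPool or the real KafkaChannel API, and the eventual patch may look
quite different.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for a bounded memory pool (not the KIP-72 class).
final class BoundedPool {
    private long available;

    BoundedPool(long capacityBytes) {
        this.available = capacityBytes;
    }

    // Returns a buffer, or null when the pool cannot satisfy the request.
    synchronized ByteBuffer tryAllocate(int size) {
        if (size > available) {
            return null;
        }
        available -= size;
        return ByteBuffer.allocate(size);
    }

    // Callers must only release buffers that came from tryAllocate().
    synchronized void release(ByteBuffer buffer) {
        available += buffer.capacity();
    }

    synchronized long available() {
        return available;
    }
}

// Hypothetical channel that carries the priority flag set at connect time.
final class PrioritizedChannel {
    private final String id;
    private final boolean highPriority;
    private final BoundedPool pool;

    PrioritizedChannel(String id, boolean highPriority, BoundedPool pool) {
        this.id = id;
        this.highPriority = highPriority;
        this.pool = pool;
    }

    String id() {
        return id;
    }

    // Prefer the pool. Only a high-priority channel falls back to a plain
    // heap allocation when the pool is full; other channels get null and
    // would have to retry the read later.
    ByteBuffer bufferFor(int size) {
        ByteBuffer pooled = pool.tryAllocate(size);
        if (pooled != null || !highPriority) {
            return pooled;
        }
        return ByteBuffer.allocate(size);
    }
}
```

With a 100-byte pool, a fetch channel that already holds 80 bytes gets
null for a further 50-byte read, while a channel created with
highPriority = true still receives a 50-byte buffer and the pool's
accounting is left untouched.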

On Tue, Mar 28, 2017 at 10:11 AM, Mickael Maison <mickael.mai...@gmail.com>
wrote:

> Hi Guozhang,
>
> Thanks for the feedback.
>
> 1) By MemoryPool, I mean the implementation added in KIP-72. That will
> most likely be SimpleMemoryPool, but the PR for KIP-72 has not been
> merged yet.
> I've updated the KIP to make it more obvious.
>
> 2) I was thinking of passing in the priority when creating the
> Coordinator Node (in
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L582)
> Then, when calling Selector.connect() (in
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L643)
> we retrieve it and pass it to the Selector so that it is used when
> building the Channel.
> The idea was to avoid having to deduce from the ID that the connection
> is for the Coordinator, and instead have it explicitly set by
> AbstractCoordinator (and passed all the way down to the Channel).
>
> On Tue, Mar 28, 2017 at 1:33 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > Mickael,
> >
> > Sorry for the late review of the KIP. I'm +1 on the proposed change as
> > well. Just a few minor comments on the wiki itself:
> >
> > 1. By the "MemoryPool" are you referring to a new class impl or to
> > reusing "org.apache.kafka.clients.producer.internals.BufferPool"? I
> > assume it was the latter case, and if yes, could you update the wiki
> > page to make it clear?
> >
> > 2. I think it is sufficient to add the priority to KafkaChannel class,
> > but not needed in Node (but one may need to add this parameter to
> > Selector#connect). Could you point me to which usage of Node needs to
> > access the priority?
> >
> >
> > Guozhang
> >
> >
> > On Fri, Mar 10, 2017 at 9:52 AM, Mickael Maison <
> mickael.mai...@gmail.com>
> > wrote:
> >
> >> Thanks Jason for the feedback! Yes, it makes sense to always use the
> >> MemoryPool if we can. I've updated the KIP with the suggestion.
> >>
> >> On Fri, Mar 10, 2017 at 1:18 AM, Jason Gustafson <ja...@confluent.io>
> >> wrote:
> >> > Just a minor comment. The KIP suggests that coordinator responses are
> >> > always allocated outside of the memory pool, but maybe we can reserve
> >> > that capability for only when the pool does not have enough space? It
> >> > seems a little nicer to use the pool if we can. If that seems
> >> > reasonable, I'm +1 on the KIP. Thanks for the effort!
> >> >
> >> > -Jason
> >> >
> >> > On Tue, Feb 28, 2017 at 10:09 AM, Mickael Maison <
> >> mickael.mai...@gmail.com>
> >> > wrote:
> >> >
> >> >> Yes I agree, having a generic flag is more future proof.
> >> >> I'll update the KIP in the coming days.
> >> >>
> >> >> Thanks
> >> >>
> >> >> On Tue, Feb 28, 2017 at 5:08 AM, Jason Gustafson <ja...@confluent.io
> >
> >> >> wrote:
> >> >> > Hey Mickael,
> >> >> >
> >> >> > The suggestion to add something to Node makes sense. I could imagine
> >> >> > for example adding a flag to indicate that the connection has a higher
> >> >> > "priority," meaning that we can allocate outside of the memory pool if
> >> >> > necessary. That would still be generic even if the only use case is
> >> >> > the consumer coordinator. We might also face a similar problem when
> >> >> > the producer is sending requests to the transaction coordinator for
> >> >> > KIP-98. What do you think?
> >> >> >
> >> >> > Thanks,
> >> >> > Jason
> >> >> >
> >> >> > On Mon, Feb 27, 2017 at 9:09 AM, Mickael Maison <
> >> >> mickael.mai...@gmail.com>
> >> >> > wrote:
> >> >> >
> >> >> >> Apologies for the late response.
> >> >> >>
> >> >> >> Thanks Jason for the suggestion. Yes, you are right: the Coordinator
> >> >> >> connection is "tagged" with a different id, so we could retrieve it
> >> >> >> in NetworkReceive to make the distinction.
> >> >> >> However, coordinator connections are currently distinguished by using:
> >> >> >> Integer.MAX_VALUE - groupCoordinatorResponse.node().id()
> >> >> >> for the Node id.
> >> >> >>
> >> >> >> So to identify Coordinator connections, we'd have to check that the
> >> >> >> NetworkReceive source is a value near Integer.MAX_VALUE, which is a
> >> >> >> bit hacky ...
> >> >> >>
> >> >> >> Maybe we could add a constructor to Node that allows passing in a
> >> >> >> sourceId String. That way we could make all the coordinator
> >> >> >> connections explicit (by setting it to "Coordinator-[ID]" for
> >> >> >> example).
> >> >> >> What do you think?
> >> >> >>
> >> >> >> On Tue, Jan 24, 2017 at 12:58 AM, Jason Gustafson <
> >> ja...@confluent.io>
> >> >> >> wrote:
> >> >> >> > Good point. The consumer does use a separate connection to the
> >> >> >> > coordinator, so perhaps the connection itself could be tagged for
> >> >> >> > normal heap allocation?
> >> >> >> >
> >> >> >> > -Jason
> >> >> >> >
> >> >> >> > On Mon, Jan 23, 2017 at 10:26 AM, Onur Karaman <
> >> >> >> onurkaraman.apa...@gmail.com
> >> >> >> >> wrote:
> >> >> >> >
> >> >> >> >> I only did a quick scan but I wanted to point out what I think is
> >> >> >> >> an incorrect assumption in the KIP's caveats:
> >> >> >> >> "
> >> >> >> >> There is a risk using the MemoryPool that, after we fill up the
> >> >> >> >> memory with fetch data, we can starve the coordinator's connection
> >> >> >> >> ...
> >> >> >> >> To alleviate this issue, only messages larger than 1Kb will be
> >> >> >> >> allocated in the MemoryPool. Smaller messages will be allocated
> >> >> >> >> directly on the Heap like before. This allows group/heartbeat
> >> >> >> >> messages to avoid being delayed if the MemoryPool fills up.
> >> >> >> >> "
> >> >> >> >>
> >> >> >> >> So it sounds like there's an incorrect assumption that responses
> >> >> >> >> from the coordinator will always be small (< 1Kb as mentioned in
> >> >> >> >> the caveat). There are now a handful of request types between
> >> >> >> >> clients and the coordinator: {JoinGroup, SyncGroup, LeaveGroup,
> >> >> >> >> Heartbeat, OffsetCommit, OffsetFetch, ListGroups, DescribeGroups}.
> >> >> >> >> While true (at least today) for HeartbeatResponse and a few
> >> >> >> >> others, I don't think we can assume JoinGroupResponse,
> >> >> >> >> SyncGroupResponse, DescribeGroupsResponse, and OffsetFetchResponse
> >> >> >> >> will be small, as they are effectively bounded by the max message
> >> >> >> >> size allowed by the broker for the __consumer_offsets topic which
> >> >> >> >> by default is 1MB.
> >> >> >> >>
> >> >> >> >> On Mon, Jan 23, 2017 at 9:46 AM, Mickael Maison <
> >> >> >> mickael.mai...@gmail.com>
> >> >> >> >> wrote:
> >> >> >> >>
> >> >> >> >> > I've updated the KIP to address all the comments raised here and
> >> >> >> >> > from the "DISCUSS" thread.
> >> >> >> >> > See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer
> >> >> >> >> >
> >> >> >> >> > Now, I'd like to restart the vote.
> >> >> >> >> >
> >> >> >> >> > On Tue, Dec 6, 2016 at 9:02 AM, Rajini Sivaram
> >> >> >> >> > <rajinisiva...@googlemail.com> wrote:
> >> >> >> >> > > Hi Mickael,
> >> >> >> >> > >
> >> >> >> >> > > I am +1 on the overall approach of this KIP, but have a couple of
> >> >> >> >> > > comments (sorry, should have brought them up on the discuss thread
> >> >> >> >> > > earlier):
> >> >> >> >> > >
> >> >> >> >> > > 1. Perhaps it would be better to do this after KAFKA-4137
> >> >> >> >> > > <https://issues.apache.org/jira/browse/KAFKA-4137> is implemented? At
> >> >> >> >> > > the moment, coordinator shares the same NetworkClient (and hence the
> >> >> >> >> > > same Selector) with consumer connections used for fetching records.
> >> >> >> >> > > Since freeing of memory relies on consuming applications invoking
> >> >> >> >> > > poll() after processing previous records and potentially after
> >> >> >> >> > > committing offsets, it will be good to ensure that coordinator is not
> >> >> >> >> > > blocked for read by fetch responses. This may be simpler once
> >> >> >> >> > > coordinator has its own Selector.
> >> >> >> >> > >
> >> >> >> >> > > 2. The KIP says: *Once messages are returned to the user, messages are
> >> >> >> >> > > deleted from the MemoryPool so new messages can be stored.*
> >> >> >> >> > > Can you expand that a bit? I am assuming that partial buffers never
> >> >> >> >> > > get freed when some messages are returned to the user since the
> >> >> >> >> > > consumer is still holding a reference to the buffer. Would buffers be
> >> >> >> >> > > freed when fetches for all the partitions in a response are parsed,
> >> >> >> >> > > but perhaps not yet returned to the user (i.e., is the memory freed
> >> >> >> >> > > when a reference to the response buffer is no longer required)? It
> >> >> >> >> > > will be good to document the (approximate) maximum memory requirement
> >> >> >> >> > > for the non-compressed case. There is data read from the socket,
> >> >> >> >> > > cached in the Fetcher and (as Radai has pointed out), the records
> >> >> >> >> > > still with the user application.
> >> >> >> >> > >
> >> >> >> >> > >
> >> >> >> >> > > On Tue, Dec 6, 2016 at 2:04 AM, radai <
> >> >> radai.rosenbl...@gmail.com>
> >> >> >> >> > wrote:
> >> >> >> >> > >
> >> >> >> >> > >> +1 (non-binding).
> >> >> >> >> > >>
> >> >> >> >> > >> small nitpick - just because you returned a response to the user
> >> >> >> >> > >> doesn't mean the memory is no longer used. for some cases the actual
> >> >> >> >> > >> "point of termination" may be the deserializer (really
> >> >> >> >> > >> impl-dependent), but generally, wouldn't it be "nice" to have an
> >> >> >> >> > >> explicit dispose() call on responses (with the addition that getting
> >> >> >> >> > >> the next batch of data from a consumer automatically disposes the
> >> >> >> >> > >> previous results)
> >> >> >> >> > >>
> >> >> >> >> > >> On Mon, Dec 5, 2016 at 6:53 AM, Edoardo Comar <
> >> >> eco...@uk.ibm.com>
> >> >> >> >> > wrote:
> >> >> >> >> > >>
> >> >> >> >> > >> > +1 (non binding)
> >> >> >> >> > >> > --------------------------------------------------
> >> >> >> >> > >> > Edoardo Comar
> >> >> >> >> > >> > IBM MessageHub
> >> >> >> >> > >> > eco...@uk.ibm.com
> >> >> >> >> > >> > IBM UK Ltd, Hursley Park, SO21 2JN
> >> >> >> >> > >> >
> >> >> >> >> > >> > IBM United Kingdom Limited Registered in England and
> Wales
> >> >> with
> >> >> >> >> number
> >> >> >> >> > >> > 741598 Registered office: PO Box 41, North Harbour,
> >> >> Portsmouth,
> >> >> >> >> Hants.
> >> >> >> >> > >> PO6
> >> >> >> >> > >> > 3AU
> >> >> >> >> > >> >
> >> >> >> >> > >> >
> >> >> >> >> > >> >
> >> >> >> >> > >> > From:   Mickael Maison <mickael.mai...@gmail.com>
> >> >> >> >> > >> > To:     dev@kafka.apache.org
> >> >> >> >> > >> > Date:   05/12/2016 14:38
> >> >> >> >> > >> > Subject:        [VOTE] KIP-81: Bound Fetch memory usage in the consumer
> >> >> >> >> > >> >
> >> >> >> >> > >> >
> >> >> >> >> > >> >
> >> >> >> >> > >> > Hi all,
> >> >> >> >> > >> >
> >> >> >> >> > >> > I'd like to start the vote for KIP-81:
> >> >> >> >> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer
> >> >> >> >> > >> >
> >> >> >> >> > >> >
> >> >> >> >> > >> > Thank you
> >> >> >> >> > >> >
> >> >> >> >> > >> >
> >> >> >> >> > >> >
> >> >> >> >> > >> >
> >> >> >> >> > >> >
> >> >> >> >> > >>
> >> >> >> >> > >
> >> >> >> >> > >
> >> >> >> >> > >
> >> >> >> >> > > --
> >> >> >> >> > > Regards,
> >> >> >> >> > >
> >> >> >> >> > > Rajini
> >> >> >> >> >
> >> >> >> >>
> >> >> >>
> >> >>
> >>
> >
> >
> >
> > --
> > -- Guozhang
>



-- 
-- Guozhang
