Hi, Mickael,

Thanks for the KIP. +1 from me too.

Jun

On Thu, Mar 30, 2017 at 4:40 AM, Mickael Maison <mickael.mai...@gmail.com>
wrote:

> Thanks for the suggestion.
>
> Currently, I can't think of a scenario when we would need multiple
> priority "levels". If in the future it makes sense to have some, I
> think we could just make the change without a new KIP as these APIs
> are not public.
> So I'd be more inclined to keep the boolean for now.
>
> On Wed, Mar 29, 2017 at 6:13 PM, Edoardo Comar <eco...@uk.ibm.com> wrote:
> > Hi Mickael,
> > As discussed, we could change the priority parameter to be an int rather
> > than a boolean.
> >
> > That's a bit more extensible.
> > --------------------------------------------------
> > Edoardo Comar
> > IBM MessageHub
> > eco...@uk.ibm.com
> > IBM UK Ltd, Hursley Park, SO21 2JN
> >
> > IBM United Kingdom Limited Registered in England and Wales with number
> > 741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
> >
> >
> >
> > From:   Guozhang Wang <wangg...@gmail.com>
> > To:     "dev@kafka.apache.org" <dev@kafka.apache.org>
> > Date:   28/03/2017 19:02
> > Subject:        Re: [VOTE] KIP-81: Bound Fetch memory usage in the consumer
> >
> >
> >
> > 1) Makes sense.
> > 2) Makes sense. Thanks!
> >
> > On Tue, Mar 28, 2017 at 10:11 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:
> >
> >> Hi Guozhang,
> >>
> >> Thanks for the feedback.
> >>
> >> 1) By MemoryPool, I mean the implementation added in KIP-72. That will
> >> most likely be SimpleMemoryPool, but the PR for KIP-72 has not been
> >> merged yet.
> >> I've updated the KIP to make it more obvious.
> >>
> >> 2) I was thinking of passing in the priority when creating the
> >> Coordinator Node (in
> >> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L582).
> >> Then, when calling Selector.connect() (in
> >> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L643),
> >> retrieve it and pass it to the Selector so it uses it when building
> >> the Channel.
> >> The idea was to avoid having to deduce from the ID that the connection
> >> is for the Coordinator, and instead have it set explicitly by
> >> AbstractCoordinator (and passed all the way down to the Channel).
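The plumbing described in point 2 can be sketched as follows. This is a minimal illustration, not Kafka code: `ConnectionSpec`, `SketchChannel` and `SketchSelector` are hypothetical stand-ins for `Node`, `KafkaChannel` and `Selector`, and the boolean flag mirrors the priority discussed in this thread. The caller (playing the role of `AbstractCoordinator`) sets the flag explicitly and the selector stores it on the channel, so nothing has to be inferred from the connection id.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for what AbstractCoordinator would hand to the network layer.
final class ConnectionSpec {
    final String id;
    final boolean highPriority;

    ConnectionSpec(String id, boolean highPriority) {
        this.id = id;
        this.highPriority = highPriority;
    }
}

// Hypothetical stand-in for KafkaChannel: it only records its priority.
final class SketchChannel {
    final String id;
    final boolean highPriority;

    SketchChannel(String id, boolean highPriority) {
        this.id = id;
        this.highPriority = highPriority;
    }
}

// Hypothetical stand-in for Selector: connect() receives the flag explicitly.
final class SketchSelector {
    private final Map<String, SketchChannel> channels = new HashMap<>();

    void connect(ConnectionSpec spec) {
        // The priority comes from the caller; it is never deduced from the id.
        channels.put(spec.id, new SketchChannel(spec.id, spec.highPriority));
    }

    boolean isHighPriority(String id) {
        SketchChannel channel = channels.get(id);
        return channel != null && channel.highPriority;
    }
}
```

Keeping the flag a boolean matches the decision at the top of this thread; widening it to an int later would only change the field type.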
> >>
> >> On Tue, Mar 28, 2017 at 1:33 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >> > Mickael,
> >> >
> >> > Sorry for the late review of the KIP. I'm +1 on the proposed change as
> >> > well. Just a few minor comments on the wiki itself:
> >> >
> >> > 1. By "MemoryPool", are you referring to a new class implementation or to
> >> > reusing "org.apache.kafka.clients.producer.internals.BufferPool"? I assume
> >> > it was the latter case, and if so, could you update the wiki page to make
> >> > it clear?
> >> >
> >> > 2. I think it is sufficient to add the priority to the KafkaChannel class;
> >> > it is not needed in Node (but one may need to add this parameter to
> >> > Selector#connect). Could you point me to which usage of Node needs to
> >> > access the priority?
> >> >
> >> >
> >> > Guozhang
> >> >
> >> >
> >> > On Fri, Mar 10, 2017 at 9:52 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:
> >> >
> >> >> Thanks, Jason, for the feedback! Yes, it makes sense to always use the
> >> >> MemoryPool if we can. I've updated the KIP with the suggestion.
> >> >>
> >> >> On Fri, Mar 10, 2017 at 1:18 AM, Jason Gustafson <ja...@confluent.io> wrote:
> >> >> > Just a minor comment. The KIP suggests that coordinator responses are
> >> >> > always allocated outside of the memory pool, but maybe we can reserve
> >> >> > that capability for only when the pool does not have enough space? It
> >> >> > seems a little nicer to use the pool if we can. If that seems
> >> >> > reasonable, I'm +1 on the KIP. Thanks for the effort!
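A minimal sketch of this suggestion, under stated assumptions: every response first tries the bounded pool, and only a high-priority (coordinator) connection may fall back to an ordinary heap allocation when the pool is exhausted. `BoundedPool` and `ResponseAllocator` are hypothetical simplifications loosely modelled on the `tryAllocate`/`release` style of the KIP-72 memory pool; they are not Kafka's `SimpleMemoryPool` or any other real class.

```java
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical, simplified bounded pool that only tracks how many bytes are free.
final class BoundedPool {
    private long available;

    BoundedPool(long capacityBytes) {
        this.available = capacityBytes;
    }

    // Returns a buffer, or null if the pool cannot satisfy the request right now.
    synchronized ByteBuffer tryAllocate(int sizeBytes) {
        if (sizeBytes > available) {
            return null;
        }
        available -= sizeBytes;
        return ByteBuffer.allocate(sizeBytes);
    }

    // Gives the buffer's capacity back to the pool.
    synchronized void release(ByteBuffer buffer) {
        available += buffer.capacity();
    }

    synchronized long availableMemory() {
        return available;
    }
}

// Pool first; only a high-priority connection may fall back to the plain heap.
final class ResponseAllocator {
    private final BoundedPool pool;
    // Identity-based set, because ByteBuffer.equals() compares contents.
    private final Set<ByteBuffer> pooled = Collections.newSetFromMap(new IdentityHashMap<>());

    ResponseAllocator(BoundedPool pool) {
        this.pool = pool;
    }

    ByteBuffer allocateResponse(int sizeBytes, boolean highPriority) {
        ByteBuffer buffer = pool.tryAllocate(sizeBytes);
        if (buffer != null) {
            pooled.add(buffer);
            return buffer;
        }
        if (highPriority) {
            return ByteBuffer.allocate(sizeBytes); // unpooled heap allocation
        }
        return null; // a normal fetch connection has to wait for memory to be released
    }

    void release(ByteBuffer buffer) {
        // Only buffers that actually came from the pool are returned to it.
        if (pooled.remove(buffer)) {
            pool.release(buffer);
        }
    }
}
```

The heap fallback buffers are tracked separately so that releasing them never inflates the pool's free space beyond its capacity.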
> >> >> >
> >> >> > -Jason
> >> >> >
> >> >> > On Tue, Feb 28, 2017 at 10:09 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:
> >> >> >
> >> >> >> Yes, I agree: having a generic flag is more future-proof.
> >> >> >> I'll update the KIP in the coming days.
> >> >> >>
> >> >> >> Thanks
> >> >> >>
> >> >> >> On Tue, Feb 28, 2017 at 5:08 AM, Jason Gustafson <ja...@confluent.io> wrote:
> >> >> >> > Hey Mickael,
> >> >> >> >
> >> >> >> > The suggestion to add something to Node makes sense. I could imagine
> >> >> >> > for example adding a flag to indicate that the connection has a higher
> >> >> >> > "priority," meaning that we can allocate outside of the memory pool if
> >> >> >> > necessary. That would still be generic even if the only use case is
> >> >> >> > the consumer coordinator. We might also face a similar problem when
> >> >> >> > the producer is sending requests to the transaction coordinator for
> >> >> >> > KIP-98. What do you think?
> >> >> >> >
> >> >> >> > Thanks,
> >> >> >> > Jason
> >> >> >> >
> >> >> >> > On Mon, Feb 27, 2017 at 9:09 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:
> >> >> >> >
> >> >> >> >> Apologies for the late response.
> >> >> >> >>
> >> >> >> >> Thanks, Jason, for the suggestion. Yes, you are right: the Coordinator
> >> >> >> >> connection is "tagged" with a different id, so we could retrieve it in
> >> >> >> >> NetworkReceive to make the distinction.
> >> >> >> >> However, currently the coordinator connections are made distinct by
> >> >> >> >> using:
> >> >> >> >> Integer.MAX_VALUE - groupCoordinatorResponse.node().id()
> >> >> >> >> for the Node id.
> >> >> >> >>
> >> >> >> >> So to identify Coordinator connections, we'd have to check that the
> >> >> >> >> NetworkReceive source is a value near Integer.MAX_VALUE, which is a
> >> >> >> >> bit hacky...
> >> >> >> >>
> >> >> >> >> Maybe we could add a constructor to Node that allows passing in a
> >> >> >> >> sourceId String. That way we could make all the coordinator
> >> >> >> >> connections explicit (by setting it to "Coordinator-[ID]", for
> >> >> >> >> example).
> >> >> >> >> What do you think?
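To make the trade-off concrete, here is a small sketch contrasting the two ways of recognising a coordinator connection: the `Integer.MAX_VALUE - brokerId` convention quoted above, recognised by a threshold guess, versus an explicit "Coordinator-[ID]"-style tag. `CoordinatorIds` and the `assumedMaxBrokerId` cut-off are hypothetical; the point is that the first check depends on an arbitrary guess about how large broker ids can be.

```java
// Hypothetical helpers illustrating the two approaches discussed above.
final class CoordinatorIds {
    // Convention quoted above: the coordinator connection gets a distinct node id.
    static int coordinatorNodeId(int brokerId) {
        return Integer.MAX_VALUE - brokerId;
    }

    // The "hacky" check: treat ids close to Integer.MAX_VALUE as coordinators.
    // The cut-off is an arbitrary guess; a large enough broker id defeats it.
    static boolean looksLikeCoordinator(int nodeId, int assumedMaxBrokerId) {
        return nodeId >= Integer.MAX_VALUE - assumedMaxBrokerId;
    }

    // Explicit alternative: tag the connection when it is created.
    static String coordinatorSourceId(int brokerId) {
        return "Coordinator-" + brokerId;
    }

    static boolean isCoordinator(String sourceId) {
        return sourceId.startsWith("Coordinator-");
    }
}
```

With a cut-off of 1000, for instance, a coordinator on broker 5000 would not be recognised by the heuristic, while the explicit tag identifies it regardless of the broker id.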
> >> >> >> >>
> >> >> >> >> On Tue, Jan 24, 2017 at 12:58 AM, Jason Gustafson <ja...@confluent.io> wrote:
> >> >> >> >> > Good point. The consumer does use a separate connection to the
> >> >> >> >> > coordinator, so perhaps the connection itself could be tagged for
> >> >> >> >> > normal heap allocation?
> >> >> >> >> >
> >> >> >> >> > -Jason
> >> >> >> >> >
> >> >> >> >> > On Mon, Jan 23, 2017 at 10:26 AM, Onur Karaman <onurkaraman.apa...@gmail.com> wrote:
> >> >> >> >> >
> >> >> >> >> >> I only did a quick scan, but I wanted to point out what I think is
> >> >> >> >> >> an incorrect assumption in the KIP's caveats:
> >> >> >> >> >> "
> >> >> >> >> >> There is a risk using the MemoryPool that, after we fill up the
> >> >> >> >> >> memory with fetch data, we can starve the coordinator's connection
> >> >> >> >> >> ...
> >> >> >> >> >> To alleviate this issue, only messages larger than 1Kb will be
> >> >> >> >> >> allocated in the MemoryPool. Smaller messages will be allocated
> >> >> >> >> >> directly on the Heap like before. This allows group/heartbeat
> >> >> >> >> >> messages to avoid being delayed if the MemoryPool fills up.
> >> >> >> >> >> "
> >> >> >> >> >>
> >> >> >> >> >> So it sounds like there's an incorrect assumption that responses
> >> >> >> >> >> from the coordinator will always be small (< 1Kb as mentioned in
> >> >> >> >> >> the caveat). There are now a handful of request types between
> >> >> >> >> >> clients and the coordinator: {JoinGroup, SyncGroup, LeaveGroup,
> >> >> >> >> >> Heartbeat, OffsetCommit, OffsetFetch, ListGroups, DescribeGroups}.
> >> >> >> >> >> While this is true (at least today) for HeartbeatResponse and a few
> >> >> >> >> >> others, I don't think we can assume that JoinGroupResponse,
> >> >> >> >> >> SyncGroupResponse, DescribeGroupsResponse, and OffsetFetchResponse
> >> >> >> >> >> will be small, as they are effectively bounded by the max message
> >> >> >> >> >> size allowed by the broker for the __consumer_offsets topic, which
> >> >> >> >> >> by default is 1MB.
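A sketch of the size-only rule from the quoted caveat, with hypothetical names and "1Kb" read as 1024 bytes, shows why this objection matters: the decision ignores which connection a response arrives on, so a large coordinator response (for example a JoinGroupResponse close to the 1MB default) is routed to the pool and cannot be read once the pool is full.

```java
// Hypothetical illustration of the caveat's rule: the allocation decision depends on size only.
final class SizeThresholdRule {
    // "Only messages larger than 1Kb will be allocated in the MemoryPool."
    static final int POOL_THRESHOLD_BYTES = 1024;

    static boolean allocateFromPool(int responseSizeBytes) {
        return responseSizeBytes > POOL_THRESHOLD_BYTES;
    }

    // Once the pool is full, only responses that bypass it can still be read.
    static boolean readableWhenPoolFull(int responseSizeBytes) {
        return !allocateFromPool(responseSizeBytes);
    }
}
```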
> >> >> >> >> >>
> >> >> >> >> >> On Mon, Jan 23, 2017 at 9:46 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:
> >> >> >> >> >>
> >> >> >> >> >> > I've updated the KIP to address all the comments raised here and
> >> >> >> >> >> > from the "DISCUSS" thread.
> >> >> >> >> >> > See:
> >> >> >> >> >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer
> >> >> >> >> >> >
> >> >> >> >> >> > Now, I'd like to restart the vote.
> >> >> >> >> >> >
> >> >> >> >> >> > On Tue, Dec 6, 2016 at 9:02 AM, Rajini Sivaram <rajinisiva...@googlemail.com> wrote:
> >> >> >> >> >> > > Hi Mickael,
> >> >> >> >> >> > >
> >> >> >> >> >> > > I am +1 on the overall approach of this KIP, but have a couple of
> >> >> >> >> >> > > comments (sorry, I should have brought them up on the discuss
> >> >> >> >> >> > > thread earlier):
> >> >> >> >> >> > >
> >> >> >> >> >> > > 1. Perhaps it would be better to do this after KAFKA-4137
> >> >> >> >> >> > > <https://issues.apache.org/jira/browse/KAFKA-4137> is implemented?
> >> >> >> >> >> > > At the moment, the coordinator shares the same NetworkClient (and
> >> >> >> >> >> > > hence the same Selector) with the consumer connections used for
> >> >> >> >> >> > > fetching records. Since freeing of memory relies on consuming
> >> >> >> >> >> > > applications invoking poll() after processing previous records,
> >> >> >> >> >> > > and potentially after committing offsets, it would be good to
> >> >> >> >> >> > > ensure that reads on the coordinator connection are not blocked by
> >> >> >> >> >> > > fetch responses. This may be simpler once the coordinator has its
> >> >> >> >> >> > > own Selector.
> >> >> >> >> >> > >
> >> >> >> >> >> > > 2. The KIP says: *Once messages are returned to the user, messages
> >> >> >> >> >> > > are deleted from the MemoryPool so new messages can be stored.*
> >> >> >> >> >> > > Can you expand on that a bit? I am assuming that partial buffers
> >> >> >> >> >> > > never get freed when some messages are returned to the user, since
> >> >> >> >> >> > > the consumer is still holding a reference to the buffer. Would
> >> >> >> >> >> > > buffers be freed when fetches for all the partitions in a response
> >> >> >> >> >> > > are parsed, but perhaps not yet returned to the user (i.e., is the
> >> >> >> >> >> > > memory freed when a reference to the response buffer is no longer
> >> >> >> >> >> > > required)? It would be good to document the (approximate) maximum
> >> >> >> >> >> > > memory requirement for the non-compressed case. There is data read
> >> >> >> >> >> > > from the socket, data cached in the Fetcher and, as Radai has
> >> >> >> >> >> > > pointed out, the records still held by the user application.
> >> >> >> >> >> > >
> >> >> >> >> >> > >
> >> >> >> >> >> > > On Tue, Dec 6, 2016 at 2:04 AM, radai <radai.rosenbl...@gmail.com> wrote:
> >> >> >> >> >> > >
> >> >> >> >> >> > >> +1 (non-binding).
> >> >> >> >> >> > >>
> >> >> >> >> >> > >> Small nitpick: just because you returned a response to the user
> >> >> >> >> >> > >> doesn't mean the memory is no longer used. In some cases the actual
> >> >> >> >> >> > >> "point of termination" may be the deserializer (really
> >> >> >> >> >> > >> implementation-dependent), but generally, wouldn't it be "nice" to
> >> >> >> >> >> > >> have an explicit dispose() call on responses (with the addition
> >> >> >> >> >> > >> that getting the next batch of data from a consumer automatically
> >> >> >> >> >> > >> disposes of the previous results)?
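A minimal sketch of the dispose() idea, assuming hypothetical `PooledBatch` and `DisposingConsumer` types (not existing Kafka APIs) and reducing the pool to a byte counter (`ByteBudget`): the application may call `dispose()` as soon as it is done with a batch, and the next `poll()` disposes the previous batch automatically, so memory goes back to the pool at a well-defined point. The `records` argument to `poll()` merely stands in for data read from the network.

```java
import java.util.List;

// Hypothetical stand-in for the memory pool: just a counter of free bytes.
final class ByteBudget {
    long free;

    ByteBudget(long free) {
        this.free = free;
    }
}

// Hypothetical batch wrapper that owns pooled memory until it is disposed.
final class PooledBatch {
    private final ByteBudget budget;
    private final int sizeBytes;
    private final List<String> records;
    private boolean disposed;

    PooledBatch(ByteBudget budget, int sizeBytes, List<String> records) {
        this.budget = budget;
        this.sizeBytes = sizeBytes;
        this.records = records;
        budget.free -= sizeBytes; // memory is held while the batch is alive
    }

    List<String> records() {
        if (disposed) {
            throw new IllegalStateException("batch already disposed");
        }
        return records;
    }

    // Idempotent: the memory is returned to the budget exactly once.
    void dispose() {
        if (!disposed) {
            disposed = true;
            budget.free += sizeBytes;
        }
    }
}

// Hypothetical consumer facade: each poll disposes the previous batch first.
final class DisposingConsumer {
    private final ByteBudget budget;
    private PooledBatch current;

    DisposingConsumer(ByteBudget budget) {
        this.budget = budget;
    }

    // 'records' stands in for data that would really be read from the network.
    PooledBatch poll(int sizeBytes, List<String> records) {
        if (current != null) {
            current.dispose();
        }
        current = new PooledBatch(budget, sizeBytes, records);
        return current;
    }
}
```

Making `dispose()` idempotent lets both paths coexist: an early explicit call and the automatic one on the next poll release the memory only once.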
> >> >> >> >> >> > >>
> >> >> >> >> >> > >> On Mon, Dec 5, 2016 at 6:53 AM, Edoardo Comar <eco...@uk.ibm.com> wrote:
> >> >> >> >> >> > >>
> >> >> >> >> >> > >> > +1 (non binding)
> >> >> >> >> >> > >> >
> >> >> >> >> >> > >> >
> >> >> >> >> >> > >> >
> >> >> >> >> >> > >> > From:   Mickael Maison <mickael.mai...@gmail.com>
> >> >> >> >> >> > >> > To:     dev@kafka.apache.org
> >> >> >> >> >> > >> > Date:   05/12/2016 14:38
> >> >> >> >> >> > >> > Subject:        [VOTE] KIP-81: Bound Fetch memory usage in the consumer
> >> >> >> >> >> > >> >
> >> >> >> >> >> > >> >
> >> >> >> >> >> > >> >
> >> >> >> >> >> > >> > Hi all,
> >> >> >> >> >> > >> >
> >> >> >> >> >> > >> > I'd like to start the vote for KIP-81:
> >> >> >> >> >> > >> >
> >> >> >> >> >> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer
> >> >> >> >> >> > >> >
> >> >> >> >> >> > >> >
> >> >> >> >> >> > >> > Thank you
> >> >> >> >> >> > >> >
> >> >> >> >> >> > >> >
> >> >> >> >> >> > >> >
> >> >> >> >> >> > >> >
> >> >> >> >> >> > >> > Unless stated otherwise above:
> >> >> >> >> >> > >> > IBM United Kingdom Limited - Registered in England and Wales with number 741598.
> >> >> >> >> >> > >> > Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU
> >> >> >> >> >> > >> >
> >> >> >> >> >> > >>
> >> >> >> >> >> > >
> >> >> >> >> >> > >
> >> >> >> >> >> > >
> >> >> >> >> >> > > --
> >> >> >> >> >> > > Regards,
> >> >> >> >> >> > >
> >> >> >> >> >> > > Rajini
> >> >> >> >> >> >
> >> >> >> >> >>
> >> >> >> >>
> >> >> >>
> >> >>
> >> >
> >> >
> >> >
> >> > --
> >> > -- Guozhang
> >>
> >
> >
> >
> > --
> > -- Guozhang
> >
> >
> >
>
