Thanks for the suggestion.

Currently, I can't think of a scenario when we would need multiple
priority "levels". If in the future it makes sense to have some, I
think we could just make the change without a new KIP as these APIs
are not public.
So I'd be more inclined to keep the boolean for now.

On Wed, Mar 29, 2017 at 6:13 PM, Edoardo Comar <eco...@uk.ibm.com> wrote:
> Hi Mickael,
> as discussed we could change the priority parameter to be an int rather
> than a boolean.
>
> That's a bit more extensible
> --------------------------------------------------
> Edoardo Comar
> IBM MessageHub
> eco...@uk.ibm.com
> IBM UK Ltd, Hursley Park, SO21 2JN
>
> IBM United Kingdom Limited Registered in England and Wales with number
> 741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6
> 3AU
>
>
>
> From:   Guozhang Wang <wangg...@gmail.com>
> To:     "dev@kafka.apache.org" <dev@kafka.apache.org>
> Date:   28/03/2017 19:02
> Subject:        Re: [VOTE] KIP-81: Bound Fetch memory usage in the
> consumer
>
>
>
> 1) Makes sense.
> 2) Makes sense. Thanks!
>
> On Tue, Mar 28, 2017 at 10:11 AM, Mickael Maison
> <mickael.mai...@gmail.com>
> wrote:
>
>> Hi Guozhang,
>>
>> Thanks for the feedback.
>>
>> 1) By MemoryPool, I mean the implementation added in KIP-72. That will
>> most likely be SimpleMemoryPool, but the PR for KIP-72 has not been
>> merged yet.
>> I've updated the KIP to make it more obvious.
>>
>> 2) I was thinking to pass in the priority when creating the
>> Coordinator Node (in
>> https://github.com/apache/kafka/blob/trunk/clients/src/
>> main/java/org/apache/kafka/clients/consumer/internals/
>> AbstractCoordinator.java#L582)
>> Then when calling Selector.connect() (in
>> https://github.com/apache/kafka/blob/trunk/clients/src/
>> main/java/org/apache/kafka/clients/NetworkClient.java#L643)
>> retrieve it and pass it in the Selector so it uses it when building
>> the Channel.
>> The idea was to avoid having to deduce the connection is for the
>> Coordinator from the ID but instead have it explicitly set by
>> AbstractCoordinator (and pass it all the way down to the Channel)
>>
>> On Tue, Mar 28, 2017 at 1:33 AM, Guozhang Wang <wangg...@gmail.com>
> wrote:
>> > Mickael,
>> >
>> > Sorry for the late review of the KIP. I'm +1 on the proposed change as
>> > well. Just a few minor comments on the wiki itself:
>> >
>> > 1. By the "MemoryPool" are you referring to a new class impl or to
>> reusing "
>> > org.apache.kafka.clients.producer.internals.BufferPool"? I assume it
> was
>> > the latter case, and if yes, could you update the wiki page to make it
>> > clear?
>> >
>> > 2. I think it is sufficient to add the priority to KafkaChannel class,
>> but
>> > not needed in Node (but one may need to add this parameter to
> Selector#
>> > connect). Could you point me to which usage of Node needs to access
> the
>> > priority?
>> >
>> >
>> > Guozhang
>> >
>> >
>> > On Fri, Mar 10, 2017 at 9:52 AM, Mickael Maison <
>> mickael.mai...@gmail.com>
>> > wrote:
>> >
>> >> Thanks Jason for the feedback! Yes it makes sense to always use the
>> >> MemoryPool is we can. I've updated the KIP with the suggestion
>> >>
>> >> On Fri, Mar 10, 2017 at 1:18 AM, Jason Gustafson <ja...@confluent.io>
>> >> wrote:
>> >> > Just a minor comment. The KIP suggests that coordinator responses
> are
>> >> > always allocated outside of the memory pool, but maybe we can
> reserve
>> >> that
>> >> > capability for only when the pool does not have enough space? It
>> seems a
>> >> > little nicer to use the pool if we can. If that seems reasonable,
> I'm
>> +1
>> >> on
>> >> > the KIP. Thanks for the effort!
>> >> >
>> >> > -Jason
>> >> >
>> >> > On Tue, Feb 28, 2017 at 10:09 AM, Mickael Maison <
>> >> mickael.mai...@gmail.com>
>> >> > wrote:
>> >> >
>> >> >> Yes I agree, having a generic flag is more future proof.
>> >> >> I'll update the KIP in the coming days.
>> >> >>
>> >> >> Thanks
>> >> >>
>> >> >> On Tue, Feb 28, 2017 at 5:08 AM, Jason Gustafson
> <ja...@confluent.io
>> >
>> >> >> wrote:
>> >> >> > Hey Mickael,
>> >> >> >
>> >> >> > The suggestion to add something to Node makes sense. I could
>> imagine
>> >> for
>> >> >> > example adding a flag to indicate that the connection has a
> higher
>> >> >> > "priority," meaning that we can allocate outside of the memory
>> pool if
>> >> >> > necessary. That would still be generic even if the only use case
> is
>> >> the
>> >> >> > consumer coordinator. We might also face a similar problem when
> the
>> >> >> > producer is sending requests to the transaction coordinator for
>> >> KIP-98.
>> >> >> > What do you think?
>> >> >> >
>> >> >> > Thanks,
>> >> >> > Jason
>> >> >> >
>> >> >> > On Mon, Feb 27, 2017 at 9:09 AM, Mickael Maison <
>> >> >> mickael.mai...@gmail.com>
>> >> >> > wrote:
>> >> >> >
>> >> >> >> Apologies for the late response.
>> >> >> >>
>> >> >> >> Thanks Jason for the suggestion. Yes you are right, the
>> Coordinator
>> >> >> >> connection is "tagged" with a different id, so we could
> retrieve
>> it
>> >> in
>> >> >> >> NetworkReceive to make the distinction.
>> >> >> >> However, currently the coordinator connection are made
> different
>> by
>> >> >> using:
>> >> >> >> Integer.MAX_VALUE - groupCoordinatorResponse.node().id()
>> >> >> >> for the Node id.
>> >> >> >>
>> >> >> >> So to identify Coordinator connections, we'd have to check that
>> the
>> >> >> >> NetworkReceive source is a value near Integer.MAX_VALUE which
> is a
>> >> bit
>> >> >> >> hacky ...
>> >> >> >>
>> >> >> >> Maybe we could add a constructor to Node that allows to pass in
> a
>> >> >> >> sourceId String. That way we could make all the coordinator
>> >> >> >> connections explicit (by setting it to "Coordinator-[ID]" for
>> >> >> >> example).
>> >> >> >> What do you think ?
>> >> >> >>
>> >> >> >> On Tue, Jan 24, 2017 at 12:58 AM, Jason Gustafson <
>> >> ja...@confluent.io>
>> >> >> >> wrote:
>> >> >> >> > Good point. The consumer does use a separate connection to
> the
>> >> >> >> coordinator,
>> >> >> >> > so perhaps the connection itself could be tagged for normal
> heap
>> >> >> >> allocation?
>> >> >> >> >
>> >> >> >> > -Jason
>> >> >> >> >
>> >> >> >> > On Mon, Jan 23, 2017 at 10:26 AM, Onur Karaman <
>> >> >> >> onurkaraman.apa...@gmail.com
>> >> >> >> >> wrote:
>> >> >> >> >
>> >> >> >> >> I only did a quick scan but I wanted to point out what I
> think
>> is
>> >> an
>> >> >> >> >> incorrect assumption in the KIP's caveats:
>> >> >> >> >> "
>> >> >> >> >> There is a risk using the MemoryPool that, after we fill up
> the
>> >> >> memory
>> >> >> >> with
>> >> >> >> >> fetch data, we can starve the coordinator's connection
>> >> >> >> >> ...
>> >> >> >> >> To alleviate this issue, only messages larger than 1Kb will
> be
>> >> >> >> allocated in
>> >> >> >> >> the MemoryPool. Smaller messages will be allocated directly
> on
>> the
>> >> >> Heap
>> >> >> >> >> like before. This allows group/heartbeat messages to avoid
>> being
>> >> >> >> delayed if
>> >> >> >> >> the MemoryPool fills up.
>> >> >> >> >> "
>> >> >> >> >>
>> >> >> >> >> So it sounds like there's an incorrect assumption that
>> responses
>> >> from
>> >> >> >> the
>> >> >> >> >> coordinator will always be small (< 1Kb as mentioned in the
>> >> caveat).
>> >> >> >> There
>> >> >> >> >> are now a handful of request types between clients and the
>> >> >> coordinator:
>> >> >> >> >> {JoinGroup, SyncGroup, LeaveGroup, Heartbeat, OffsetCommit,
>> >> >> OffsetFetch,
>> >> >> >> >> ListGroups, DescribeGroups}. While true (at least today) for
>> >> >> >> >> HeartbeatResponse and a few others, I don't think we can
> assume
>> >> >> >> >> JoinGroupResponse, SyncGroupResponse,
> DescribeGroupsResponse,
>> and
>> >> >> >> >> OffsetFetchResponse will be small, as they are effectively
>> >> bounded by
>> >> >> >> the
>> >> >> >> >> max message size allowed by the broker for the
>> __consumer_offsets
>> >> >> topic
>> >> >> >> >> which by default is 1MB.
>> >> >> >> >>
>> >> >> >> >> On Mon, Jan 23, 2017 at 9:46 AM, Mickael Maison <
>> >> >> >> mickael.mai...@gmail.com>
>> >> >> >> >> wrote:
>> >> >> >> >>
>> >> >> >> >> > I've updated the KIP to address all the comments raised
> here
>> and
>> >> >> from
>> >> >> >> >> > the "DISCUSS" thread.
>> >> >> >> >> > See:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> >> >> >> >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
>> >> >> >> >> >
>> >> >> >> >> > Now, I'd like to restart the vote.
>> >> >> >> >> >
>> >> >> >> >> > On Tue, Dec 6, 2016 at 9:02 AM, Rajini Sivaram
>> >> >> >> >> > <rajinisiva...@googlemail.com> wrote:
>> >> >> >> >> > > Hi Mickael,
>> >> >> >> >> > >
>> >> >> >> >> > > I am +1 on the overall approach of this KIP, but have a
>> >> couple of
>> >> >> >> >> > comments
>> >> >> >> >> > > (sorry, should have brought them up on the discuss
> thread
>> >> >> earlier):
>> >> >> >> >> > >
>> >> >> >> >> > > 1. Perhaps it would be better to do this after
> KAFKA-4137
>> >> >> >> >> > > <https://issues.apache.org/jira/browse/KAFKA-4137> is
>> >> >> implemented?
>> >> >> >> At
>> >> >> >> >> > the
>> >> >> >> >> > > moment, coordinator shares the same NetworkClient (and
>> hence
>> >> the
>> >> >> >> same
>> >> >> >> >> > > Selector) with consumer connections used for fetching
>> records.
>> >> >> Since
>> >> >> >> >> > > freeing of memory relies on consuming applications
> invoking
>> >> >> poll()
>> >> >> >> >> after
>> >> >> >> >> > > processing previous records and potentially after
>> committing
>> >> >> >> offsets,
>> >> >> >> >> it
>> >> >> >> >> > > will be good to ensure that coordinator is not blocked
> for
>> >> read
>> >> >> by
>> >> >> >> >> fetch
>> >> >> >> >> > > responses. This may be simpler once coordinator has its
> own
>> >> >> >> Selector.
>> >> >> >> >> > >
>> >> >> >> >> > > 2. The KIP says: *Once messages are returned to the
> user,
>> >> >> messages
>> >> >> >> are
>> >> >> >> >> > > deleted from the MemoryPool so new messages can be
> stored.*
>> >> >> >> >> > > Can you expand that a bit? I am assuming that partial
>> buffers
>> >> >> never
>> >> >> >> get
>> >> >> >> >> > > freed when some messages are returned to the user since
> the
>> >> >> >> consumer is
>> >> >> >> >> > > still holding a reference to the buffer. Would buffers
> be
>> >> freed
>> >> >> when
>> >> >> >> >> > > fetches for all the partitions in a response are parsed,
>> but
>> >> >> perhaps
>> >> >> >> >> not
>> >> >> >> >> > > yet returned to the user (i.e., is the memory freed when
> a
>> >> >> >> reference to
>> >> >> >> >> > the
>> >> >> >> >> > > response buffer is no longer required)? It will be good
> to
>> >> >> document
>> >> >> >> the
>> >> >> >> >> > > (approximate) maximum memory requirement for the
>> >> non-compressed
>> >> >> >> case.
>> >> >> >> >> > There
>> >> >> >> >> > > is data read from the socket, cached in the Fetcher and
> (as
>> >> Radai
>> >> >> >> has
>> >> >> >> >> > > pointed out), the records still with the user
> application.
>> >> >> >> >> > >
>> >> >> >> >> > >
>> >> >> >> >> > > On Tue, Dec 6, 2016 at 2:04 AM, radai <
>> >> >> radai.rosenbl...@gmail.com>
>> >> >> >> >> > wrote:
>> >> >> >> >> > >
>> >> >> >> >> > >> +1 (non-binding).
>> >> >> >> >> > >>
>> >> >> >> >> > >> small nit pick - just because you returned a response
> to
>> user
>> >> >> >> doesnt
>> >> >> >> >> > mean
>> >> >> >> >> > >> the memory id no longer used. for some cases the actual
>> >> "point
>> >> >> of
>> >> >> >> >> > >> termination" may be the deserializer (really
>> impl-dependant),
>> >> >> but
>> >> >> >> >> > >> generally, wouldnt it be "nice" to have an explicit
>> dispose()
>> >> >> call
>> >> >> >> on
>> >> >> >> >> > >> responses (with the addition that getting the next
> batch
>> of
>> >> data
>> >> >> >> from
>> >> >> >> >> a
>> >> >> >> >> > >> consumer automatically disposes the previous results)
>> >> >> >> >> > >>
>> >> >> >> >> > >> On Mon, Dec 5, 2016 at 6:53 AM, Edoardo Comar <
>> >> >> eco...@uk.ibm.com>
>> >> >> >> >> > wrote:
>> >> >> >> >> > >>
>> >> >> >> >> > >> > +1 (non binding)
>> >> >> >> >> > >> > --------------------------------------------------
>> >> >> >> >> > >> > Edoardo Comar
>> >> >> >> >> > >> > IBM MessageHub
>> >> >> >> >> > >> > eco...@uk.ibm.com
>> >> >> >> >> > >> > IBM UK Ltd, Hursley Park, SO21 2JN
>> >> >> >> >> > >> >
>> >> >> >> >> > >> > IBM United Kingdom Limited Registered in England and
>> Wales
>> >> >> with
>> >> >> >> >> number
>> >> >> >> >> > >> > 741598 Registered office: PO Box 41, North Harbour,
>> >> >> Portsmouth,
>> >> >> >> >> Hants.
>> >> >> >> >> > >> PO6
>> >> >> >> >> > >> > 3AU
>> >> >> >> >> > >> >
>> >> >> >> >> > >> >
>> >> >> >> >> > >> >
>> >> >> >> >> > >> > From:   Mickael Maison <mickael.mai...@gmail.com>
>> >> >> >> >> > >> > To:     dev@kafka.apache.org
>> >> >> >> >> > >> > Date:   05/12/2016 14:38
>> >> >> >> >> > >> > Subject:        [VOTE] KIP-81: Bound Fetch memory
> usage
>> in
>> >> the
>> >> >> >> >> > consumer
>> >> >> >> >> > >> >
>> >> >> >> >> > >> >
>> >> >> >> >> > >> >
>> >> >> >> >> > >> > Hi all,
>> >> >> >> >> > >> >
>> >> >> >> >> > >> > I'd like to start the vote for KIP-81:
>> >> >> >> >> > >> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> >> >> >> >> > >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
>> >> >> >> >> > >> >
>> >> >> >> >> > >> >
>> >> >> >> >> > >> > Thank you
>> >> >> >> >> > >> >
>> >> >> >> >> > >> >
>> >> >> >> >> > >> >
>> >> >> >> >> > >> >
>> >> >> >> >> > >> > Unless stated otherwise above:
>> >> >> >> >> > >> > IBM United Kingdom Limited - Registered in England
> and
>> >> Wales
>> >> >> with
>> >> >> >> >> > number
>> >> >> >> >> > >> > 741598.
>> >> >> >> >> > >> > Registered office: PO Box 41, North Harbour,
> Portsmouth,
>> >> >> >> Hampshire
>> >> >> >> >> PO6
>> >> >> >> >> > >> 3AU
>> >> >> >> >> > >> >
>> >> >> >> >> > >>
>> >> >> >> >> > >
>> >> >> >> >> > >
>> >> >> >> >> > >
>> >> >> >> >> > > --
>> >> >> >> >> > > Regards,
>> >> >> >> >> > >
>> >> >> >> >> > > Rajini
>> >> >> >> >> >
>> >> >> >> >>
>> >> >> >>
>> >> >>
>> >>
>> >
>> >
>> >
>> > --
>> > -- Guozhang
>>
>
>
>
> --
> -- Guozhang
>
>
>
> Unless stated otherwise above:
> IBM United Kingdom Limited - Registered in England and Wales with number
> 741598.
> Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU

Reply via email to