Marc, thanks much for documenting the guts!
There is one correction for Fetch Request handling:

When is it satisfied?

The earlier write-up says the request is satisfied when the fetch size
requested is reached, i.e. the amount of data the consumer wishes to
receive in one response (consumer configuration: *fetch.message.max.bytes*).
In fact, satisfaction is governed by the min_bytes field of the fetch
request (consumer configuration: *fetch.min.bytes*), as per the code:

  /**
   * A holding pen for fetch requests waiting to be satisfied
   */
  class FetchRequestPurgatory(requestChannel: RequestChannel, purgeInterval: Int)
          extends RequestPurgatory[DelayedFetch, Int](brokerId, purgeInterval) {
    this.logIdent = "[FetchRequestPurgatory-%d] ".format(brokerId)

    /**
     * A fetch request is satisfied when it has accumulated enough data to
     * meet the min_bytes field
     */
    def checkSatisfied(messageSizeInBytes: Int, delayedFetch: DelayedFetch): Boolean = {
      val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messageSizeInBytes)
      accumulatedSize >= delayedFetch.fetch.minBytes
    }
    // ...
  }
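
The snippet above is not runnable on its own (brokerId comes from the
enclosing class). Here is a minimal, self-contained sketch of the same
min-bytes accumulation rule; PendingFetch is an illustrative stand-in for
DelayedFetch, not the actual Kafka class:

```scala
import java.util.concurrent.atomic.AtomicLong

// Illustrative stand-in for DelayedFetch: a parked fetch request carrying
// the client's min_bytes and a running count of bytes produced since it
// entered the purgatory.
class PendingFetch(val minBytes: Long) {
  val bytesAccumulated = new AtomicLong(0L)
}

// Mirrors checkSatisfied: add the newly arrived bytes, then test whether
// the request has now accumulated enough data to be answered.
def checkSatisfied(messageSizeInBytes: Long, fetch: PendingFetch): Boolean = {
  val accumulatedSize = fetch.bytesAccumulated.addAndGet(messageSizeInBytes)
  accumulatedSize >= fetch.minBytes
}

val fetch = new PendingFetch(minBytes = 100)
println(checkSatisfied(60, fetch)) // false: only 60 bytes so far
println(checkSatisfied(50, fetch)) // true: 110 >= 100
```

Note the AtomicLong: several produce requests can arrive concurrently, so
the accumulated size is updated with an atomic add-and-get.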



On Fri, Nov 8, 2013 at 1:01 PM, Joel Koshy <jjkosh...@gmail.com> wrote:

> Marc - thanks again for doing this.  Couple of suggestions:
>
> - I would suggest removing the disclaimer and email quotes since this
>   can become a stand-alone clean document on what the purgatory is and
>   how it works.
> - A diagram would be helpful - it could, say, show the watcher map and
>   the expiration queue, and it will be especially useful if it can
>   show the flow of producer/fetch requests through the purgatory. That
>   would also help cut down a lot of the text in the doc.
> - I think it would be preferable to have just high-level details in
>   this document.  Internal details (such as the purge interval
>   settings) can either be removed or moved (to say, a short faq or
>   config section at the end).
> - In the overview may want to comment on why we added it: i.e., it is
>   the primary data structure we use for supporting long poll of
>   producer/fetch requests. E.g., if we don't do this consumers would
>   have to keep issuing fetch requests if there's no data yet - as
>   opposed to just saying "respond when 'n' bytes of data are available
>   or when 't' millisecs have elapsed, whichever is earlier."
> - WRT your question on PurgatorySize - we added that just to keep a
>   tab on how many requests are sitting in purgatory (including both
>   watchers map and expiration queue) as a rough gauge of memory usage.
>   Also the fetch/producer request gauges should not collide - the
>   KafkaMetricsGroup class takes care of this. The CSV reporter might
>   run into issues though - I thought we had fixed that but could be
>   wrong.
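
The long-poll rule Joel describes can be boiled down to a simple
predicate: respond as soon as enough bytes are available, or when the
wait time runs out, whichever comes first. A toy sketch (names are
illustrative, not the actual Kafka API):

```scala
// Toy model of long poll: a broker answers a parked fetch request when
// minBytes of data are available OR maxWaitMs have elapsed.
def shouldRespond(availableBytes: Int, minBytes: Int,
                  elapsedMs: Long, maxWaitMs: Long): Boolean =
  availableBytes >= minBytes || elapsedMs >= maxWaitMs

println(shouldRespond(availableBytes = 0,  minBytes = 1, elapsedMs = 100, maxWaitMs = 100)) // true: timed out
println(shouldRespond(availableBytes = 10, minBytes = 1, elapsedMs = 5,   maxWaitMs = 100)) // true: data ready
println(shouldRespond(availableBytes = 0,  minBytes = 1, elapsedMs = 5,   maxWaitMs = 100)) // false: keep waiting
```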
>
> Joel
>
> On Thu, Nov 07, 2013 at 11:01:06PM -0800, Joel Koshy wrote:
> > Excellent - thanks for putting that together! Will review it more
> > carefully tomorrow and suggest some minor edits if required.
> >
> > On Thu, Nov 07, 2013 at 10:45:40PM -0500, Marc Labbe wrote:
> > > I've just added a page for purgatory, feel free to comment/modify
> > > at will. I hope I didn't misinterpret too much of the code.
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/Request+Purgatory+(0.8)
> > >
> > > I added a few questions of my own.
> > >
> > >
> > > > On Fri, Nov 1, 2013 at 9:43 PM, Joe Stein <joe.st...@stealth.ly> wrote:
> > >
> > > > To edit the Wiki you need to send an ICLA
> > > > http://www.apache.org/licenses/#clas to Apache and then once that
> > > > is done an email to priv...@kafka.apache.org (or to me and I will
> > > > copy private) with your Wiki username and that you sent the ICLA
> > > > to Apache.
> > > >
> > > > Then, I can add you to edit the Wiki.
> > > >
> > > > /*******************************************
> > > >  Joe Stein
> > > >  Founder, Principal Consultant
> > > >  Big Data Open Source Security LLC
> > > >  http://www.stealth.ly
> > > >  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > > > ********************************************/
> > > >
> > > >
> > > > On Fri, Nov 1, 2013 at 9:08 PM, Marc Labbe <mrla...@gmail.com> wrote:
> > > >
> > > > > Hi Joel,
> > > > >
> > > > > I used to have edit access to the wiki, I made a few additions
> > > > > to it a while ago but it seems I don't have it anymore. It might
> > > > > have been lost in the confluence update. I would be glad to add
> > > > > what I have written if I get it back. Otherwise, feel free to
> > > > > paste my words in one of the pages, I don't intend on asking for
> > > > > copyrights for this :).
> > > > >
> > > > > marc
> > > > >
> > > > >
> > > > > On Fri, Nov 1, 2013 at 4:32 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
> > > > >
> > > > > > Marc, thanks for writing that up. I think it is worth adding
> > > > > > some details on the request-purgatory on a wiki (Jay had
> > > > > > started a wiki page for kafka internals [1] a while ago, but we
> > > > > > have not had time to add much to it since.) Your write-up could
> > > > > > be reviewed and added there. Do you have edit permissions on
> > > > > > the wiki?
> > > > > >
> > > > > > As for the purge interval config - yes the documentation can be
> > > > > > improved a bit. It's one of those "internal" configs that
> > > > > > generally don't need to be modified by users. The reason we
> > > > > > added that was as follows:
> > > > > > - We found that for low-volume topics, replica fetch requests
> > > > > > were getting expired but sitting around in purgatory
> > > > > > - This was because we were expiring them from the delay queue
> > > > > > (used to track when requests should expire), but they were
> > > > > > still sitting in the watcherFor map - i.e., they would get
> > > > > > purged when the next producer request to that topic/partition
> > > > > > arrived, but for low volume topics this could be a long time
> > > > > > (or never in the worst case) and we would eventually run into
> > > > > > an OOME.
> > > > > > - So we needed to periodically go through the entire watcherFor
> > > > > > map and explicitly remove those requests that had expired.
> > > > > > - More details on this are in KAFKA-664.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Joel
> > > > > >
> > > > > > [1] https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Internals
> > > > > >
> > > > > > On Fri, Nov 1, 2013 at 12:33 PM, Marc Labbe <mrla...@gmail.com> wrote:
> > > > > > > Guozhang,
> > > > > > >
> > > > > > > I have to agree with Priya the doc isn't very clear. Although
> > > > > > > the configuration is documented, it is simply rewording the
> > > > > > > name of the config, which isn't particularly useful if you
> > > > > > > want more information about what the purgatory is. I searched
> > > > > > > the whole wiki and doc and could not find anything very
> > > > > > > useful as opposed to looking at the code. In this case,
> > > > > > > kafka.server.KafkaApis and kafka.server.RequestPurgatory will
> > > > > > > be your friends.
> > > > > > >
> > > > > > > I'll try to add to Joe's answer here, mostly just reporting
> > > > > > > what's available in the Scala doc from the project. I am
> > > > > > > doing this to understand the mechanics myself btw.
> > > > > > >
> > > > > > > As Joe said, messages are not dropped by the purgatory but
> > > > > > > simply removed from the purgatory when they are satisfied.
> > > > > > > Satisfaction conditions are different for fetch and produce
> > > > > > > requests, and this is implemented in their respective
> > > > > > > DelayedRequest implementations (DelayedFetch and
> > > > > > > DelayedProduce).
> > > > > > >
> > > > > > > Request purgatories are defined as follows in the code:
> > > > > > >  - ProducerRequestPurgatory: A holding pen for produce
> > > > > > > requests waiting to be satisfied.
> > > > > > >  - FetchRequestPurgatory: A holding pen for fetch requests
> > > > > > > waiting to be satisfied
> > > > > > >
> > > > > > > Each request purgatory runs a thread (ExpiredRequestReaper).
> > > > > > > This thread will first try to find an expired delayed
> > > > > > > request. When one is found, it will run the purgatory's
> > > > > > > expire method to handle the delayed request expiration. In
> > > > > > > both produce and fetch cases, it sends a response to the
> > > > > > > client. An expired request becomes a satisfied request. The
> > > > > > > next step of the thread's loop is to check the configuration
> > > > > > > parameter you asked about initially
> > > > > > > (purgatory.purge.interval.requests). When the number of
> > > > > > > delayed requests given to the purgatory to watch reaches this
> > > > > > > value, it goes through all previously queued requests and
> > > > > > > removes those which are marked as satisfied. Because of that,
> > > > > > > it is really an interval more than a threshold, since it
> > > > > > > doesn't really care about the number of satisfied requests or
> > > > > > > the size of the queue.
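
That purge pass can be sketched in a few lines: the purgatory counts how
many requests it has been asked to watch, and once the count reaches the
purge interval it sweeps out everything already marked satisfied,
regardless of how many satisfied requests there are. (Illustrative names
only; this is not the real Kafka implementation.)

```scala
// Sketch of the interval-driven purge: the trigger is the number of
// requests watched since the last purge, not the number of satisfied
// requests or the size of the queue.
final class PurgatorySketch[T](purgeInterval: Int, isSatisfied: T => Boolean) {
  private var watched = List.empty[T]
  private var watchedSincePurge = 0

  def watch(request: T): Unit = {
    watched ::= request
    watchedSincePurge += 1
    if (watchedSincePurge >= purgeInterval) {
      // Sweep out requests already marked satisfied.
      watched = watched.filterNot(isSatisfied)
      watchedSincePurge = 0
    }
  }

  def size: Int = watched.size
}

// Watch six requests with purgeInterval = 3; even-numbered ones count as
// satisfied and get swept on each purge pass.
val purgatory = new PurgatorySketch[Int](purgeInterval = 3,
                                         isSatisfied = (n: Int) => n % 2 == 0)
(1 to 6).foreach(purgatory.watch)
println(purgatory.size) // only the odd-numbered (unsatisfied) requests remain
```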
> > > > > > >
> > > > > > > Producer request
> > > > > > > - When is it added to purgatory (delayed)?
> > > > > > >   * when it uses ack=-1 (actually, the code tells me anything
> > > > > > > but 0 or 1); Producer config: request.required.acks
> > > > > > >   * partitions have more than one replica (otherwise, ack=-1
> > > > > > > is no different from ack=1 and it doesn't make much sense to
> > > > > > > use a delayed request)
> > > > > > >   * not all partitions are in error
> > > > > > > - When does it expire? When it reaches the timeout defined in
> > > > > > > the produce request (ackTimeoutMs). Translates from producer
> > > > > > > config request.timeout.ms.
> > > > > > > - What happens (on the broker) when it expires? It sends a
> > > > > > > response to the client. Response content depends on the
> > > > > > > request of course.
> > > > > > > - When is it satisfied? I didn't find the courage to dig into
> > > > > > > the details of this one :( ... but mainly when all the
> > > > > > > followers have also acknowledged the produce request for
> > > > > > > their replicas
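
A rough sketch of that last point, the ack=-1 satisfaction rule: the
delayed produce request is satisfied once every follower replica has
caught up to the offset where the messages were written. (Illustrative
only; the real DelayedProduce logic also handles ISR changes and error
cases.)

```scala
// A produce request with acks=-1 is satisfied once every follower's log
// end offset has reached the offset at which the messages were appended.
def produceSatisfied(followerLogEndOffsets: Seq[Long], requiredOffset: Long): Boolean =
  followerLogEndOffsets.forall(_ >= requiredOffset)

println(produceSatisfied(Seq(10L, 12L), requiredOffset = 10L)) // true: all followers caught up
println(produceSatisfied(Seq(8L, 12L), requiredOffset = 10L))  // false: one follower behind
```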
> > > > > > >
> > > > > > > Fetch request
> > > > > > > - When is it added to purgatory (delayed)? 2 parameters of
> > > > > > > the request are mainly useful here: max wait time and fetch
> > > > > > > size
> > > > > > >   * if max wait is greater than 0; otherwise, it is a
> > > > > > > blocking call by the consumer
> > > > > > >   * if fetch size is greater than the current size of data
> > > > > > > available to fulfil the request
> > > > > > > - When does it expire?
> > > > > > >   * wait time: the amount of time the consumer is willing to
> > > > > > > wait for data; Consumer config: fetch.wait.max.ms
> > > > > > > - When is it satisfied? the fetch size requested is reached -
> > > > > > > ie. the amount of data the consumer wishes to receive in one
> > > > > > > response (from consumer config: fetch.message.max.bytes)
> > > > > > >
> > > > > > > ******
> > > > > > >
> > > > > > > It would be useful to add some information about the metrics
> > > > > > > associated with this.
> > > > > > >
> > > > > > > Of course, I am all for being corrected if I said anything
> > > > > > > wrong here. The truth is always the code :-)
> > > > > > >
> > > > > > > marc
> > > > > > > - mrtheb -
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Nov 1, 2013 at 2:45 AM, Priya Matpadi
> > > > > > > <priya.matp...@ecofactor.com> wrote:
> > > > > > >
> > > > > > >> Guozhang,
> > > > > > >> The documentation is not very clear.
> > > > > > >> Marc's response for producer purgatory makes sense.
> > > > > > >> I am not entirely clear on fetch purgatory.
> > > > > > >> How does the broker use purgatory? Is it a temporary
> > > > > > >> holding area? What happens to the messages if the purge
> > > > > > >> interval is exceeded in case of either/both producer and
> > > > > > >> consumer? Are messages dropped in this case?
> > > > > > >> Thanks,
> > > > > > >> Priya
> > > > > > >>
> > > > > > >>
> > > > > > >> On Thu, Oct 31, 2013 at 2:47 PM, Guozhang Wang
> > > > > > >> <wangg...@gmail.com> wrote:
> > > > > > >>
> > > > > > >> > Hello Priya,
> > > > > > >> >
> > > > > > >> > You can find the definitions of these two configs here:
> > > > > > >> >
> > > > > > >> > http://kafka.apache.org/documentation.html#brokerconfigs
> > > > > > >> >
> > > > > > >> > Guozhang
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > On Thu, Oct 31, 2013 at 11:20 AM, Marc Labbe
> > > > > > >> > <mrla...@gmail.com> wrote:
> > > > > > >> >
> > > > > > >> > > Hi Priya
> > > > > > >> > >
> > > > > > >> > > my understanding is producer requests will be delayed
> > > > > > >> > > (and put in the request purgatory) only if your producer
> > > > > > >> > > uses ack=-1. It will be in the purgatory (delayed) until
> > > > > > >> > > all brokers have acknowledged the messages to be
> > > > > > >> > > replicated. The documentation suggests to monitor the
> > > > > > >> > > ProducerRequestPurgatory size metric, but it only
> > > > > > >> > > applies if you're using ack=-1; otherwise, this value
> > > > > > >> > > will always be 0.
> > > > > > >> > >
> > > > > > >> > > For consumer requests, they'll be in purgatory (delayed)
> > > > > > >> > > until the max allowed time to respond has been reached,
> > > > > > >> > > unless it has enough messages to fill the buffer before
> > > > > > >> > > that. The request will not end up in the purgatory if
> > > > > > >> > > you're making a blocking request (max wait <= 0).
> > > > > > >> > >
> > > > > > >> > > Not sure about the configuration interval though.
> > > > > > >> > >
> > > > > > >> > > marc
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > > On Thu, Oct 31, 2013 at 12:41 PM, Priya Matpadi
> > > > > > >> > > <priya.matp...@ecofactor.com> wrote:
> > > > > > >> > >
> > > > > > >> > > > Hello,
> > > > > > >> > > > What is purgatory? I believe the following two
> > > > > > >> > > > properties relate to consumer and producer
> > > > > > >> > > > respectively.
> > > > > > >> > > > Could someone please explain the significance of these?
> > > > > > >> > > > fetch.purgatory.purge.interval.requests=100
> > > > > > >> > > > producer.purgatory.purge.interval.requests=100
> > > > > > >> > > >
> > > > > > >> > > > Thanks,
> > > > > > >> > > > Priya
> > > > > > >> > > >
> > > > > > >> > >
> > > > > > >> >
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > --
> > > > > > >> > -- Guozhang
> > > > > > >> >
> > > > > > >>
> > > > > >
> > > > >
> > > >
> >
>
>
