Marc, thanks much for documenting the guts! There is one correction for Fetch Request handling:
When is it satisfied? The write-up says: when the fetch size requested is
reached, i.e. the amount of data the consumer wishes to receive in one
response (consumer configuration: *fetch.message.max.bytes*). As per the
code, however, it is satisfied once it has accumulated the *minimum* number
of bytes the request asked for (its min_bytes field):

/**
 * A holding pen for fetch requests waiting to be satisfied
 */
class FetchRequestPurgatory(requestChannel: RequestChannel, purgeInterval: Int)
    extends RequestPurgatory[DelayedFetch, Int](brokerId, purgeInterval) {

  this.logIdent = "[FetchRequestPurgatory-%d] ".format(brokerId)

  /**
   * A fetch request is satisfied when it has accumulated enough data to
   * meet the min_bytes field
   */
  def checkSatisfied(messageSizeInBytes: Int, delayedFetch: DelayedFetch): Boolean = {
    val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messageSizeInBytes)
    accumulatedSize >= delayedFetch.fetch.minBytes
  }
}

On Fri, Nov 8, 2013 at 1:01 PM, Joel Koshy <jjkosh...@gmail.com> wrote:

> Marc - thanks again for doing this. A couple of suggestions:
>
> - I would suggest removing the disclaimer and email quotes since this
>   can become a stand-alone clean document on what the purgatory is and
>   how it works.
> - A diagram would be helpful - it could, say, show the watcher map and
>   the expiration queue, and it will be especially useful if it can
>   show the flow of producer/fetch requests through the purgatory. That
>   would also help cut down a lot of the text in the doc.
> - I think it would be preferable to have just high-level details in
>   this document. Internal details (such as the purge interval
>   settings) can either be removed or moved (to, say, a short FAQ or
>   config section at the end).
> - In the overview you may want to comment on why we added it: i.e., it
>   is the primary data structure we use for supporting long poll of
>   producer/fetch requests. E.g., if we didn't do this, consumers would
>   have to keep issuing fetch requests if there's no data yet - as
>   opposed to just saying "respond when 'n' bytes of data are available
>   or when 't' millisecs have elapsed, whichever is earlier."
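[Editor's note: the satisfaction and expiration rules above can be sketched roughly as follows. This is a minimal, hypothetical Java model of the long-poll contract - "respond when min bytes are available or when the max wait has elapsed" - not Kafka's actual API; DelayedFetchSketch and all member names are invented.]

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a delayed fetch request: it is satisfied once at
// least minBytes of data have accumulated, and expires after maxWaitMs.
// These are the two halves of the long-poll behavior described above.
class DelayedFetchSketch {
    private final int minBytes;        // the fetch request's min_bytes field
    private final long maxWaitMs;      // the fetch request's max wait time
    private final long createdMs;
    private final AtomicInteger bytesAccumulated = new AtomicInteger(0);

    DelayedFetchSketch(int minBytes, long maxWaitMs, long nowMs) {
        this.minBytes = minBytes;
        this.maxWaitMs = maxWaitMs;
        this.createdMs = nowMs;
    }

    // Mirrors checkSatisfied: add the size of newly arrived messages and
    // test whether the accumulated total has reached min_bytes.
    boolean checkSatisfied(int messageSizeInBytes) {
        return bytesAccumulated.addAndGet(messageSizeInBytes) >= minBytes;
    }

    // An expired request is answered with whatever data is available so far.
    boolean isExpired(long nowMs) {
        return nowMs - createdMs >= maxWaitMs;
    }
}
```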
> - WRT your question on PurgatorySize - we added that just to keep a
>   tab on how many requests are sitting in purgatory (including both
>   the watchers map and the expiration queue) as a rough gauge of
>   memory usage. Also, the fetch/producer request gauges should not
>   collide - the KafkaMetricsGroup class takes care of this. The CSV
>   reporter might run into issues though - I thought we had fixed that
>   but could be wrong.
>
> Joel
>
> On Thu, Nov 07, 2013 at 11:01:06PM -0800, Joel Koshy wrote:
> > Excellent - thanks for putting that together! Will review it more
> > carefully tomorrow and suggest some minor edits if required.
> >
> > On Thu, Nov 07, 2013 at 10:45:40PM -0500, Marc Labbe wrote:
> > > I've just added a page for purgatory, feel free to comment/modify
> > > at will. I hope I didn't misinterpret too much of the code.
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/Request+Purgatory+(0.8)
> > >
> > > I added a few questions of my own.
> > >
> > > On Fri, Nov 1, 2013 at 9:43 PM, Joe Stein <joe.st...@stealth.ly> wrote:
> > > > To edit the Wiki you need to send an ICLA
> > > > (http://www.apache.org/licenses/#clas) to Apache, and then once
> > > > that is done, an email to priv...@kafka.apache.org (or to me and
> > > > I will copy private) with your Wiki username and a note that you
> > > > sent the ICLA to Apache.
> > > >
> > > > Then, I can add you to edit the Wiki.
> > > >
> > > > /*******************************************
> > > > Joe Stein
> > > > Founder, Principal Consultant
> > > > Big Data Open Source Security LLC
> > > > http://www.stealth.ly
> > > > Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > > > ********************************************/
> > > >
> > > > On Fri, Nov 1, 2013 at 9:08 PM, Marc Labbe <mrla...@gmail.com> wrote:
> > > > > Hi Joel,
> > > > >
> > > > > I used to have edit access to the wiki; I made a few additions
> > > > > to it a while ago, but it seems I don't have it anymore. It
> > > > > might have been lost in the
> > > > > Confluence update. I would be glad to add what I have written
> > > > > if I get it back. Otherwise, feel free to paste my words into
> > > > > one of the pages; I don't intend on asking for copyrights for
> > > > > this :).
> > > > >
> > > > > marc
> > > > >
> > > > > On Fri, Nov 1, 2013 at 4:32 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
> > > > > > Marc, thanks for writing that up. I think it is worth adding
> > > > > > some details on the request-purgatory on a wiki (Jay had
> > > > > > started a wiki page for kafka internals [1] a while ago, but
> > > > > > we have not had time to add much to it since). Your write-up
> > > > > > could be reviewed and added there. Do you have edit
> > > > > > permissions on the wiki?
> > > > > >
> > > > > > As for the purge interval config - yes, the documentation can
> > > > > > be improved a bit. It's one of those "internal" configs that
> > > > > > generally don't need to be modified by users. The reason we
> > > > > > added it was as follows:
> > > > > > - We found that for low-volume topics, replica fetch requests
> > > > > >   were getting expired but sitting around in purgatory.
> > > > > > - This was because we were expiring them from the delay queue
> > > > > >   (used to track when requests should expire), but they were
> > > > > >   still sitting in the watcherFor map - i.e., they would get
> > > > > >   purged when the next producer request to that
> > > > > >   topic/partition arrived, but for low-volume topics this
> > > > > >   could be a long time (or never, in the worst case) and we
> > > > > >   would eventually run into an OOME.
> > > > > > - So we needed to periodically go through the entire
> > > > > >   watcherFor map and explicitly remove those requests that
> > > > > >   had expired.
> > > > > > - More details on this are in KAFKA-664.
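[Editor's note: the fix described above (KAFKA-664) can be illustrated with a simplified, hypothetical Java model of the watcher map - requests registered per key that would normally be cleaned up by later traffic to the same key, plus the periodic sweep that removes dead entries. WatcherMapSketch and its members are invented names, not the actual Scala implementation.]

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Simplified model of the watcher map: delayed requests are watched per key
// (e.g. topic/partition). For low-volume keys, no new request may ever
// arrive to clean them out, so a periodic sweep over the whole map removes
// already-satisfied requests and avoids the OOME described above.
class WatcherMapSketch {
    static class DelayedRequest {
        volatile boolean satisfied = false;
    }

    private final Map<String, List<DelayedRequest>> watcherFor = new HashMap<>();

    void watch(String key, DelayedRequest request) {
        watcherFor.computeIfAbsent(key, k -> new ArrayList<>()).add(request);
    }

    // Walk every watcher list and drop requests already marked satisfied
    // (which includes expired requests). Returns how many were purged.
    int purgeSatisfied() {
        int purged = 0;
        for (List<DelayedRequest> watchers : watcherFor.values()) {
            Iterator<DelayedRequest> it = watchers.iterator();
            while (it.hasNext()) {
                if (it.next().satisfied) {
                    it.remove();
                    purged++;
                }
            }
        }
        return purged;
    }
}
```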
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Joel
> > > > > >
> > > > > > [1] https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Internals
> > > > > >
> > > > > > On Fri, Nov 1, 2013 at 12:33 PM, Marc Labbe <mrla...@gmail.com> wrote:
> > > > > > > Guozhang,
> > > > > > >
> > > > > > > I have to agree with Priya that the doc isn't very clear.
> > > > > > > Although the configuration is documented, it simply rewords
> > > > > > > the name of the config, which isn't particularly useful if
> > > > > > > you want more information about what the purgatory is. I
> > > > > > > searched the whole wiki and doc and could not find anything
> > > > > > > very useful, as opposed to looking at the code. In this
> > > > > > > case, kafka.server.KafkaApis and
> > > > > > > kafka.server.RequestPurgatory will be your friends.
> > > > > > >
> > > > > > > I'll try to add to Joe's answer here, mostly just reporting
> > > > > > > what's available in the Scala doc from the project. I am
> > > > > > > doing this to understand the mechanics myself, btw.
> > > > > > >
> > > > > > > As Joe said, messages are not dropped by the purgatory but
> > > > > > > simply removed from the purgatory when they are satisfied.
> > > > > > > Satisfaction conditions are different for fetch and produce
> > > > > > > requests, and this is implemented in their respective
> > > > > > > DelayedRequest implementations (DelayedFetch and
> > > > > > > DelayedProduce).
> > > > > > >
> > > > > > > Request purgatories are defined as follows in the code:
> > > > > > > - ProducerRequestPurgatory: A holding pen for produce
> > > > > > >   requests waiting to be satisfied.
> > > > > > > - FetchRequestPurgatory: A holding pen for fetch requests
> > > > > > >   waiting to be satisfied.
> > > > > > >
> > > > > > > Each request purgatory runs a thread (ExpiredRequestReaper).
> > > > > > > This thread will first try to find an expired delayed
> > > > > > > request. When one is found, it will run the purgatory's
> > > > > > > expire method to handle the delayed request's expiration.
> > > > > > > In both the produce and fetch cases, it sends a response to
> > > > > > > the client; an expired request is thereby also a satisfied
> > > > > > > request. The next step of the thread's loop is where it
> > > > > > > checks the configuration parameter you asked about
> > > > > > > initially (purgatory.purge.interval.requests). When the
> > > > > > > number of delayed requests given to the purgatory to watch
> > > > > > > reaches this value, it goes through all previously queued
> > > > > > > requests and removes those which are marked as satisfied.
> > > > > > > Because of that, it is really an interval more than it is a
> > > > > > > threshold, since it doesn't really care about the number of
> > > > > > > satisfied requests or the size of the queue.
> > > > > > >
> > > > > > > Producer request
> > > > > > > - When is it added to purgatory (delayed)?
> > > > > > >   * when it uses ack=-1 (actually, the code tells me
> > > > > > >     anything but 0 or 1); producer config:
> > > > > > >     request.required.acks
> > > > > > >   * when partitions have more than one replica (otherwise,
> > > > > > >     ack=-1 isn't different from ack=1 and it doesn't make
> > > > > > >     much sense to use a delayed request)
> > > > > > >   * when not all partitions are in error
> > > > > > > - When does it expire? When it reaches the timeout defined
> > > > > > >   in the produce request (ackTimeoutMs). Translates from
> > > > > > >   producer config request.timeout.ms.
> > > > > > > - What happens (on the broker) when it expires? It sends a
> > > > > > >   response to the client. The response content depends on
> > > > > > >   the request, of course.
> > > > > > > - When is it satisfied?
> > > > > > >   I didn't find the courage to dig into the details of
> > > > > > >   this one :( ... but mainly when all the followers have
> > > > > > >   also acknowledged the produce request for their replica.
> > > > > > >
> > > > > > > Fetch request
> > > > > > > - When is it added to purgatory (delayed)? Two parameters
> > > > > > >   of the request are mainly useful here: max wait time and
> > > > > > >   fetch size.
> > > > > > >   * if max wait is greater than 0; otherwise, it is a
> > > > > > >     blocking call by the consumer
> > > > > > >   * if the fetch size is greater than the current size of
> > > > > > >     data available to fulfil the request
> > > > > > > - When does it expire?
> > > > > > >   * wait time: the amount of time the consumer is willing
> > > > > > >     to wait for data; consumer config: fetch.wait.max.ms
> > > > > > > - When is it satisfied? When the fetch size requested is
> > > > > > >   reached, i.e. the amount of data the consumer wishes to
> > > > > > >   receive in one response (from consumer config:
> > > > > > >   fetch.message.max.bytes)
> > > > > > >
> > > > > > > ******
> > > > > > >
> > > > > > > It would be useful to add some information about the
> > > > > > > metrics associated with this.
> > > > > > >
> > > > > > > Of course, I am all for being corrected if I said anything
> > > > > > > wrong here. The truth is always the code :-)
> > > > > > >
> > > > > > > marc
> > > > > > > - mrtheb -
> > > > > > >
> > > > > > > On Fri, Nov 1, 2013 at 2:45 AM, Priya Matpadi
> > > > > > > <priya.matp...@ecofactor.com> wrote:
> > > > > > > > Guozhang,
> > > > > > > > The documentation is not very clear.
> > > > > > > > Marc's response for producer purgatory makes sense.
> > > > > > > > I am not entirely clear on fetch purgatory.
> > > > > > > > How does the broker use purgatory? Is it a temporary
> > > > > > > > holding area?
> > > > > > > > What happens to the messages if the purge interval is
> > > > > > > > exceeded, in the case of either/both producer and
> > > > > > > > consumer? Are messages dropped in this case?
> > > > > > > > Thanks,
> > > > > > > > Priya
> > > > > > > >
> > > > > > > > On Thu, Oct 31, 2013 at 2:47 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > > > Hello Priya,
> > > > > > > > >
> > > > > > > > > You can find the definitions of these two configs here:
> > > > > > > > >
> > > > > > > > > http://kafka.apache.org/documentation.html#brokerconfigs
> > > > > > > > >
> > > > > > > > > Guozhang
> > > > > > > > >
> > > > > > > > > On Thu, Oct 31, 2013 at 11:20 AM, Marc Labbe <mrla...@gmail.com> wrote:
> > > > > > > > > > Hi Priya,
> > > > > > > > > >
> > > > > > > > > > My understanding is that producer requests will be
> > > > > > > > > > delayed (and put in the request purgatory) only if
> > > > > > > > > > your producer uses ack=-1. They will stay in the
> > > > > > > > > > purgatory (delayed) until all brokers have
> > > > > > > > > > acknowledged that the messages are replicated. The
> > > > > > > > > > documentation suggests monitoring the
> > > > > > > > > > ProducerRequestPurgatory size metric, but it only
> > > > > > > > > > applies if you're using ack=-1; otherwise, this value
> > > > > > > > > > will always be 0.
> > > > > > > > > >
> > > > > > > > > > For consumer requests, they'll be in the purgatory
> > > > > > > > > > (delayed) until the max allowed time to respond has
> > > > > > > > > > been reached, unless there are enough messages to
> > > > > > > > > > fill the buffer before that. The request will not end
> > > > > > > > > > up in the purgatory if you're making a blocking
> > > > > > > > > > request (max wait <= 0).
> > > > > > > > > >
> > > > > > > > > > Not sure about the configuration interval though.
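[Editor's note: on the purge interval question, the thread explains that purgatory.purge.interval.requests counts requests handed to the purgatory rather than measuring queue size, which is why it behaves as an interval, not a threshold. A hypothetical Java sketch of that counting logic - ReaperSketch and all names are invented, not Kafka's API:]

```java
// Sketch of the purge-interval check in the reaper loop: a counter of
// watched requests triggers a sweep of satisfied requests each time it
// reaches purgeInterval, then resets. It never inspects the queue size or
// the number of satisfied requests, hence "interval", not "threshold".
class ReaperSketch {
    private final int purgeInterval;   // purgatory.purge.interval.requests
    private int requestsSincePurge = 0;
    int sweepsRun = 0;                 // exposed only for illustration

    ReaperSketch(int purgeInterval) {
        this.purgeInterval = purgeInterval;
    }

    // Called once for every delayed request given to the purgatory to watch.
    void onRequestWatched() {
        if (++requestsSincePurge >= purgeInterval) {
            requestsSincePurge = 0;
            sweepsRun++;  // the real code removes satisfied requests here
        }
    }
}
```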
> > > > > > > > > >
> > > > > > > > > > marc
> > > > > > > > > >
> > > > > > > > > > On Thu, Oct 31, 2013 at 12:41 PM, Priya Matpadi
> > > > > > > > > > <priya.matp...@ecofactor.com> wrote:
> > > > > > > > > > > Hello,
> > > > > > > > > > > What is purgatory? I believe the following two
> > > > > > > > > > > properties relate to the consumer and producer
> > > > > > > > > > > respectively. Could someone please explain the
> > > > > > > > > > > significance of these?
> > > > > > > > > > > fetch.purgatory.purge.interval.requests=100
> > > > > > > > > > > producer.purgatory.purge.interval.requests=100
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Priya
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > -- Guozhang