Marc, thanks for writing that up. I think it is worth adding some details on the request purgatory to the wiki (Jay started a wiki page for Kafka internals [1] a while ago, but we have not had time to add much to it since). Your write-up could be reviewed and added there. Do you have edit permissions on the wiki?
As for the purge interval config: yes, the documentation can be improved a bit. It's one of those "internal" configs that generally don't need to be modified by users. The reason we added it was as follows:
- We found that for low-volume topics, replica fetch requests were getting expired but were still sitting around in purgatory.
- This was because we were expiring them from the delay queue (used to track when requests should expire), but they were still sitting in the watcherFor map. In other words, they would only get purged when the next producer request to that topic/partition arrived. For low-volume topics this could take a long time (or never happen, in the worst case), and we would eventually run into an OOME.
- So we needed to periodically go through the entire watcherFor map and explicitly remove the requests that had expired.
- More details on this are in KAFKA-664.

Thanks,

Joel

[1] https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Internals

On Fri, Nov 1, 2013 at 12:33 PM, Marc Labbe <mrla...@gmail.com> wrote:

> Guozhang,
>
> I have to agree with Priya that the doc isn't very clear. Although the configuration is documented, the description simply rewords the name of the config, which isn't particularly useful if you want more information about what the purgatory is. I searched the whole wiki and the docs and could not find anything very useful, as opposed to looking at the code. In this case, kafka.server.KafkaApis and kafka.server.RequestPurgatory will be your friends.
>
> I'll try to add to Joe's answer here, mostly just reporting what's available in the Scala doc from the project. I am doing this to understand the mechanics myself, btw.
>
> As Joe said, messages are not dropped by the purgatory; the delayed requests are simply removed from the purgatory when they are satisfied. Satisfaction conditions are different for fetch and produce requests, and they are implemented in the respective DelayedRequest implementations (DelayedFetch and DelayedProduce).
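To make Joel's explanation of the leak and the purge concrete, here is a toy Python model. It is not Kafka's Scala RequestPurgatory: the names ToyPurgatory, DelayedRequest, watch, reaper_step, purge_satisfied and purge_interval are hypothetical, the watchers dict stands in for the watcherFor map, and the heap stands in for the delay queue. Expiring a request marks it satisfied but leaves it in the per-partition watcher lists; only a purge pass, triggered once the number of newly watched requests reaches the purge interval, removes it. Resetting that counter after each purge is an assumption of this sketch.

```python
import heapq
import itertools
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class DelayedRequest:
    # Hypothetical stand-in for a delayed request: the (topic, partition)
    # keys it watches and the absolute time at which it expires.
    keys: tuple
    expires_at: float
    satisfied: bool = False


class ToyPurgatory:
    """Toy model of a request purgatory (illustrative only, not Kafka's code)."""

    def __init__(self, purge_interval: int):
        self.purge_interval = purge_interval  # role of *.purgatory.purge.interval.requests
        self.watchers = defaultdict(list)     # key -> requests watching it (cf. watcherFor)
        self.delay_queue = []                 # min-heap of (expires_at, seq, request)
        self._seq = itertools.count()         # tie-breaker so requests are never compared
        self.request_counter = 0              # requests watched since the last purge

    def watch(self, request: DelayedRequest) -> None:
        for key in request.keys:
            self.watchers[key].append(request)
        heapq.heappush(self.delay_queue, (request.expires_at, next(self._seq), request))
        self.request_counter += 1

    def reaper_step(self, now: float) -> list:
        """One reaper iteration: expire due requests, then purge if the interval is reached."""
        expired = []
        while self.delay_queue and self.delay_queue[0][0] <= now:
            _, _, req = heapq.heappop(self.delay_queue)
            if not req.satisfied:
                req.satisfied = True  # expiring marks the request as satisfied...
                expired.append(req)   # ...and a real broker would send the response here
        # Expired requests are gone from the delay queue but still referenced from
        # self.watchers: this is the leak for partitions that receive no new traffic.
        if self.request_counter >= self.purge_interval:
            self.request_counter = 0
            self.purge_satisfied()
        return expired

    def purge_satisfied(self) -> None:
        # Walk every watcher list and drop requests already marked satisfied.
        for key in list(self.watchers):
            live = [r for r in self.watchers[key] if not r.satisfied]
            if live:
                self.watchers[key] = live
            else:
                del self.watchers[key]

    def watched_count(self) -> int:
        return sum(len(reqs) for reqs in self.watchers.values())
```

With purge_interval=3, two requests on a quiet partition that expire at t=1 are still held in the watcher lists after the reaper expires them at t=2; they are dropped only after a third request has been watched and the next reaper iteration runs the purge.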
>
> Request purgatories are defined as follows in the code:
> - ProducerRequestPurgatory: a holding pen for produce requests waiting to be satisfied.
> - FetchRequestPurgatory: a holding pen for fetch requests waiting to be satisfied.
>
> Each request purgatory runs a thread (ExpiredRequestReaper). This thread first tries to find an expired delayed request. When one is found, it runs the purgatory's expire method to handle the expiration of the delayed request; in both the produce and fetch cases, this sends a response to the client. An expired request is also marked as satisfied. In the next step of its loop, the thread checks the configuration parameters you initially asked about (fetch.purgatory.purge.interval.requests and producer.purgatory.purge.interval.requests). When the number of delayed requests given to the purgatory to watch reaches this value, it goes through all previously queued requests and removes those that are marked as satisfied. Because of that, it is really an interval rather than a threshold, since it doesn't care about the number of satisfied requests or the size of the queue.
>
> Producer request
> - When is it added to the purgatory (delayed)?
>   * when it uses ack=-1 (actually, the code tells me anything but 0 or 1); producer config: request.required.acks
>   * when partitions have more than one replica (with a single replica, ack=-1 is no different from ack=1, so a delayed request doesn't make much sense)
>   * when not all partitions are in error
> - When does it expire? When it reaches the timeout defined in the produce request (ackTimeoutMs), which comes from the producer config request.timeout.ms.
> - What happens (on the broker) when it expires? It sends a response to the client. The response content depends on the request, of course.
> - When is it satisfied? I didn't find the courage to dig into the details of this one :( ...
> but mainly when all the followers have also acknowledged the produce request for their replica.
>
> Fetch request
> - When is it added to the purgatory (delayed)? Two parameters of the request are mainly relevant here: the max wait time and the minimum number of bytes.
>   * if max wait is greater than 0; otherwise, the broker answers immediately instead of delaying the request
>   * if the minimum number of bytes is greater than the amount of data currently available to fulfil the request
> - When does it expire?
>   * when the wait time runs out: the amount of time the consumer is willing to wait for data; consumer config: fetch.wait.max.ms
> - When is it satisfied? When the minimum amount of data requested is available (consumer config: fetch.min.bytes).
>
> ******
>
> It would be useful to add some information about the metrics associated with this.
>
> Of course, I am all for being corrected if I said anything wrong here. The truth is always the code :-)
>
> marc
> - mrtheb -
>
>
> On Fri, Nov 1, 2013 at 2:45 AM, Priya Matpadi <priya.matp...@ecofactor.com> wrote:
>
>> Guozhang,
>> The documentation is not very clear.
>> Marc's response for producer purgatory makes sense.
>> I am not entirely clear on fetch purgatory.
>> How does the broker use the purgatory? Is it a temporary holding area? What happens to the messages if the purge interval is exceeded, for either or both producer and consumer? Are messages dropped in this case?
>> Thanks,
>> Priya
>>
>>
>> On Thu, Oct 31, 2013 at 2:47 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>
>> > Hello Priya,
>> >
>> > You can find the definitions of these two configs here:
>> >
>> > http://kafka.apache.org/documentation.html#brokerconfigs
>> >
>> > Guozhang
>> >
>> >
>> > On Thu, Oct 31, 2013 at 11:20 AM, Marc Labbe <mrla...@gmail.com> wrote:
>> >
>> > > Hi Priya
>> > >
>> > > my understanding is producer requests will be delayed (and put in request
>> > > purgatory) only if your producer uses ack=-1.
>> > > It will be in the purgatory (delayed) until all the replicas have acknowledged the messages to be replicated. The documentation suggests monitoring the ProducerRequestPurgatory size metrics, but that only applies if you're using ack=-1; otherwise, this value will always be 0.
>> > >
>> > > For consumer requests, they'll be in purgatory (delayed) until the max allowed time to respond has been reached, unless enough messages become available to fill the buffer before then. The request will not end up in the purgatory if max wait <= 0, since the broker then answers immediately.
>> > >
>> > > Not sure about the purge interval configuration, though.
>> > >
>> > > marc
>> > >
>> > >
>> > > On Thu, Oct 31, 2013 at 12:41 PM, Priya Matpadi <priya.matp...@ecofactor.com> wrote:
>> > >
>> > > > Hello,
>> > > > What is purgatory? I believe the following two properties relate to the consumer and the producer, respectively.
>> > > > Could someone please explain the significance of these?
>> > > > fetch.purgatory.purge.interval.requests=100
>> > > > producer.purgatory.purge.interval.requests=100
>> > > >
>> > > > Thanks,
>> > > > Priya
>> >
>> > --
>> > -- Guozhang
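The rules Marc lists for when produce and fetch requests are delayed, satisfied, or expired can be summarized as a few small predicates. This is a hypothetical Python sketch based only on the conditions described in this thread, not Kafka's API: every class, function and field name is illustrative. The produce rule follows Marc's reading of the code (acks other than 0 or 1, more than one replica, not all partitions in error), and the fetch byte threshold is assumed to be the request's minimum-bytes value (fetch.min.bytes in the 0.8 consumer config, as far as I can tell).

```python
from dataclasses import dataclass


@dataclass
class ProduceRequest:
    required_acks: int        # producer config request.required.acks
    replication_factor: int   # hypothetical: number of replicas per partition
    partitions: int           # partitions in the request
    partitions_in_error: int  # partitions that already failed locally


@dataclass
class FetchRequest:
    max_wait_ms: int          # consumer config fetch.wait.max.ms
    min_bytes: int            # assumed: consumer config fetch.min.bytes
    bytes_available: int      # hypothetical: data readable when the request arrives


def produce_is_delayed(req: ProduceRequest) -> bool:
    """True if the request would be parked in ProducerRequestPurgatory."""
    return (req.required_acks not in (0, 1)
            and req.replication_factor > 1
            and req.partitions_in_error < req.partitions)


def fetch_is_delayed(req: FetchRequest) -> bool:
    """True if the request would be parked in FetchRequestPurgatory.

    A max wait of 0 or less means the broker answers right away.
    """
    return req.max_wait_ms > 0 and req.bytes_available < req.min_bytes


def fetch_is_satisfied(accumulated_bytes: int, req: FetchRequest) -> bool:
    # Satisfied once enough data has accumulated for the response.
    return accumulated_bytes >= req.min_bytes


def fetch_has_expired(waited_ms: int, req: FetchRequest) -> bool:
    # Expired once the consumer's max wait has elapsed; a response is sent anyway.
    return waited_ms >= req.max_wait_ms
```

For example, an ack=-1 produce request to partitions with three replicas is delayed, while the same request with ack=1, a single replica, or every partition in error is answered immediately.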