Marc, thanks for writing that up. I think it is worth adding some
details on the request purgatory to the wiki (Jay started a wiki
page for Kafka internals [1] a while ago, but we have not had time to
add much to it since). Your write-up could be reviewed and added
there. Do you have edit permissions on the wiki?

As for the purge interval config - yes, the documentation can be
improved a bit. It's one of those "internal" configs that generally
don't need to be modified by users. The reason we added it is as
follows:
- We found that for low-volume topics, replica fetch requests were
getting expired but were still sitting around in purgatory.
- This was because we were expiring them from the delay queue (used to
track when requests should expire), but they were still sitting in the
watcherFor map - i.e., they would only get purged when the next producer
request to that topic/partition arrived, which for low-volume topics
could take a long time (or never happen in the worst case), and we would
eventually run into an OOME.
- So we needed to periodically go through the entire watcherFor map
and explicitly remove those requests that had expired.
- More details on this are in KAFKA-664.
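
To make that concrete, here is a rough sketch of the mechanism. This is
hand-written for illustration - the names (DelayedRequest, watchersForKey,
watch) and the structure are simplified, not the actual RequestPurgatory
code:

    import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}
    import scala.collection.mutable

    // A delayed request lives in two places: the delay queue (tracks
    // *when* it expires) and a per-key watcher map (tracks *what* can
    // satisfy it).
    class DelayedRequest(val key: Any, delayMs: Long) extends Delayed {
      private val dueAt = System.currentTimeMillis + delayMs
      @volatile var satisfied = false
      def getDelay(unit: TimeUnit): Long =
        unit.convert(dueAt - System.currentTimeMillis, TimeUnit.MILLISECONDS)
      def compareTo(other: Delayed): Int =
        getDelay(TimeUnit.MILLISECONDS)
          .compareTo(other.getDelay(TimeUnit.MILLISECONDS))
    }

    class PurgatorySketch(purgeInterval: Int) {
      private val delayQueue = new DelayQueue[DelayedRequest]()
      private val watchersForKey =
        mutable.Map.empty[Any, mutable.Buffer[DelayedRequest]]
      private var watchedSincePurge = 0

      def watch(req: DelayedRequest): Unit = synchronized {
        delayQueue.add(req)
        watchersForKey.getOrElseUpdate(req.key, mutable.Buffer()) += req
        watchedSincePurge += 1
        // Expired requests get polled off delayQueue elsewhere, but they
        // used to linger in watchersForKey until the next request for the
        // same key arrived - possibly never for a low-volume topic. Hence
        // this periodic sweep of the entire map (KAFKA-664):
        if (watchedSincePurge >= purgeInterval) {
          for (watchers <- watchersForKey.values)
            watchers --= watchers.filter(r =>
              r.satisfied || r.getDelay(TimeUnit.MILLISECONDS) <= 0)
          watchedSincePurge = 0
        }
      }
    }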

Thanks,

Joel

[1] https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Internals

On Fri, Nov 1, 2013 at 12:33 PM, Marc Labbe <mrla...@gmail.com> wrote:
> Guozhang,
>
> I have to agree with Priya that the doc isn't very clear. Although the
> configuration is documented, the description simply rewords the name of
> the config, which isn't particularly useful if you want more information
> about what the purgatory is. I searched the whole wiki and doc and could
> not find anything very useful, as opposed to looking at the code. In this
> case, kafka.server.KafkaApis and kafka.server.RequestPurgatory will be
> your friends.
>
> I'll try to add to Joe's answer here, mostly just reporting what's
> available in the project's Scala docs. I am doing this to understand the
> mechanics myself, btw.
>
> As Joe said, messages are not dropped by the purgatory but simply removed
> from it when they are satisfied. The satisfaction conditions differ
> between fetch and produce requests, and each is implemented in its own
> DelayedRequest subclass (DelayedFetch and DelayedProduce).
>
> Request purgatories are defined as follows in the code:
>  - ProducerRequestPurgatory: a holding pen for produce requests waiting
> to be satisfied.
>  - FetchRequestPurgatory: a holding pen for fetch requests waiting to be
> satisfied.
>
> Each request purgatory runs a thread (ExpiredRequestReaper). This thread
> first tries to find an expired delayed request. When one is found, it
> runs the purgatory's expire method to handle the delayed request's
> expiration. In both the produce and fetch cases, this sends a response
> to the client; an expired request is thereby also marked as satisfied.
> The next step of the thread's loop is the check tied to the
> configuration parameter you asked about initially
> (purgatory.purge.interval.requests). When the number of delayed requests
> given to the purgatory to watch reaches this value, it goes through all
> previously queued requests and removes those which are marked as
> satisfied. Because of that, it is really an interval more than a
> threshold, since it doesn't care about the number of satisfied requests
> or the size of the queue.
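>
> To make the loop concrete, here is a rough sketch of its shape
> (hand-written from the description above; the real ExpiredRequestReaper
> differs in detail, and purgeSatisfied/watchedSincePurge are illustrative
> names):
>
>     import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}
>
>     // purgeInterval corresponds to purgatory.purge.interval.requests;
>     // expire() is the purgatory-specific callback (both produce and
>     // fetch send a response to the client from there).
>     class ReaperSketch[T <: Delayed](queue: DelayQueue[T],
>                                      purgeInterval: Int,
>                                      expire: T => Unit,
>                                      purgeSatisfied: () => Unit)
>         extends Runnable {
>       @volatile var running = true
>       @volatile var watchedSincePurge = 0  // bumped by watch(), elided here
>
>       def run(): Unit =
>         while (running) {
>           // step 1: wait for the next delayed request to hit its timeout
>           val expired = queue.poll(200, TimeUnit.MILLISECONDS)
>           if (expired != null) expire(expired)  // responds to the client
>           // step 2: the purge check - it fires after N requests have been
>           // watched, regardless of how many are satisfied or how big the
>           // queue is, which is why it's an interval, not a threshold
>           if (watchedSincePurge >= purgeInterval) {
>             purgeSatisfied()
>             watchedSincePurge = 0
>           }
>         }
>     }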
>
> Producer request
> - When is it added to purgatory (delayed)?
>   * when it uses ack=-1 (actually, the code treats anything other than 0
> or 1 as -1); producer config: request.required.acks
>   * when the partitions have more than one replica (with a single
> replica, ack=-1 is no different from ack=1, so a delayed request makes
> little sense)
>   * when not all partitions are in error
> - When does it expire? When it reaches the timeout defined in the produce
> request (ackTimeoutMs), which translates from the producer config
> request.timeout.ms.
> - What happens (on the broker) when it expires? It sends a response to
> the client; the response content depends on the request, of course.
> - When is it satisfied? I didn't find the courage to dig into the details
> of this one :( ... but mainly when all the followers have also
> acknowledged the produce request for their replicas (see the sketch
> below).
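>
> A rough sketch of that satisfaction check (paraphrasing DelayedProduce;
> the names below are made up for illustration, not the real fields):
>
>     // One status entry per partition touched by the produce request.
>     case class PartitionStatus(var hasError: Boolean,
>                                var acksPending: Boolean)
>
>     class DelayedProduceSketch(status: Seq[PartitionStatus]) {
>       // Re-checked whenever a follower fetch advances its replica's
>       // log-end offset; acksPending flips to false once the required
>       // replicas have caught up to the produced offset.
>       def isSatisfied: Boolean =
>         status.forall(s => s.hasError || !s.acksPending)
>     }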
>
> Fetch request
> - When is it added to purgatory (delayed)? Two parameters of the request
> matter here: max wait time and fetch size.
>   * if max wait is greater than 0; otherwise, it is a blocking call by
> the consumer
>   * if the fetch size is greater than the amount of data currently
> available to fulfil the request
> - When does it expire?
>   * after the max wait time, i.e., the amount of time the consumer is
> willing to wait for data; consumer config: fetch.wait.max.ms
> - When is it satisfied? When the requested fetch size is reached, i.e.,
> the minimum amount of data the consumer wants to receive in one response
> (consumer config: fetch.min.bytes); see the sketch below.
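>
> The equivalent sketch for the fetch side (again paraphrasing
> DelayedFetch, with made-up names):
>
>     class DelayedFetchSketch(minBytes: Int) {
>       // Re-checked whenever a produce request appends data to one of
>       // the watched topic/partitions; availableBytes is the total
>       // accumulated across all partitions named in the fetch request.
>       def isSatisfied(availableBytes: Long): Boolean =
>         availableBytes >= minBytes  // enough data: respond before maxWait
>     }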
>
> ******
>
> It would be useful to add some information about the metrics associated
> with this.
>
> Of course, I am all for being corrected if I said anything wrong here. The
> truth is always the code :-)
>
> marc
> - mrtheb -
>
>
> On Fri, Nov 1, 2013 at 2:45 AM, Priya Matpadi
> <priya.matp...@ecofactor.com> wrote:
>
>> Guozhang,
>> The documentation is not very clear.
>> Marc's response for producer purgatory makes sense.
>> I am not entirely clear on fetch purgatory.
>> How does the broker use purgatory? Is it a temporary holding area? What
>> happens to the messages if the purge interval is exceeded, in the case
>> of either/both producer and consumer? Are messages dropped in this case?
>> Thanks,
>> Priya
>>
>>
>> On Thu, Oct 31, 2013 at 2:47 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>
>> > Hello Priya,
>> >
>> > You can find the definitions of these two configs here:
>> >
>> > http://kafka.apache.org/documentation.html#brokerconfigs
>> >
>> > Guozhang
>> >
>> >
>> > On Thu, Oct 31, 2013 at 11:20 AM, Marc Labbe <mrla...@gmail.com> wrote:
>> >
>> > > Hi Priya
>> > >
>> > > my understanding is that producer requests will be delayed (and put
>> > > in request purgatory) only if your producer uses ack=-1. They will
>> > > be in the purgatory (delayed) until all replicas have acknowledged
>> > > the messages to be replicated. The documentation suggests monitoring
>> > > the ProducerRequestPurgatory size metric, but it only applies if
>> > > you're using ack=-1; otherwise, this value will always be 0.
>> > >
>> > > For consumer requests, they'll be in purgatory (delayed) until the
>> > > max allowed time to respond has been reached, unless enough messages
>> > > arrive to fill the requested fetch size before that. The request
>> > > will not end up in the purgatory if you're making a blocking request
>> > > (max wait <= 0).
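>> > >
>> > > For illustration, with the SimpleConsumer API the two knobs look
>> > > roughly like this (broker address, topic, and values made up):
>> > >
>> > >     import kafka.api.FetchRequestBuilder
>> > >     import kafka.consumer.SimpleConsumer
>> > >
>> > >     val consumer =
>> > >       new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "demo")
>> > >     val request = new FetchRequestBuilder()
>> > >       .clientId("demo")
>> > >       .addFetch("my-topic", 0, 0L, 100000)
>> > >       .maxWait(1000)   // > 0: the request may sit in purgatory up to 1s
>> > >       .minBytes(4096)  // satisfied early once 4 KB are available
>> > >       .build()
>> > >     val response = consumer.fetch(request)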
>> > >
>> > > Not sure about the configuration interval though.
>> > >
>> > > marc
>> > >
>> > >
>> > > On Thu, Oct 31, 2013 at 12:41 PM, Priya Matpadi <
>> > > priya.matp...@ecofactor.com
>> > > > wrote:
>> > >
>> > > > Hello,
>> > > > What is purgatory? I believe the following two properties relate to
>> > > > consumer and producer respectively.
>> > > > Could someone please explain the significance of these?
>> > > > fetch.purgatory.purge.interval.requests=100
>> > > > producer.purgatory.purge.interval.requests=100
>> > > >
>> > > > Thanks,
>> > > > Priya
>> > > >
>> > >
>> >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
>>
