Hi Becket, You are right: the calculations are per partition produced to by each idempotent producer. I actually think this makes the problem more acute when we actually end up enabling the idempotent producer by default. However, even the most optimized version will still result in an overhead of at least 46 bytes per (producer, broker, partition) triplet.
I will call this out in KIP-185. I think we would want to optimize the memory utilization of each (partition, broker, producer) triplet before turning on the default. Another option could be to default max.in.flight to 2 and retain the metadata of just 2 batches. This would significantly reduce the memory overhead in the default distribution, and in the most common cases would not result in produce responses without the record metadata. That may be a simple way to address the issue. Thanks, Apurva On Mon, Sep 11, 2017 at 7:40 PM, Becket Qin <becket....@gmail.com> wrote: > Hi Apurva, > > Thanks for the explanation. > > I think the STO will be per producer/partition, right? Am I missing > something? You are right that the proposal does not strengthen the > semantic. The goal is more about trying to bound the memory consumption to > some reasonable number and use the memory more efficiently. > > But I agree it is not a blocker for this KIP as the memory pressure may not > be that big. With 5K partitions per broker, 50 producers per partition on > average, we are going to consume about 35 MB of memory with 5 entries (142 > bytes) in cache. So it is probably still OK and it is more urgent to fix > the upgrade path. > > Thanks, > > Jiangjie (Becket) Qin > > > > > > On Mon, Sep 11, 2017 at 4:13 PM, Apurva Mehta <apu...@confluent.io> wrote: > > > Hi Becket, > > > > Regarding the current implementation: we opted for a simpler server side > > implementation where we _don't_ snapshot the metadata of the last 5 > batches > > to disk. So if a broker fails, comes back online, and is the leader > again, > > it will only have the last batch in memory. With max.in.flight = 5, it is > > thus possible to receive duplicates of 4 prior batches and yet not have > the > > metadata. > > > > Since we can return DuplicateSequence to the client, and since this is > > considered a successful response on the client, this is a good solution. > It > > also means that we no longer have a hard limit of max.in.flight == 5: if > > you use a larger value, you are more likely to receive responses without > > the offset/timestamp metadata. > > > > Your suggestion for sending the lastAckdSequence per partition would help > > reduce memory on the broker, but it won't bound it: we need at least one > > cached batch per producer to do sequence number validation so it will > > always grow proportional to the number of active producers. Nor does it > > uniquely solve the problem of removing the cap on max.in.flight: > producers > > are still not guaranteed that the metadata for all their inflight batches > > will always be cached and returned. > > > > So it doesn't strengthen any semantics, but does optimize memory usage on > > the broker. But what's the usage with the proposed changes? With 5 cached > > batches, each producerIdEntry will be 142 bytes, or 7000 active producers > > who use idempotence will take 1MB per broker. With 1 cached batch, we > save > > at most 96 bytes per producer, so we could have at most 21000 active > > producers using idempotence per MB of broker memory. > > > > The savings are significant, but I am not convinced optimizing this is > > worth it right now since we are not making the idempotent producer the > > default in this release. Further there are a bunch of other items which > > make exactly once semantics more usable which we are working on in this > > release. Given that there are only 8 days left till feature freeze, I > would > > rather tackle the usability issues (KIP-192) than make these memory > > optimizations on the broker. The payoff from the latter seems much > smaller, > > and it is always possible to do in the future. > > > > What does the rest of the community think? > > > > Thanks, > > Apurva > > > > On Mon, Sep 11, 2017 at 2:39 PM, Becket Qin <becket....@gmail.com> > wrote: > > > > > Hi Apurva, > > > > > > Sorry for being late on this thread. I am trying to understand the > > > implementation of case that we will throw DuplicateSequenceException. > My > > > understanding is the following: > > > 1. On the broker side, we will cache 5 most recent > > > sequence/timestamp/offset (STO) for each of the producer ID. > > > 2. When duplicate occurs and the producer has max.in.flight.requests > set > > to > > > <=5, we can return the timestamp/offset from the cached STO. > > > 3. When duplicate occurs and the producer has max.in.flight.requests > > > greater than 5. We may need to return DuplicateSequenceException just > to > > > indicate the sequence is duplicate, but the ProduceResponse will not > have > > > timestamp/offset because it may be out of the 5 entries in the cache. > > > > > > Is my understanding correct? If it is the current implementation, I > have > > a > > > few concerns: > > > 1. One potential issue for this is that if there are a lot of producers > > > producing to the same cluster (e.g. a Map-Reduce job) we may still > spend > > a > > > lot of memory on caching the most recent STO. > > > 2. In most cases, we are essentially caching the > sequnce/timestamp/offset > > > entries that may never be used again. > > > > > > Since we are making protocol changes, I am wondering if we can improve > > the > > > above two cases by doing the following: > > > 1. Add a per partition LastAckedSequence field in the ProduceRequest. > > > 2. The broker will remove the cached STO entries whose sequence is less > > > than or equals to LastAckedSequence for each Partition/PID. > > > 3. Have a global STO cache to cap the number of total cached STO > entries > > to > > > some number, say 1 million. This total number is shared by all the > > > producers. If this number is reached, we will remove the entries from > the > > > producer who has the most cached STO entries. > > > 4. If there is a sequence smaller than the last sequence and there is > no > > > entry in the STO cache, we return DuplicateSequenceException without > > > offset/timestamp. > > > > > > With the above changes, we can have the following benefits: > > > 1. avoid caching the sequence/timestamp/offset unnecessarily because > all > > > the cached entries are the entries that hasn't been confirmed by the > > > producer. > > > 2. no magic number of 5 max.in.flight.requests.per.connection > > > 3. bounded memory footprint on the cached sequence/timestamp/offset > > > entries. > > > > > > Hope it's not too late to have the changes if that makes sense. > > > > > > Thanks, > > > > > > Jiangjie (Becket) Qin > > > > > > > > > On Mon, Sep 11, 2017 at 11:21 AM, Apurva Mehta <apu...@confluent.io> > > > wrote: > > > > > > > Thanks for the votes everyone. > > > > > > > > One of the proposals here was to raise a 'DuplicateSequenceException' > > to > > > > the user if the broker detected that one of the internal retries > > resulted > > > > in the duplicate, and the metadata for the original batch was no > longer > > > > cached. > > > > > > > > However, when implementing this change, I realized that this is quite > > > > unintuitive from the user's point of view. In reality, the > 'duplicate' > > is > > > > only due to internal retries -- something that the user has no > > visibility > > > > into. And secondly, this is not an error: the batch has been > persisted, > > > > only the cached metadata has been lost. > > > > > > > > I think the better approach is to return the a 'success' but make it > > > clear > > > > that there is no record metadata. If the user tries to access > > > > `RecordMetadata.offset` or `RecordMetadata.timestamp` methods of the > > > > returned metadata, we can raise a 'NoMetadataAvailableException' or > > > > something like that. > > > > > > > > This way users who don't access the 'offset' and 'timestamp' fields > > would > > > > not notice a change. For the users who do, the offset and timestamp > > will > > > > not silently be invalid: they will be notified through an exception. > > > > > > > > This seems like the cleanest way forward and I would like to make > this > > > > small change to the KIP. > > > > > > > > Does anybody have any objections? > > > > > > > > Thanks, > > > > Apurva > > > > > > > > > > > > > > > > On Thu, Sep 7, 2017 at 9:44 PM, Apurva Mehta <apu...@confluent.io> > > > wrote: > > > > > > > > > Thanks for the comments Ismael. > > > > > > > > > > I have gone ahead and incorporated all your suggestions in the KIP > > > > > document. You convinced me on adding max.message.bytes :) > > > > > > > > > > Apurva > > > > > > > > > > On Thu, Sep 7, 2017 at 6:12 PM, Ismael Juma <ism...@juma.me.uk> > > wrote: > > > > > > > > > >> Thanks for the KIP. +1 (binding) from me. A few minor comments: > > > > >> > > > > >> 1. We should add a note to the backwards compatibility section > > > > explaining > > > > >> the impact of throwing DuplicateSequenceException (a new > exception) > > > from > > > > >> `send`. As I understand it, it's not an issue, but good to include > > it > > > in > > > > >> the KIP. > > > > >> > > > > >> 2. For clarity, it's good to highlight in some way the new fields > in > > > the > > > > >> protocol definition itself > > > > >> > > > > >> 3. I understand that you decided not to add max.message.bytes > > because > > > > it's > > > > >> unrelated to this KIP. I'll try to persuade you that we should, > but > > > it's > > > > >> not a blocker if you don't agree. The reasons are: 1. The > > > implementation > > > > >> effort to add it is minimal since it's a topic config like message > > > > format > > > > >> version, 2. It's clearly beneficial for the producer to have that > > > > >> information, 3. It's compact (just a number), 4. It's nice to > avoid > > > > >> another > > > > >> protocol bump for a small change like that. > > > > >> > > > > >> Thanks, > > > > >> Ismael > > > > >> > > > > >> On Thu, Sep 7, 2017 at 9:51 PM, Apurva Mehta <apu...@confluent.io > > > > > > wrote: > > > > >> > > > > >> > Hi, > > > > >> > > > > > >> > I'd like to start a vote for KIP-192: > > > > >> > > > > > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > > > > >> > 192+%3A+Provide+cleaner+semantics+when+idempotence+is+enabled > > > > >> > > > > > >> > Thanks, > > > > >> > Apurva > > > > >> > > > > > >> > > > > > > > > > > > > > > > > > > > >