The difference between p.i.q.window.count and p.i.q.window.num: To be honest, I may have misunderstood your definition of window num. But here is what I have in mind:
1. p.i.q.window.size.seconds the length of time that a window will exist. This is also the maximum time between PID uses where the PID is considered to be the same. Reuse of the PID after p.iq.window.size.seconds triggers recording the PID as a new PID. Define a new configuration option "producer.id.quota.window.count" as the number of windows active in window.size.seconds. 2. p.i.q.window.count (the new one), how many sections to break p.i.q.window.size.seconds into. In the initial KIP this is 2. This value gives us the timing for creating a new layer in the layered bloom filter. So every (p.i.q.window.size.seconds / p.i.q.window.count) seconds a new layer will be created and an old layer or layers will be removed. The layered bloom filter will add layers to keep the probability of false positives in range. 3. p.i.q.window.num, specified as 11 in the KIP. I thought this was how many PIDs were expected in each window. In the original KIP this means that we expect to see the PID 22 times (2 windows x 11). In the Layered Bloom filter this would be the N value for the Shape. 4. p.i.q.window.fpr (needs to be specified) the expected false positive rate. Not sure how to express this in the config in a way that makes sense but something like 0.000006 or the like. This is the P value for the Shape. See https://hur.st/bloomfilter for a Bloom filter calculator. Once we have the N and P for the Shape the shape can be instantiated as "Shape s = Shape.fromNP( int n, double p );" In the layered filter once N items have been added to the layer a new layer is created. Layers are removed after p.i.q.window.size.seconds so if there is a burst of PIDs the number of layers will expand and then shrink back as the PIDs expire. While running there is always at least 1 layer. Some calculated points: - No layer will span more than p.i.q.window.size.seconds / p.i.q.window.count seconds. - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60 p.i.q.window.count =2 and a rate of 22 PIDs per minute there will be 2 layers. - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60 p.i.q.window.count =2 and a rate of 10 PIDs per minute there will be 2 layers. - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60 p.i.q.window.count =2 and that no PIDS have been seen in the last 30 seconds there will be 2 layers. - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60 p.i.q.window.count =2 and that no PIDS have been seen in the last 60 seconds there will be 1 layer. - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60 p.i.q.window.count =2 and a rate of 23 to 44 PIDs per minute there will be 4 layers. - the false positive rate across the layers remains at or below Shape.P - Assuming Shape.N = 11 and Shape.P = 0.000006 the Bloom filter at each layer will consume 35 bytes. https://hur.st/bloomfilter provides a quick calculator for other values. Claude On Tue, Apr 16, 2024 at 8:06 AM Claude Warren <cla...@xenei.com> wrote: > Let's put aside the CPC datasketch idea and just discuss the Bloom filter > approach. > > I thinkthe problem with the way the KIP is worded is that PIDs are only > added if they are not seen in either of the Bloom filters. > > So an early PID is added to the first filter and the associated metric is > updated. > that PID is seen multiple times over the next 60 minutes, but is not added > to the Bloom filters again. > once the 60 minutes elapses the first filter is cleared, or removed and a > new one started. In any case the PID is no longer recorded in any extant > Bloom filter. > the PID is seen again and is added to the newest bloom filter and the > associated metric is updated. > > I believe at this point the metric is incorrect, the PID has been counted > 2x, when it has been in use for the entire time. > > The "track" method that I added solves this problem by ensuring that the > PID is always seen in the latter half of the set of Bloom filters. In the > case of 2 filters that is always the second one, but remember that the > number of layers will grow as the filters become saturated. So if your > filter is intended to hold 500 PIDs and the 501st PID is registered before > the expiration a new layer (Bloom filter) is added for new PIDS to be added > into. > > On Mon, Apr 15, 2024 at 5:00 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> > wrote: > >> Hi Claude, >> Thanks for the implementation of the LayeredBloomFilter in apache >> commons. >> >> > Define a new configuration option "producer.id.quota.window.count" as >> > the number of windows active in window.size.seconds. >> What is the different between “producer.id.quota.window.count” and >> producer.id.quota.window.num >> >> > Basically the kip says, if the PID is found in either of the Bloom >> filters >> > then no action is taken >> > If the PID is not found then it is added and the quota rating metrics >> are >> > incremented. >> > In this case long running PIDs will be counted multiple times. >> >> The PID is considered not encountered if both frames of the window don’t >> have it. If you checked the diagram of for `Caching layer to track active >> PIDs per KafkaPrincipal` you will see that each window will have 2 bloom >> layers and the first created one will be disposed only when we start the >> next window. Which means window2 is starting from the 2nd bloom. Basically >> the bloom filter in the KIP is trying to implement a sliding window >> pattern. >> >> > think the question is not whether or not we have seen a given PID >> before >> > but rather how many unique PIDs did the principal create in the last >> hour. >> > Perhaps more exactly it is: did the Principal create more than X PIDS in >> > the last Y time units? >> We don’t really care about the count of unique PIDs per user. The KIP is >> trying to follow and build on top of ClientQuotaManager which already have >> a patter for throttling that the producer client is aware of so we don’t >> need to upgrade old clients for brokers to throttle them and they can >> respect the throttling. >> >> The pattern for throttling is that we record the activities by >> incrementing a metric sensor and only when we catch >> `QuotaViolationException` from the quota sensor we will be sending a >> throttleTimeMs to the client. >> For bandwidth throttling for example we increment the sensor by the size >> of the request. For PID the KIP is aiming to call >> `QuotaManagers::producerIdQuotaManager::maybeRecordAndGetThrottleTimeMs` to >> increment by +1 every time we encounter a new PID and if and if >> `Sensor::record` returned `QuotaViolationException` then we will send back >> to the producer the trolling time that the client should wait for before >> sending a new request with a new PID. >> I hope this make sense. >> >> > This question can be quickly answered by a CPC datasketch [1]. The >> > solution would be something like: >> > Break the Y time units into a set of Y' smaller partitions (e.g. 60 >> > 1-minute partitions for an hour). Create a circular queue of Y' CPC >> > datasketches for each principal. Implement a queue entry selector >> based on >> > the modulus of the system by the resolution of the Y' partitions. On >> each >> > call: >> I didn’t evaluate CPC datasketch or any counter solution as I explained >> above the aim is not to build a counter specially the Kafka Sensor can be >> enough to indicate if we are violating the quota or not. >> >> Thanks >> Omnia >> >> > On 15 Apr 2024, at 10:35, Claude Warren <cla...@apache.org> wrote: >> > >> > After thinking about his KIP over the weekend I think that there is >> another >> > lighter weight approach. >> > >> > I think the question is not whether or not we have seen a given PID >> before >> > but rather how many unique PIDs did the principal create in the last >> hour. >> > Perhaps more exactly it is: did the Principal create more than X PIDS in >> > the last Y time units? >> > >> > This question can be quickly answered by a CPC datasketch [1]. The >> > solution would be something like: >> > Break the Y time units into a set of Y' smaller partitions (e.g. 60 >> > 1-minute partitions for an hour). Create a circular queue of Y' CPC >> > datasketches for each principal. Implement a queue entry selector >> based on >> > the modulus of the system by the resolution of the Y' partitions. On >> each >> > call: >> > >> > On queue entry selector change clear the CPC (makes it empty) >> > Add the latest PID to the current queue entry. >> > Sum up the CPCs and check if the max (or min) estimate of unique counts >> > exceeds the limit for the user. >> > >> > When the CPC returns a zero estimated count then the principal has gone >> > away and the principal/CPC-queue pair can be removed from the tracking >> > system. >> > >> > I believe that this code solution is smaller and faster than the Bloom >> > filter implementation. >> > >> > [1] https://datasketches.apache.org/docs/CPC/CPC.html >> > >> > >> > >> > On Fri, Apr 12, 2024 at 3:10 PM Claude Warren <cla...@apache.org> >> wrote: >> > >> >> I think there is an issue in the KIP. >> >> >> >> Basically the kip says, if the PID is found in either of the Bloom >> filters >> >> then no action is taken >> >> If the PID is not found then it is added and the quota rating metrics >> are >> >> incremented. >> >> >> >> In this case long running PIDs will be counted multiple times. >> >> >> >> Let's assume a 30 minute window with 2 15-minute frames. So for the >> first >> >> 15 minutes all PIDs are placed in the first Bloom filter and for the >> 2nd 15 >> >> minutes all new PIDs are placed in the second bloom filter. At the >> 3rd 15 >> >> minutes the first filter is removed and a new empty one created. >> >> >> >> Let's denote Bloom filters as BFn{} and indicate the contained pids >> >> between the braces. >> >> >> >> >> >> So at t0 lets insert PID0 and increment the rating metrics. Thus we >> have >> >> BF0{PID0} >> >> at t0+5 let's insert PID1 and increment the rating metrics. Thus we >> have >> >> BF0{PID0, PID1} >> >> at t0+10 we see PID0 again but no changes occur. >> >> at t0+15 we start t1 and we have BF0{PID0, PID1}, BF1{} >> >> at t1+5 we see PID2, increment the rating metrics, and we have >> BF0{PID0, >> >> PID1}, BF1{PID2} >> >> at t1+6 we see PID0 again and no changes occur >> >> at t1+7 we see PID1 again and no changes occur >> >> at t1+15 we start a new window and dispose of BF0. Thus we have >> >> BF1{PID2}, BF2{} >> >> at t2+1 we see PID3, increment the rating metrics, and we have we have >> >> BF1{PID2}, BF2{PID3} >> >> at t2+6 we see PID0 again but now it is not in the list so we increment >> >> the rating metrics and add it BF1{PID2}, BF2{PID3, PID0} >> >> >> >> But we just saw PID0 15 minutes ago. Well within the 30 minute window >> we >> >> are trying to track. Or am I missing something? It seems like we >> need to >> >> add each PID to the last bloom filter >> >> >> >> On Fri, Apr 12, 2024 at 2:45 PM Claude Warren <cla...@apache.org> >> wrote: >> >> >> >>> Initial code is available at >> >>> >> https://github.com/Claudenw/kafka/blob/KIP-936/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerIDQuotaManager.java >> >>> >> >>> On Tue, Apr 9, 2024 at 2:37 PM Claude Warren <cla...@apache.org> >> wrote: >> >>> >> >>>> I should also note that the probability of false positives does not >> fall >> >>>> below shape.P because as it approaches shape.P a new layer is >> created and >> >>>> filters are added to that. So no layer in the LayeredBloomFilter >> exceeds >> >>>> shape.P thus the entire filter does not exceed shape.P. >> >>>> >> >>>> Claude >> >>>> >> >>>> On Tue, Apr 9, 2024 at 2:26 PM Claude Warren <cla...@apache.org> >> wrote: >> >>>> >> >>>>> The overall design for KIP-936 seems sound to me. I would make the >> >>>>> following changes: >> >>>>> >> >>>>> Replace the "TimedBloomFilter" with a "LayeredBloomFilter" from >> >>>>> commons-collections v4.5 >> >>>>> >> >>>>> Define the producer.id.quota.window.size.seconds to be the length of >> >>>>> time that a Bloom filter of PIDs will exist. >> >>>>> Define a new configuration option "producer.id.quota.window.count" >> as >> >>>>> the number of windows active in window.size.seconds. >> >>>>> >> >>>>> Define the "Shape" (See commons-collections bloomfilters v4.5) of >> the >> >>>>> bloom filter from the average number of PIDs expected in >> >>>>> window.size.seconds/window.count (call this N) and the probability >> of false >> >>>>> positives (call this P). Due to the way the LayeredBloomFilter >> works the >> >>>>> number of items can be a lower number than the max. I'll explain >> that in a >> >>>>> minute. >> >>>>> >> >>>>> The LayeredBloomFilter implements the standard BloomFilter interface >> >>>>> but internally keeps an ordered list of filters (called layers) >> from oldest >> >>>>> created to newest. It adds new layers when a specified Predicate >> >>>>> (checkExtend) returns true. It will remove filters as defined by a >> >>>>> specified Consumer (filterCleanup). >> >>>>> >> >>>>> Everytime a BloomFilter is merged into the LayeredBloomFilter the >> >>>>> filter checks to the "checkExtend" predicate. If it fires the >> >>>>> "filterCleanup" is called to remove any layers that should be >> removed and a >> >>>>> new layer is added. >> >>>>> >> >>>>> Define the layers of the LayeredBloomFilter to comprise a standard >> >>>>> BloomFilter and an associated expiration timestamp. >> >>>>> >> >>>>> We can thus define >> >>>>> >> >>>>> - "checkExtend" to require a new layer window.size.seconds / >> >>>>> window.count seconds or when the current layer contains shape.N >> items. >> >>>>> - "filterCleanup" to start at the head of the list of layers and >> >>>>> remove any expired filters, usually 0, every window.size.seconds >> 1, >> >>>>> infrequently more than 1. >> >>>>> >> >>>>> This system will correctly handle bursty loads. There are 3 cases >> to >> >>>>> consider: >> >>>>> >> >>>>> 1. If the producer is producing fewer than shape.N PIDs the layer >> >>>>> will not fill up before the next layer is added. >> >>>>> 2. If the producer is producing shape.N PIDs the layer will be >> >>>>> processed as either a 1 or a 3 depending on system timings. >> >>>>> 3. If the producer is producing more than shape.N PIDs the layer >> >>>>> will fill up and a new layer will be created with an expiration >> timestamp >> >>>>> window.size.seconds from when it was created. This is the case >> that leads >> >>>>> to the filterCleanup infrequently having more than 1 layer to >> remove. >> >>>>> >> >>>>> The last case to consider is if a producer stops generating PIDs, in >> >>>>> this case we should walk the map of producers, call >> "filterCleanup", and >> >>>>> then check to see if the LayeredBloomFilter is empty. If so, >> remove it >> >>>>> from the map. It will be empty if the producer has not produced a >> PID for >> >>>>> window.size.seconds. >> >>>>> >> >>>>> I have this solution mostly coded, though I must admit I do not know >> >>>>> where to plugin the ProducerIdQuotaManager defined in the KIP >> >>>>> >> >>>>> Claude >> >>>>> >> >>>> >> >> > > -- > LinkedIn: http://www.linkedin.com/in/claudewarren > -- LinkedIn: http://www.linkedin.com/in/claudewarren