Hi Claude, sorry for the late reply; I was out for some time. Thanks for your response.

> - To ensure that all produced ids are tracked for 1 hour regardless of
> whether they were produced by userA or userB.

Not really; we need to track producer ids created by userA separately from
producer ids created by userB.
The primary purpose of the API is to limit the number of producer ids per
user, as the quota is set per user; for example, userA might have a quota of
100 producer ids while userB might have a quota of only 50. These quotas are
dynamic configs, so they can change at any time. This is why I am asking how
we will update the max entries for the Shape.

> Do you need to keep a list for each principal? are the PID's supposed to
> be globally unique? If the question you are asking is has principal_1 seen
> pid_2 then hashing principal_1 and pid_2 together and creating a bloom
> filter will tell you using one LayeredBloomFilter.

The PID should be unique; however, the problem we are trying to solve here is
to throttle the owner of the vast majority of them, which in this case is the
principal. The main concern I have is how to update the LayeredBloomFilter's
max entries, which in theory should be the value of the dynamic config `quota`
that is set per principal.
If we used one LayeredBloomFilter, then every time I need to update the max
entries to match the quota I would need to replace the Bloom filter for all
principals. However, if they are separated as I suggested, then replacing a
LayeredBloomFilter of max entries X with another one of max entries Y will
only impact one user and not everyone. Does this make sense?

On Fri, Aug 18, 2023 at 3:03 PM Claude Warren <cla...@xenei.com> wrote:

> Sorry for taking so long to get back to you, somehow I missed your message.
>
> > I am not sure how this will work when we have different producer-id-rate
> > for different KafkaPrincipal as proposed in the KIP.
> > For example `userA` had producer-id-rate of 1000 per hour while `user2` has
> > a quota of 100 producer ids per hour. How will we configure the max entries
> > for the Shape?
> >
>
> I am not certain I have a full understanding of your network. However, I
> am assuming that you want:
>
> - To ensure that all produced ids are tracked for 1 hour regardless of
>   whether they were produced by userA or userB.
> - To use a sliding window with 1 minute resolution.
>
> There is a tradeoff in the Layered Bloom filter -- larger max entries (N)
> or greater depth.
>
> So the simplest calculation would be 1100 messages per hour / 60 minutes
> per hour = 18.3, let's round that to 20.
> With an N=20 if more than 20 ids are produced in a minute a second filter
> will be created to accept all those over 20.
> Let's assume that the first filter was created at time 0:00:00 and the
> 21st id comes in at 0:00:45. When the first insert after 1:00:59 occurs
> (one hour after start + window time) the first filter will be removed.
> When the first insert after 1:01:44 occurs the filter created at 0:00:45
> will be removed.
>
> So if you have a period of high usage the number of filters (depth of the
> layers) increases; as the usage decreases, the numbers go back to expected
> depths. You could set the N to a much larger number and each filter would
> handle more ids before an extra layer was added. However, if they are
> vastly too big then there will be significant wasted space.
>
> > The only thing that comes to my mind to maintain this desired behavior in
> > the KIP is to NOT hash PID with KafkaPrincipal and keep a
> > Map<KafkaPrincipal, LayeredBloomFilter>
> > then each one of these bloom filters is controlled with
> > `Shape(<PRODUCER_ID_RATE_FOR_THIS_KafkaPrincipal>, 0.1)`.
> >
>
> Do you need to keep a list for each principal? are the PID's supposed to
> be globally unique? If the question you are asking is has principal_1 seen
> pid_2 then hashing principal_1 and pid_2 together and creating a bloom
> filter will tell you using one LayeredBloomFilter. If you also need to
> ask: "has anybody seen pid_2?", then there are some other solutions.
> Your solution will work and may be appropriate in some cases where there
> is a wide range of principal message rates. But in that case I would
> probably still use the principal+pid solution and just split the filters by
> estimated size so that all the ones that need a large filter go into one
> system, and the smaller ones go into another. I do note that the hur.st
> calculator [1] shows that for (1000, 0.1) you need 599 bytes and 3 hash
> functions, for (100, 0.1) you need 60 bytes and 3 hash functions, and for
> (1100, 0.1) you need 659 bytes and 3 hash functions. I would probably pick
> 704 bytes and 3 hash functions which gives you (1176, 0.1). I would pick
> this because 704 divides evenly into 64bit long blocks that are used
> internally for the SimpleBloomFilter so there is no wasted space.
>
> > Maybe I am missing something here but I can't find anything in the
> > `LayerManager` code that points to how often the eviction function
> > runs. Do you mean that the eviction function runs every minute? If so, can
> > we control this?
> >
>
> The LayerManager.Builder has a setCleanup() method. That method is run
> whenever a new layer is added to the filter. This means that you can use
> whatever process you want to delete old filters (including none:
> LayerManager.Cleanup.noCleanup()). The LayeredBloomFilterTest is an
> example of advancing by time (the one minute intervals) and cleaning by
> time (1 hr). It also creates a TimestampedBloomFilter to track the time.
> If we need an explicit mechanism to remove filters from the LayerManager we
> can probably add one.
>
> I hope this answers your questions.
>
> I am currently working on getting layered Bloom filters added to commons.
> A recent change set the stage for this change so it should be in soon.
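
For reference, the sizes quoted above appear to follow from the standard Bloom filter sizing formulas: m = ceil(-n ln p / (ln 2)^2) bits and k = round((m / n) ln 2) hash functions. Below is a minimal Java sketch that reproduces them. The class and method names are hypothetical and are not part of commons-collections, and rounding the capacity up is an assumption chosen because it matches the (1176, 0.1) figure.

```java
/** Hypothetical helper (not a commons-collections class) for Bloom filter sizing. */
final class BloomSizing {
    private static final double LN2_SQUARED = Math.log(2) * Math.log(2);

    private BloomSizing() {
    }

    /** Bits needed for n items at false-positive rate p: ceil(-n ln p / (ln 2)^2). */
    static long bitsFor(int n, double p) {
        return (long) Math.ceil(-n * Math.log(p) / LN2_SQUARED);
    }

    /** Number of hash functions for n items in a filter of the given size: round((m / n) ln 2). */
    static int hashesFor(int n, long bits) {
        return Math.max(1, (int) Math.round((double) bits / n * Math.log(2)));
    }

    /** Items a filter of the given size can hold at rate p, rounded up (assumption). */
    static long capacityFor(long bits, double p) {
        return (long) Math.ceil(bits * LN2_SQUARED / -Math.log(p));
    }
}
```

bitsFor(1000, 0.1) gives 4793 bits (about 599 bytes), bitsFor(100, 0.1) gives 480 bits (60 bytes), and bitsFor(1100, 0.1) gives 5272 bits (659 bytes), each with 3 hash functions. 704 bytes is 5632 bits, which is exactly 88 64-bit longs, and capacityFor(5632, 0.1) rounds up to 1176.
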
>
> I look forward to hearing from you,
> Claude
>
> [1] https://hur.st/bloomfilter/?n=1000&p=0.1&m=&k=
>
> On Sun, Jul 16, 2023 at 1:00 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
> wrote:
>
> > Thanks Claude for the feedback and for raising this implementation to
> > Apache commons-collections.
> > I had a look into your layered bloom filter and at first glance, I think it
> > would be a better improvement, however, regarding the following suggestion
> >
> > > By hashing the principal and PID into the filter as a single hash only
> > > one Bloom filter is required.
> >
> > I am not sure how this will work when we have different producer-id-rate
> > for different KafkaPrincipal as proposed in the KIP.
> > For example `userA` had producer-id-rate of 1000 per hour while `user2` has
> > a quota of 100 producer ids per hour. How will we configure the max entries
> > for the Shape?
> >
> > The only thing that comes to my mind to maintain this desired behavior in
> > the KIP is to NOT hash PID with KafkaPrincipal and keep a
> > Map<KafkaPrincipal, LayeredBloomFilter>
> > then each one of these bloom filters is controlled with
> > `Shape(<PRODUCER_ID_RATE_FOR_THIS_KafkaPrincipal>, 0.1)`.
> >
> > Does that make sense? WDYT?
> >
> > Also regarding the eviction function
> > > (evict function)
> > > The evict function will determine if it has been a minute since the last
> > > time we created a new layer, if so create a new layer and evict all layers
> > > older than 1 hour. Since the layers are a time ordered list this is simply
> > > removing the elderly layers from the front of the list.
> >
> > Maybe I am missing something here but I can't find anything in the
> > `LayerManager` code that points to how often the eviction function
> > runs. Do you mean that the eviction function runs every minute? If so, can
> > we control this?
> >
> > Cheers,
> > Omnia
> >
> > On Wed, Jun 21, 2023 at 11:43 AM Claude Warren <cla...@xenei.com> wrote:
> >
> > > I think that we could use either a Stable Bloom filter or a Layered Bloom
> > > filter constructed as follows:
> > >
> > > - Each layer is configured for the maximum number of principal-PID pairs
> > >   expected in a single minute.
> > > - Expect 60 layers (one for each minute)
> > > - If the layer becomes fully populated add another layer.
> > > - When the time to insert into the current layer expires, remove the
> > >   layers that are older than an hour.
> > >
> > > This will provide a sliding window of one hour with the ability to handle
> > > bursts above the expected rate of inserts without additional false
> > > positives.
> > >
> > > By hashing the principal and PID into the filter as a single hash only one
> > > Bloom filter is required.
> > >
> > > The code I pointed to earlier uses the commons-collections4 Bloom filter
> > > implementation. So a rough pseudo code for the implementation is:
> > >
> > > Shape shape = Shape.fromNP( 1000, 0.1 ); // 1000 entries, 0.1 false positive rate
> > > LayeredBloomFilter filter = LayeredBloomFilter( shape, 60, evictFunc ); // 60 initial layers, eviction function.
> > >
> > > (on PID)
> > >
> > > long[] buff = MurmurHash3.hash128x64( String.format("%s%s", principal, PID ).getBytes(java.nio.charset.StandardCharsets.UTF_8));
> > > Hasher hasher = new EnhancedDoubleHasher( buff[0], buff[1] );
> > > if (filter.contains(hasher)) {
> > >     // handle case where principal-pid was already seen
> > > }
> > > filter.merge( hasher ); // ensures that principal-pid remains in seen for the next hour.
> > >
> > > (evict function)
> > > The evict function will determine if it has been a minute since the last
> > > time we created a new layer, if so create a new layer and evict all layers
> > > older than 1 hour.
> > > Since the layers are a time ordered list this is simply
> > > removing the elderly layers from the front of the list.
> > >
> > > if it has not been an hour and the current filter is full (e.g. has 1000
> > > entries) create a new layer
> > >
> > > This should be very fast and space efficient.
> > >
> > >
> > > On Wed, Jun 21, 2023 at 11:13 AM Claude Warren <cla...@xenei.com> wrote:
> > >
> > > > I have an implementation of a layered Bloom filter in [1] (note the
> > > > layered branch). This should handle the layering Bloom filter and allow
> > > > for layers that
> > > >
> > > > 1. Do not become over populated and thus yield too many false
> > > >    positives.
> > > > 2. Expire and are removed automatically.
> > > >
> > > > The layered Bloom filter also does not need another thread to remove the
> > > > old filters as this is amortized across all the inserts.
> > > >
> > > > In addition, the number of Layered Bloom filter instances can be reduced
> > > > by hashing the Kafka Principal and the ID together into the Bloom filter
> > > > to look for.
> > > >
> > > > [1] https://github.com/Claudenw/BloomFilters/tree/layered
> > > >
> > > > On Sun, Jun 18, 2023 at 10:18 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
> > > > wrote:
> > > >
> > > >> Hi Haruki, Thanks for having a look at the KIP.
> > > >> > 1. Do you have any memory-footprint estimation for
> > > >> > TimeControlledBloomFilter?
> > > >> I don't at the moment have any estimate as I don't have a full
> > > >> implementation of this one at the moment. I can work on one if it's
> > > >> required.
> > > >>
> > > >> > * If I read the KIP correctly, TimeControlledBloomFilter will be
> > > >> > allocated per KafkaPrincipal so the size should be reasonably small
> > > >> > considering clusters which have a large number of users.
> > > >> The Map stored in the cache has 2 dimensions: one is vertical, which is
> > > >> KafkaPrincipal (producers only), and the second is horizontal, which is
> > > >> the time of the windows.
> > > >> - Horizontally we will add PIDs to the TimeControlledBloomFilter only
> > > >> if KafkaPrincipal didn't hit the quota and we control the bloom filter by
> > > >> time to expire the oldest set at some point when it's not needed anymore.
> > > >> - Vertically is the tricky one if the cluster has an insane number of
> > > >> KafkaPrincipals used for producing. And if the number of KafkaPrincipals
> > > >> is huge we can control the memory used by the cache by throttling more
> > > >> aggressively and I would argue that there is never going to be an insane
> > > >> number that could cause OOM.
> > > >>
> > > >> > * i.e. What false-positive rate do you plan to choose as the default?
> > > >> I am planning on using 0.1 as default.
> > > >>
> > > >> > 2. What do you think about rotating windows on produce-requests arrival
> > > >> > instead of scheduler?
> > > >> > * If we do rotation in scheduler threads, my concern is potential
> > > >> > scheduler threads occupation which could make other background tasks to
> > > >> > delay
> > > >> This is a valid concern. We can consider disposing of the oldest bloom
> > > >> when we add a new PID to the TimeControlledBloomFilter. However, I would
> > > >> still need a scheduler to clean up any inactive KafkaPrincipal from the
> > > >> cache layer, i.e. `ProducerIdQuotaManagerCache`. Do you have the same
> > > >> concern about this one too?
> > > >>
> > > >> > 3. Why the default producer.id.quota.window.size.seconds is 1 hour?
> > > >> > * Unlike other quota types (1 second)
> > > >> Mostly because 1 sec doesn't make sense for this type of quota.
> > > >> Misconfigured or misbehaving producers usually don't allocate new PIDs
> > > >> on the leader every sec but over a period of time.
> > > >>
> > > >> Thanks
> > > >>
> > > >> On Tue, Jun 6, 2023 at 5:21 PM Haruki Okada <ocadar...@gmail.com> wrote:
> > > >>
> > > >> > Hi, Omnia.
> > > >> >
> > > >> > Thanks for the KIP.
> > > >> > The feature sounds indeed helpful and the strategy to use bloom-filter
> > > >> > looks good.
> > > >> >
> > > >> > I have three questions:
> > > >> >
> > > >> > 1. Do you have any memory-footprint estimation
> > > >> > for TimeControlledBloomFilter?
> > > >> >     * If I read the KIP correctly, TimeControlledBloomFilter will be
> > > >> > allocated per KafkaPrincipal so the size should be reasonably small
> > > >> > considering clusters which have a large number of users.
> > > >> >     * i.e. What false-positive rate do you plan to choose as the
> > > >> > default?
> > > >> > 2. What do you think about rotating windows on produce-requests arrival
> > > >> > instead of scheduler?
> > > >> >     * If we do rotation in scheduler threads, my concern is potential
> > > >> > scheduler threads occupation which could make other background tasks to
> > > >> > delay
> > > >> > 3. Why the default producer.id.quota.window.size.seconds is 1 hour?
> > > >> >     * Unlike other quota types (1 second)
> > > >> >
> > > >> > Thanks,
> > > >> >
> > > >> > On Tue, Jun 6, 2023 at 23:55 Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
> > > >> >
> > > >> > > Hi everyone,
> > > >> > > I want to start the discussion of the KIP-936 to throttle the number
> > > >> > > of active PIDs per KafkaPrincipal.
> > > >> > > The proposal is here
> > > >> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs
> > > >> > >
> > > >> > > Thanks for your time and feedback.
> > > >> > > Omnia
> > > >> >
> > > >> > --
> > > >> > ========================
> > > >> > Okada Haruki
> > > >> > ocadar...@gmail.com
> > > >> > ========================
> > > >
> > > > --
> > > > LinkedIn: http://www.linkedin.com/in/claudewarren
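
As a rough illustration of the sliding window described earlier in this thread (one-minute layers, a new layer when the current one is full, and old layers dropped from the front of a time-ordered list), here is a minimal Java sketch. It is not the commons-collections LayeredBloomFilter: a HashSet stands in for each Bloom filter layer, eviction is checked on every insert for simplicity, and all class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch: a one-hour sliding window made of time-ordered layers. */
final class LayeredWindowSketch {
    static final long LAYER_MILLIS = 60_000L;      // start a new layer after one minute
    static final long WINDOW_MILLIS = 3_600_000L;  // keep ids for one hour

    /** One layer; the HashSet is a stand-in for a Bloom filter (assumption). */
    private static final class Layer {
        final long createdAt;
        final Set<String> seen = new HashSet<>();

        Layer(long createdAt) {
            this.createdAt = createdAt;
        }
    }

    private final Deque<Layer> layers = new ArrayDeque<>(); // oldest layer first
    private final int maxPerLayer;

    LayeredWindowSketch(int maxPerLayer) {
        this.maxPerLayer = maxPerLayer;
    }

    /** Tracks principal+pid at nowMillis and reports whether it was already tracked. */
    boolean checkAndTrack(String principal, long pid, long nowMillis) {
        // Evict from the front: a layer expires one hour plus one layer period after creation.
        while (!layers.isEmpty()
                && nowMillis - layers.peekFirst().createdAt >= WINDOW_MILLIS + LAYER_MILLIS) {
            layers.removeFirst();
        }
        String key = principal + ":" + pid;
        boolean alreadySeen = false;
        for (Layer layer : layers) {
            if (layer.seen.contains(key)) {
                alreadySeen = true;
                break;
            }
        }
        // Open a new layer when there is none, the newest is a minute old, or it is full.
        Layer newest = layers.peekLast();
        if (newest == null
                || nowMillis - newest.createdAt >= LAYER_MILLIS
                || newest.seen.size() >= maxPerLayer) {
            newest = new Layer(nowMillis);
            layers.addLast(newest);
        }
        newest.seen.add(key); // re-adding keeps the pair tracked for another hour
        return alreadySeen;
    }

    /** Current depth of the layer list. */
    int layerCount() {
        return layers.size();
    }
}
```

With maxPerLayer = 20, twenty ids inserted at 0:00:00 fill the first layer, a 21st id at 0:00:45 opens a second layer, and the first layer is dropped by the first insert at or after 1:01:00, which follows the timeline given earlier in the thread. Because the key combines principal and PID, one structure serves all principals; the trade-off discussed above is that a per-principal limit cannot then be changed independently.
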