I misspoke before: LayeredBloomFilterTest.testExpiration() uses milliseconds to expire the data, but it lays out an example of how to expire filters in time intervals.
On Fri, Aug 18, 2023 at 4:01 PM Claude Warren <cla...@xenei.com> wrote:

> Sorry for taking so long to get back to you; somehow I missed your message.
>
>> I am not sure how this will work when we have different producer-id-rate
>> quotas for different KafkaPrincipals as proposed in the KIP.
>> For example, `userA` has a producer-id-rate of 1000 per hour while `userB`
>> has a quota of 100 producer ids per hour. How will we configure the max
>> entries for the Shape?
>
> I am not certain I have a full understanding of your network. However, I
> am assuming that you want:
>
> - To ensure that all produced ids are tracked for 1 hour regardless of
>   whether they were produced by userA or userB.
> - To use a sliding window with 1 minute resolution.
>
> There is a tradeoff in the layered Bloom filter: larger max entries (N)
> or greater depth.
>
> So the simplest calculation would be 1100 messages per hour / 60 minutes
> per hour = 18.3; let's round that to 20. With N=20, if more than 20 ids
> are produced in a minute, a second filter will be created to accept all
> those over 20. Let's assume that the first filter was created at time
> 0:00:00 and the 21st id comes in at 0:00:45. When the first insert after
> 1:00:59 occurs (one hour after start + window time), the first filter
> will be removed. When the first insert after 1:01:44 occurs, the filter
> created at 0:00:45 will be removed.
>
> So if you have a period of high usage, the number of filters (the depth
> of the layers) increases; as the usage decreases, the numbers go back to
> the expected depths. You could set N to a much larger number and each
> filter would handle more ids before an extra layer was added. However,
> if the filters are vastly too big, there will be significant wasted
> space.
>
>> The only thing that comes to my mind to maintain this desired behavior
>> in the KIP is to NOT hash the PID with the KafkaPrincipal and instead
>> keep a Map<KafkaPrincipal, LayeredBloomFilter>, where each of these
>> Bloom filters is controlled with
>> `Shape(<PRODUCER_ID_RATE_FOR_THIS_KafkaPrincipal>, 0.1)`.
>
> Do you need to keep a list for each principal? Are the PIDs supposed to
> be globally unique? If the question you are asking is "has principal_1
> seen pid_2?", then hashing principal_1 and pid_2 together and creating a
> Bloom filter will tell you using one LayeredBloomFilter. If you also
> need to ask "has anybody seen pid_2?", then there are some other
> solutions. Your solution will work and may be appropriate in some cases
> where there is a wide range of principal message rates. But in that case
> I would probably still use the principal+pid solution and just split the
> filters by estimated size, so that all the ones that need a large filter
> go into one system and the smaller ones go into another. I do note that
> the hur.st calculator [1] shows that for (1000, 0.1) you need 599 bytes
> and 3 hash functions; for (100, 0.1) you need 60 bytes and 3 hash
> functions; and for (1100, 0.1) you need 659 bytes and 3 hash functions.
> I would probably pick 704 bytes and 3 hash functions, which gives you
> (1176, 0.1). I would pick this because 704 bytes divides evenly into the
> 64-bit long blocks that are used internally by the SimpleBloomFilter, so
> there is no wasted space.
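>
> As a quick sizing sketch, assuming the Shape factory methods that are
> being merged into commons-collections4 (final names may differ), the
> numbers above fall out like this:
>
> import org.apache.commons.collections4.bloomfilter.Shape;
>
> // Derive the size from the item count and false-positive rate...
> Shape derived = Shape.fromNP(1176, 0.1);
> derived.getNumberOfBits();          // ~5636 bits, just over 704 bytes
> derived.getNumberOfHashFunctions(); // 3
>
> // ...or fix the size at 704 bytes (88 64-bit blocks) and 3 hash functions
> Shape picked = Shape.fromKM(3, 704 * 8); // 5632 bits
>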
>> Maybe I am missing something here, but I can't find anything in the
>> `LayerManager` code that points to how often the eviction function
>> runs. Do you mean that the eviction function runs every minute? If so,
>> can we control this?
>
> The LayerManager.Builder has a setCleanup() method. That method is run
> whenever a new layer is added to the filter. This means that you can
> use whatever process you want to delete old filters (including none:
> LayerManager.Cleanup.noCleanup()). The LayeredBloomFilterTest is an
> example of advancing by time (the one-minute intervals) and cleaning by
> time (1 hr). It also creates a TimestampedBloomFilter to track the
> time. If we need an explicit mechanism to remove filters from the
> LayerManager, we can probably add one.
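>
> As a minimal sketch of that wiring, assuming the builder API that is
> being merged into commons-collections4 (exact names and generics may
> differ), with a hypothetical once-a-minute advance check:
>
> import java.util.concurrent.atomic.AtomicLong;
> import java.util.function.Predicate;
> import org.apache.commons.collections4.bloomfilter.LayerManager;
> import org.apache.commons.collections4.bloomfilter.LayeredBloomFilter;
> import org.apache.commons.collections4.bloomfilter.Shape;
> import org.apache.commons.collections4.bloomfilter.SimpleBloomFilter;
>
> Shape shape = Shape.fromNP(20, 0.1); // ~20 principal+PID pairs per minute
>
> // Advance to a fresh layer once a minute; a count- or saturation-based
> // check could be combined in to handle bursts above N.
> AtomicLong lastAdvance = new AtomicLong(System.currentTimeMillis());
> Predicate<LayerManager<SimpleBloomFilter>> everyMinute = lm -> {
>     long now = System.currentTimeMillis();
>     if (now - lastAdvance.get() >= 60_000L) {
>         lastAdvance.set(now);
>         return true;
>     }
>     return false;
> };
>
> LayerManager<SimpleBloomFilter> manager = LayerManager.<SimpleBloomFilter>builder()
>     .setSupplier(() -> new SimpleBloomFilter(shape))
>     .setExtendCheck(everyMinute)
>     // the cleanup runs each time a new layer is created; capping the
>     // deque at 60 one-minute layers approximates the one-hour window
>     .setCleanup(LayerManager.Cleanup.onMaxSize(60))
>     .build();
> LayeredBloomFilter<SimpleBloomFilter> filter = new LayeredBloomFilter<>(shape, manager);
>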
> I hope this answers your questions.
>
> I am currently working on getting layered Bloom filters added to
> commons. A recent change set the stage for this, so it should be in
> soon.
>
> I look forward to hearing from you,
> Claude
>
> [1] https://hur.st/bloomfilter/?n=1000&p=0.1&m=&k=
>
> On Sun, Jul 16, 2023 at 1:00 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
> wrote:
>
>> Thanks Claude for the feedback and for raising this implementation to
>> Apache commons-collections.
>> I had a look into your layered Bloom filter and at first glance I
>> think it would be a real improvement; however, regarding the following
>> suggestion:
>>
>> > By hashing the principal and PID into the filter as a single hash
>> > only one Bloom filter is required.
>>
>> I am not sure how this will work when we have different
>> producer-id-rate quotas for different KafkaPrincipals as proposed in
>> the KIP. For example, `userA` has a producer-id-rate of 1000 per hour
>> while `userB` has a quota of 100 producer ids per hour. How will we
>> configure the max entries for the Shape?
>>
>> The only thing that comes to my mind to maintain this desired behavior
>> in the KIP is to NOT hash the PID with the KafkaPrincipal and instead
>> keep a Map<KafkaPrincipal, LayeredBloomFilter>, where each of these
>> Bloom filters is controlled with
>> `Shape(<PRODUCER_ID_RATE_FOR_THIS_KafkaPrincipal>, 0.1)`.
>>
>> Does that make sense? WDYT?
>>
>> Also, regarding the eviction function:
>>
>> > (evict function)
>> > The evict function will determine if it has been a minute since the
>> > last time we created a new layer; if so, create a new layer and
>> > evict all layers older than 1 hour. Since the layers are a
>> > time-ordered list, this is simply removing the elderly layers from
>> > the front of the list.
>>
>> Maybe I am missing something here, but I can't find anything in the
>> `LayerManager` code that points to how often the eviction function
>> runs. Do you mean that the eviction function runs every minute? If so,
>> can we control this?
>>
>> Cheers,
>> Omnia
>>
>> On Wed, Jun 21, 2023 at 11:43 AM Claude Warren <cla...@xenei.com> wrote:
>>
>> > I think that either a stable Bloom filter or a layered Bloom filter
>> > constructed as follows would work:
>> >
>> > - Each layer is configured for the maximum number of principal-PID
>> >   pairs expected in a single minute.
>> > - Expect 60 layers (one for each minute).
>> > - If the layer becomes fully populated, add another layer.
>> > - When the time to insert into the current layer expires, remove the
>> >   layers that are older than an hour.
>> >
>> > This will provide a sliding window of one hour with the ability to
>> > handle bursts above the expected rate of inserts without additional
>> > false positives.
>> >
>> > By hashing the principal and PID into the filter as a single hash,
>> > only one Bloom filter is required.
>> >
>> > The code I pointed to earlier uses the commons-collections4 Bloom
>> > filter implementation. So a rough pseudo code for the implementation
>> > is:
>> >
>> > Shape shape = Shape.fromNP(1000, 0.1); // 1000 entries, 0.1 false positive rate
>> > LayeredBloomFilter filter = new LayeredBloomFilter(shape, 60, evictFunc); // 60 initial layers, eviction function
>> >
>> > (on PID)
>> >
>> > long[] buff = MurmurHash3.hash128x64(String.format("%s%s", principal, pid)
>> >     .getBytes(java.nio.charset.StandardCharsets.UTF_8));
>> > Hasher hasher = new EnhancedDoubleHasher(buff[0], buff[1]);
>> > if (filter.contains(hasher)) {
>> >     // handle the case where this principal-pid was already seen
>> > }
>> > filter.merge(hasher); // ensures the principal-pid remains "seen" for the next hour
>> >
>> > (evict function)
>> > The evict function will determine if it has been a minute since the
>> > last time we created a new layer; if so, create a new layer and
>> > evict all layers older than 1 hour. Since the layers are a
>> > time-ordered list, this is simply removing the elderly layers from
>> > the front of the list.
>> >
>> > If it has not been a minute and the current filter is full (e.g. has
>> > 1000 entries), create a new layer.
>> >
>> > This should be very fast and space efficient.
>> >
>> > On Wed, Jun 21, 2023 at 11:13 AM Claude Warren <cla...@xenei.com>
>> > wrote:
>> >
>> > > I have an implementation of a layered Bloom filter in [1] (note
>> > > the layered branch). This should handle the layering and allow for
>> > > layers that
>> > >
>> > > 1. do not become over-populated and thus yield too many false
>> > >    positives;
>> > > 2. expire and are removed automatically.
>> > >
>> > > The layered Bloom filter also does not need another thread to
>> > > remove the old filters, as this work is amortized across all the
>> > > inserts.
>> > >
>> > > In addition, the number of layered Bloom filter instances can be
>> > > reduced by hashing the Kafka principal and the ID together into
>> > > the Bloom filter to look for.
>> > >
>> > > [1] https://github.com/Claudenw/BloomFilters/tree/layered
>> > >
>> > > On Sun, Jun 18, 2023 at 10:18 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>> > > wrote:
>> > >
>> > >> Hi Haruki, thanks for having a look at the KIP.
>> > >>
>> > >> > 1. Do you have any memory-footprint estimation for
>> > >> > TimeControlledBloomFilter?
>> > >>
>> > >> I don't have an estimate at the moment, as I don't have a full
>> > >> implementation of this one yet. I can work on one if it's
>> > >> required.
>> > >>
>> > >> > * If I read the KIP correctly, TimeControlledBloomFilter will
>> > >> > be allocated per KafkaPrincipal so the size should be
>> > >> > reasonably small considering clusters which have a large number
>> > >> > of users.
>> > >>
>> > >> The Map stored in the cache has two dimensions: the vertical one
>> > >> is the KafkaPrincipal (producers only), and the horizontal one is
>> > >> the time of the windows.
>> > >> - Horizontally, we will add PIDs to the TimeControlledBloomFilter
>> > >>   only if the KafkaPrincipal didn't hit the quota, and we control
>> > >>   the Bloom filter by time to expire the oldest set once it's not
>> > >>   needed anymore.
>> > >> - Vertically is the tricky one if the cluster has an insane
>> > >>   number of KafkaPrincipals used for producing. If the number of
>> > >>   KafkaPrincipals is huge, we can control the memory used by the
>> > >>   cache by throttling more aggressively, and I would argue that
>> > >>   it will never be so large as to cause OOM.
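>> > >>
>> > >> For illustration only, a hypothetical sketch of those two
>> > >> dimensions (TimeControlledBloomFilter and
>> > >> ProducerIdQuotaManagerCache are the KIP's names; the fields here
>> > >> are assumptions, with the principal keyed by name for brevity):
>> > >>
>> > >> import java.util.Map;
>> > >> import java.util.NavigableMap;
>> > >> import java.util.concurrent.ConcurrentHashMap;
>> > >> import java.util.concurrent.ConcurrentSkipListMap;
>> > >>
>> > >> class TimeControlledBloomFilter {
>> > >>     // horizontal dimension: one Bloom filter bit set per time
>> > >>     // window, keyed by window start, so the oldest entry can be
>> > >>     // dropped once it falls outside the quota window
>> > >>     private final NavigableMap<Long, long[]> windows = new ConcurrentSkipListMap<>();
>> > >> }
>> > >>
>> > >> class ProducerIdQuotaManagerCache {
>> > >>     // vertical dimension: one filter per producing KafkaPrincipal
>> > >>     private final Map<String, TimeControlledBloomFilter> byPrincipal = new ConcurrentHashMap<>();
>> > >> }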
>> > >>
>> > >> > * i.e. What false-positive rate do you plan to choose as the
>> > >> > default?
>> > >>
>> > >> I am planning on using 0.1 as the default.
>> > >>
>> > >> > 2. What do you think about rotating windows on produce-request
>> > >> > arrival instead of a scheduler?
>> > >> > * If we do rotation in scheduler threads, my concern is
>> > >> > potential scheduler-thread occupation, which could delay other
>> > >> > background tasks.
>> > >>
>> > >> This is a valid concern. We can consider disposing of the oldest
>> > >> Bloom filter when we add a new PID to the
>> > >> TimeControlledBloomFilter. However, I would still need a
>> > >> scheduler to clean up any inactive KafkaPrincipal from the cache
>> > >> layer, i.e. ProducerIdQuotaManagerCache. Do you have the same
>> > >> concern about this one too?
>> > >>
>> > >> > 3. Why is the default producer.id.quota.window.size.seconds 1
>> > >> > hour?
>> > >> > * Unlike other quota types (1 second)
>> > >>
>> > >> Mostly because 1 sec doesn't make sense for this type of quota.
>> > >> Misconfigured or misbehaving producers usually don't allocate new
>> > >> PIDs on the leader every second but over a period of time.
>> > >>
>> > >> Thanks
>> > >>
>> > >> On Tue, Jun 6, 2023 at 5:21 PM Haruki Okada <ocadar...@gmail.com>
>> > >> wrote:
>> > >>
>> > >> > Hi, Omnia.
>> > >> >
>> > >> > Thanks for the KIP. The feature indeed sounds helpful, and the
>> > >> > strategy of using a Bloom filter looks good.
>> > >> >
>> > >> > I have three questions:
>> > >> >
>> > >> > 1. Do you have any memory-footprint estimation for
>> > >> > TimeControlledBloomFilter?
>> > >> > * If I read the KIP correctly, TimeControlledBloomFilter will
>> > >> > be allocated per KafkaPrincipal, so the size should be
>> > >> > reasonably small considering clusters which have a large number
>> > >> > of users.
>> > >> > * i.e. What false-positive rate do you plan to choose as the
>> > >> > default?
>> > >> > 2. What do you think about rotating windows on produce-request
>> > >> > arrival instead of a scheduler?
>> > >> > * If we do rotation in scheduler threads, my concern is
>> > >> > potential scheduler-thread occupation, which could delay other
>> > >> > background tasks.
>> > >> > 3. Why is the default producer.id.quota.window.size.seconds 1
>> > >> > hour?
>> > >> > * Unlike other quota types (1 second)
>> > >> >
>> > >> > Thanks,
>> > >> >
>> > >> > On Tue, Jun 6, 2023 at 23:55 Omnia Ibrahim
>> > >> > <o.g.h.ibra...@gmail.com> wrote:
>> > >> >
>> > >> > > Hi everyone,
>> > >> > > I want to start the discussion of KIP-936 to throttle the
>> > >> > > number of active PIDs per KafkaPrincipal. The proposal is
>> > >> > > here:
>> > >> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs
>> > >> > >
>> > >> > > Thanks for your time and feedback.
>> > >> > > Omnia
>> > >> >
>> > >> > --
>> > >> > ========================
>> > >> > Okada Haruki
>> > >> > ocadar...@gmail.com
>> > >> > ========================

--
LinkedIn: http://www.linkedin.com/in/claudewarren