I think that the either using a Stable bloom filter or a Layered bloom filter constructed as follows:
- Each layer is configured for the maximum number of principal-PID pairs expected in a single minute. - Expect 60 layers (one for each minute) - If the layer becomes fully populated add another layer. - When the time to insert into the current layer expires, remove the layers that are older than an hour. This will provide a sliding window of one hour with the ability to handle bursts above the expected rate of inserts without additional false positives. By hashing the principal and PID into the filter as a single hash only one Bloom filter is required. The code I pointed to earlier uses the common-collections4 Bloom filter implementation. So a rough pseudo code for the implementation is: Shape shap = Shape.fromNP( 1000, 0.1 ); // 1000 entries, 0.1 false positive rate LayeredBloomFilter filter = LayeredBloomFilter( shape, 60, evictFunc ); // 60 initial layers, eviction function. (on PID) long[2] buff = Murmur3Hash.hash128x64( String.format("%s%s", principal, PID ).getBytes(java.nio.charset.Charset.UTF8)); Hasher hasher = new EnhancedDoubleHasher( buff[0], buff[1] ); if (filter.contains(hasher)) { // handle case where principal-pid was already seen } filter.merge( hasher ); // ensures that principal-pid remains in seen for the next hour. (evict function) The evict function will determine if it has been a minute since the last time we created a new layer, if so create a new layer and evict all layers older than 1 hour. Since the layers are a time ordered list this is simply removing the elderly layers from the front of the list. if it has not been an hour and the current filter is full (e.g. has 1000 entries) create a new layer This should be very fast and space efficient. On Wed, Jun 21, 2023 at 11:13 AM Claude Warren <cla...@xenei.com> wrote: > I have an implementation of a layered Bloom filter in [1] (note the > layered branch). This should handle the layering Bloom filter and allow > for layers that > > 1. Do not become over populated and thus yield too many false > positives. > 2. Expire and are removed automatically. > > The layered Bloom filter also does not need another thread to remove the > old filters as this is amortized across all the inserts. > > In addition, the number of Layered Bloom filter instances can be reduced > by hashing the Kafka Principal and the ID together into the Bloom filter to > look for. > > [1] https://github.com/Claudenw/BloomFilters/tree/layered > > On Sun, Jun 18, 2023 at 10:18 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> > wrote: > >> Hi Haruki, Thanks for having a look at the KIP. >> > 1. Do you have any memory-footprint estimation for >> TimeControlledBloomFilter? >> I don't at the moment have any estimate as I don't have a full >> implementation of this one at the moment. I can work on one if it's >> required. >> >> > * If I read the KIP correctly, TimeControlledBloomFilter will be >> > allocated per KafkaPrincipal so the size should be reasonably small >> > considering clusters which have a large number of users. >> The Map stored in the cache has 2 dimensions one is vertical which is >> KafkaPrincipal (producers only) and the second horizontal which is the >> time >> of the windows. >> - Horizontally we will add only PIDs to the TimeControlledBloomFilter only >> if KafkaPrincipal didn't hit the quota and we control the bloom filter by >> time to expire the oldest set at some point when it's not needed anymore. >> - Vertically is the tricky one if the cluster has an insane number of >> KafkaPrincipals used for producing. And if the number of KafkaPrincipals >> is >> huge we can control the memory used by the cache by throttling more >> aggressively and I would argue that they will never going to be an insane >> number that could cause OOM. >> >> >* i.e. What false-positive rate do you plan to choose as the default? >> Am planning on using 0.1 as default. >> >> > 2. What do you think about rotating windows on produce-requests arrival >> instead of scheduler? >> > * If we do rotation in scheduler threads, my concern is potential >> > scheduler threads occupation which could make other background tasks to >> > delay >> This is a valid concern. We can consider disposing of the oldest bloom >> when >> we add a new PID to the TimeControlledBloomFilter. However, I would still >> need a scheduler to clean up any inactive KafkaPrincipal from the cache >> layer `i.e. ProducerIdQuotaManagerCache`. Do you have the same concern >> about this one too? >> >> > 3. Why the default producer.id.quota.window.size.seconds is 1 hour? >> > * Unlike other quota types (1 second) >> Mostly because 1 sec doesn't make sense for this type of quota. >> Misconfigured or misbehaving producers usually don't allocate new PIDs on >> the leader every sec but over a period of time. >> >> Thanks >> >> On Tue, Jun 6, 2023 at 5:21 PM Haruki Okada <ocadar...@gmail.com> wrote: >> >> > Hi, Omnia. >> > >> > Thanks for the KIP. >> > The feature sounds indeed helpful and the strategy to use bloom-filter >> > looks good. >> > >> > I have three questions: >> > >> > 1. Do you have any memory-footprint estimation >> > for TimeControlledBloomFilter? >> > * If I read the KIP correctly, TimeControlledBloomFilter will be >> > allocated per KafkaPrincipal so the size should be reasonably small >> > considering clusters which have a large number of users. >> > * i.e. What false-positive rate do you plan to choose as the >> default? >> > 2. What do you think about rotating windows on produce-requests arrival >> > instead of scheduler? >> > * If we do rotation in scheduler threads, my concern is potential >> > scheduler threads occupation which could make other background tasks to >> > delay >> > 3. Why the default producer.id.quota.window.size.seconds is 1 hour? >> > * Unlike other quota types (1 second) >> > >> > Thanks, >> > >> > 2023年6月6日(火) 23:55 Omnia Ibrahim <o.g.h.ibra...@gmail.com>: >> > >> > > Hi everyone, >> > > I want to start the discussion of the KIP-936 to throttle the number >> of >> > > active PIDs per KafkaPrincipal. The proposal is here >> > > >> > > >> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs >> > > < >> > > >> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs >> > > > >> > > >> > > Thanks for your time and feedback. >> > > Omnia >> > > >> > >> > >> > -- >> > ======================== >> > Okada Haruki >> > ocadar...@gmail.com >> > ======================== >> > >> > > > -- > LinkedIn: http://www.linkedin.com/in/claudewarren > -- LinkedIn: http://www.linkedin.com/in/claudewarren