I misspoke before: the LayeredBloomFilterTest.testExpiration() uses
milliseconds to expire the data, but it lays out an example of how to
expire filters in time intervals.

On Fri, Aug 18, 2023 at 4:01 PM Claude Warren <cla...@xenei.com> wrote:

> Sorry for taking so long to get back to you, somehow I missed your message.
>
>> I am not sure how this will work when we have a different producer-id-rate
>> for different KafkaPrincipals, as proposed in the KIP.
>> For example, `userA` has a producer-id-rate of 1000 per hour while `userB`
>> has a quota of 100 producer ids per hour. How will we configure the max
>> entries for the Shape?
>>
>
> I am not certain I have a full understanding of your network.  However, I
> am assuming that you want:
>
>    - To ensure that all produced ids are tracked for 1 hour regardless of
>    whether they were produced by userA or userB.
>    - To use a sliding window with 1 minute resolution.
>
>
> There is a tradeoff in the Layered Bloom filter -- larger max entries (N)
> or greater depth.
>
> So the simplest calculation would be 1100 ids per hour / 60 minutes
> per hour = 18.3; let's round that to 20.
> With N=20, if more than 20 ids are produced in a minute, a second filter
> will be created to accept all those over 20.
> Let's assume that the first filter was created at time 0:00:00 and the
> 21st id comes in at 0:00:45.  When the first insert after 1:00:59 occurs
> (one hour after start + window time) the first filter will be removed.
> When the first insert after 1:01:44 occurs, the filter created at 0:00:45
> will be removed.
>
> So if you have a period of high usage, the number of filters (the depth of
> the layers) increases; as usage decreases, the depth returns to the
> expected level.  You could set N to a much larger number so that each
> filter handles more ids before an extra layer is added.  However, if the
> filters are vastly too big, there will be significant wasted space.
>
>> The only thing that comes to my mind to maintain this desired behavior in
>> the KIP is to NOT hash the PID with the KafkaPrincipal, and instead keep a
>> Map<KafkaPrincipal, LayeredBloomFilter>
>> where each one of these bloom filters is controlled with
>> `Shape(<PRODUCER_ID_RATE_FOR_THIS_KafkaPrincipal>, 0.1)`.
>>
>
> Do you need to keep a filter for each principal?  Are the PIDs supposed to
> be globally unique?  If the question you are asking is "has principal_1
> seen pid_2?", then hashing principal_1 and pid_2 together and creating a
> Bloom filter hasher from the result will answer it using one
> LayeredBloomFilter.  If you also need to ask "has anybody seen pid_2?",
> then there are some other solutions.  Your solution will work and may be
> appropriate in some cases where there is a wide range of principal message
> rates.  But even then I would probably still use the principal+pid
> solution and just split the filters by estimated size, so that all the
> principals that need a large filter go into one system and the smaller
> ones go into another.  I do note that the Hur.st calculator [1] shows that
> for (1000, 0.1) you need 599 bytes and 3 hash functions, for (100, 0.1)
> you need 60 bytes and 3 hash functions, and for (1100, 0.1) you need 659
> bytes and 3 hash functions.  I would probably pick 704 bytes and 3 hash
> functions, which gives you (1176, 0.1).  I would pick this because 704
> bytes divides evenly into the 64-bit long blocks that are used internally
> by the SimpleBloomFilter, so there is no wasted space.
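>
> As a sanity check in code, a minimal sketch using the commons-collections4
> Shape factories (assuming the fromKM/getProbability methods from the
> in-progress API; names may differ in the released version):
>
> // 704 bytes = 5632 bits = 88 64-bit longs, so a long-backed filter
> // wastes no space; 3 hash functions as calculated above.
> Shape shape = Shape.fromKM( 3, 704 * 8 );
> // false positive probability after 1176 insertions should be ~0.1
> System.out.println( shape.getProbability( 1176 ) );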
>
>> Maybe I am missing something here, but I can't find anything in the
>> `LayerManager` code that points to how often the eviction function
>> runs. Do you mean that the eviction function runs every minute? If so, can
>> we control this?
>>
>
> The LayerManager.Builder has a setCleanup() method.  The function it
> registers is run whenever a new layer is added to the filter.  This means
> that you can use whatever process you want to delete old filters
> (including none: LayerManager.Cleanup.noCleanup()).  The
> LayeredBloomFilterTest is an example of advancing by time (the one-minute
> intervals) and cleaning by time (1 hour).  It also creates a
> TimestampedBloomFilter to track the time.  If we need an explicit
> mechanism to remove filters from the LayerManager we can probably add one.
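>
> To make that concrete, here is a minimal sketch of the pattern the test
> uses.  The TimestampedBloomFilter and the builder wiring are illustrative
> (written against the in-progress API, so names and signatures may differ):
>
> // a layer that records its creation time
> class TimestampedBloomFilter extends WrappedBloomFilter {
>     final long timestamp = System.currentTimeMillis();
>     TimestampedBloomFilter(Shape shape) {
>         super(new SimpleBloomFilter(shape));
>     }
> }
>
> // cleanup runs whenever a new layer is added: drop layers older than 1 hour
> Consumer<Deque<TimestampedBloomFilter>> cleanup = layers -> {
>     long cutoff = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1);
>     while (!layers.isEmpty() && layers.getFirst().timestamp < cutoff) {
>         layers.removeFirst();
>     }
> };
>
> LayerManager<TimestampedBloomFilter> manager = LayerManager.builder()
>         .setSupplier(() -> new TimestampedBloomFilter(shape))
>         .setCleanup(cleanup)
>         .build();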
>
> I hope this answers your questions.
>
> I am currently working on getting layered Bloom filters added to commons.
> A recent change set the stage for it, so it should be in soon.
>
> I look forward to hearing from you,
> Claude
>
> [1] https://hur.st/bloomfilter/?n=1000&p=0.1&m=&k=
>
> On Sun, Jul 16, 2023 at 1:00 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
> wrote:
>
>> Thanks Claude for the feedback and for raising this implementation to
>> Apache commons-collections.
>> I had a look at your layered bloom filter and at first glance I think it
>> would be a good improvement. However, regarding the following suggestion:
>>
>> > By hashing the principal and PID into the filter as a single hash only
>> one Bloom filter is required.
>>
>> I am not sure how this will work when we have a different producer-id-rate
>> for different KafkaPrincipals, as proposed in the KIP.
>> For example, `userA` has a producer-id-rate of 1000 per hour while `userB`
>> has a quota of 100 producer ids per hour. How will we configure the max
>> entries for the Shape?
>>
>> The only thing that comes to my mind to maintain this desired behavior in
>> the KIP is to NOT hash the PID with the KafkaPrincipal, and instead keep a
>> Map<KafkaPrincipal, LayeredBloomFilter>
>> where each one of these bloom filters is controlled with
>> `Shape(<PRODUCER_ID_RATE_FOR_THIS_KafkaPrincipal>, 0.1)`.
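>>
>> Roughly, the idea is something like this (a sketch only; quotaFor() and
>> newLayeredFilter() are hypothetical placeholders):
>>
>> Map<KafkaPrincipal, LayeredBloomFilter> filters = new ConcurrentHashMap<>();
>> // one filter per principal, with the Shape sized from that principal's quota
>> filters.computeIfAbsent(principal,
>>         p -> newLayeredFilter(Shape.fromNP(quotaFor(p), 0.1)));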
>>
>> Does that make sense? WDYT?
>>
>> Also, regarding the eviction function:
>> > (evict function)
>> > The evict function will determine if it has been a minute since the
>> > last time we created a new layer; if so, create a new layer and evict
>> > all layers older than 1 hour.  Since the layers are a time-ordered
>> > list this is simply removing the elderly layers from the front of the
>> > list.
>>
>> Maybe I am missing something here, but I can't find anything in the
>> `LayerManager` code that points to how often the eviction function
>> runs. Do you mean that the eviction function runs every minute? If so, can
>> we control this?
>>
>> Cheers,
>> Omnia
>>
>> On Wed, Jun 21, 2023 at 11:43 AM Claude Warren <cla...@xenei.com> wrote:
>>
>> > I think that this could be handled by either a Stable Bloom filter or a
>> > Layered Bloom filter constructed as follows:
>> >
>> >
>> >    - Each layer is configured for the maximum number of principal-PID
>> >    pairs expected in a single minute.
>> >    - Expect 60 layers (one for each minute).
>> >    - If the layer becomes fully populated, add another layer.
>> >    - When the time to insert into the current layer expires, remove the
>> >    layers that are older than an hour.
>> >
>> > This will provide a sliding window of one hour with the ability to
>> > handle bursts above the expected rate of inserts without additional
>> > false positives.
>> >
>> > By hashing the principal and PID into the filter as a single hash, only
>> > one Bloom filter is required.
>> >
>> > The code I pointed to earlier uses the commons-collections4 Bloom filter
>> > implementation.  So a rough pseudo code for the implementation is
>> > (imports from org.apache.commons.collections4.bloomfilter, plus
>> > org.apache.commons.codec.digest.MurmurHash3 and
>> > java.nio.charset.StandardCharsets):
>> >
>> > // 1000 entries per layer, 0.1 false positive rate
>> > Shape shape = Shape.fromNP( 1000, 0.1 );
>> > // 60 initial layers, with an eviction function
>> > LayeredBloomFilter filter = new LayeredBloomFilter( shape, 60, evictFunc );
>> >
>> > (on PID)
>> >
>> > // hash the principal and PID together into a single 128-bit hash
>> > long[] buff = MurmurHash3.hash128x64( String.format( "%s%s", principal,
>> >         pid ).getBytes( StandardCharsets.UTF_8 ) );
>> > Hasher hasher = new EnhancedDoubleHasher( buff[0], buff[1] );
>> > if (filter.contains( hasher )) {
>> >     // handle case where principal-pid was already seen
>> > }
>> > // ensures that principal-pid remains "seen" for the next hour
>> > filter.merge( hasher );
>> >
>> > (evict function)
>> > The evict function will determine if it has been a minute since the
>> > last time we created a new layer; if so, create a new layer and evict
>> > all layers older than 1 hour.  Since the layers are a time-ordered
>> > list this is simply removing the elderly layers from the front of the
>> > list.
>> >
>> > If it has not been a minute and the current filter is full (e.g. has
>> > 1000 entries), create a new layer anyway.
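>> >
>> > In rough Java, that evict logic might look like the following (a sketch
>> > under the assumptions above; TimestampedLayer is a hypothetical layer
>> > type carrying its creation time and entry count):
>> >
>> > // runs on each insert, before merging into the current layer
>> > void evict(Deque<TimestampedLayer> layers, long now) {
>> >     TimestampedLayer current = layers.getLast();
>> >     // start a new layer every minute, or sooner if the current one is full
>> >     if (now - current.created >= 60_000 || current.count >= 1000) {
>> >         layers.addLast(new TimestampedLayer(now));
>> >     }
>> >     // layers are time ordered, so expired layers are all at the front
>> >     while (layers.size() > 1 && now - layers.getFirst().created >= 3_600_000) {
>> >         layers.removeFirst();
>> >     }
>> > }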
>> >
>> > This should be very fast and space efficient.
>> >
>> >
>> > On Wed, Jun 21, 2023 at 11:13 AM Claude Warren <cla...@xenei.com>
>> > wrote:
>> >
>> > > I have an implementation of a layered Bloom filter in [1] (note the
>> > > layered branch).  This should handle the layering of Bloom filters
>> > > and allow for layers that
>> > >
>> > >    1. Do not become overpopulated and thus yield too many false
>> > >    positives.
>> > >    2. Expire and are removed automatically.
>> > >
>> > > The layered Bloom filter also does not need another thread to remove
>> > > the old filters, as this work is amortized across all the inserts.
>> > >
>> > > In addition, the number of layered Bloom filter instances can be
>> > > reduced by hashing the Kafka principal and the ID together into a
>> > > single hash to look for in the Bloom filter.
>> > >
>> > > [1] https://github.com/Claudenw/BloomFilters/tree/layered
>> > >
>> > > On Sun, Jun 18, 2023 at 10:18 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>> > > wrote:
>> > >
>> > >> Hi Haruki, thanks for having a look at the KIP.
>> > >> > 1. Do you have any memory-footprint estimation for
>> > >> > TimeControlledBloomFilter?
>> > >> I don't have an estimate at the moment, as I don't have a full
>> > >> implementation of this one yet. I can work on one if it's
>> > >> required.
>> > >>
>> > >> > * If I read the KIP correctly, TimeControlledBloomFilter will be
>> > >> > allocated per KafkaPrincipal, so the size should be reasonably
>> > >> > small considering clusters which have a large number of users.
>> > >> The map stored in the cache has two dimensions: one is vertical,
>> > >> which is the KafkaPrincipal (producers only), and the second is
>> > >> horizontal, which is the time of the windows.
>> > >> - Horizontally, we will add PIDs to the TimeControlledBloomFilter
>> > >> only if the KafkaPrincipal didn't hit the quota, and we control the
>> > >> bloom filter by time to expire the oldest set at some point when
>> > >> it's not needed anymore.
>> > >> - Vertically is the tricky one, if the cluster has an insane number
>> > >> of KafkaPrincipals used for producing. If the number of
>> > >> KafkaPrincipals is huge, we can control the memory used by the cache
>> > >> by throttling more aggressively, and I would argue that there will
>> > >> never be an insane number that could cause OOM. (A rough sketch of
>> > >> this structure follows.)
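>> > >>
>> > >> Roughly (hypothetical sketch; the type names follow the KIP but the
>> > >> layout here is illustrative only):
>> > >>
>> > >> // vertical dimension: one entry per producing KafkaPrincipal
>> > >> Map<KafkaPrincipal, TimeControlledBloomFilter> pidCache =
>> > >>         new ConcurrentHashMap<>();
>> > >> // horizontal dimension: each TimeControlledBloomFilter internally
>> > >> // rotates time-windowed bloom filters, expiring the oldest window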
>> > >>
>> > >> > * i.e. What false-positive rate do you plan to choose as the
>> > >> > default?
>> > >> I am planning on using 0.1 as the default.
>> > >>
>> > >> > 2. What do you think about rotating windows on produce-request
>> > >> > arrival instead of in a scheduler?
>> > >> > * If we do rotation in scheduler threads, my concern is potential
>> > >> > occupation of the scheduler threads, which could cause other
>> > >> > background tasks to be delayed.
>> > >> This is a valid concern. We can consider disposing of the oldest
>> > >> bloom filter when we add a new PID to the TimeControlledBloomFilter.
>> > >> However, I would still need a scheduler to clean up any inactive
>> > >> KafkaPrincipal from the cache layer, i.e.
>> > >> `ProducerIdQuotaManagerCache`. Do you have the same concern about
>> > >> this one too?
>> > >>
>> > >> > 3. Why is the default producer.id.quota.window.size.seconds 1 hour?
>> > >> >  * Unlike other quota types (1 second)
>> > >> Mostly because 1 second doesn't make sense for this type of quota.
>> > >> Misconfigured or misbehaving producers usually don't allocate new
>> > >> PIDs on the leader every second, but over a period of time.
>> > >>
>> > >> Thanks
>> > >>
>> > >> On Tue, Jun 6, 2023 at 5:21 PM Haruki Okada <ocadar...@gmail.com>
>> > >> wrote:
>> > >>
>> > >> > Hi, Omnia.
>> > >> >
>> > >> > Thanks for the KIP.
>> > >> > The feature sounds indeed helpful, and the strategy to use a
>> > >> > bloom filter looks good.
>> > >> >
>> > >> > I have three questions:
>> > >> >
>> > >> > 1. Do you have any memory-footprint estimation
>> > >> > for TimeControlledBloomFilter?
>> > >> >     * If I read the KIP correctly, TimeControlledBloomFilter will
>> > >> > be allocated per KafkaPrincipal, so the size should be reasonably
>> > >> > small considering clusters which have a large number of users.
>> > >> >     * i.e. What false-positive rate do you plan to choose as the
>> > >> > default?
>> > >> > 2. What do you think about rotating windows on produce-request
>> > >> > arrival instead of in a scheduler?
>> > >> >     * If we do rotation in scheduler threads, my concern is
>> > >> > potential occupation of the scheduler threads, which could cause
>> > >> > other background tasks to be delayed.
>> > >> > 3. Why is the default producer.id.quota.window.size.seconds 1 hour?
>> > >> >     * Unlike other quota types (1 second)
>> > >> >
>> > >> > Thanks,
>> > >> >
>> > >> > On Tue, Jun 6, 2023 at 11:55 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>> > >> >
>> > >> > > Hi everyone,
>> > >> > > I want to start the discussion of KIP-936 to throttle the number
>> > >> > > of active PIDs per KafkaPrincipal. The proposal is here:
>> > >> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs
>> > >> > >
>> > >> > > Thanks for your time and feedback.
>> > >> > > Omnia
>> > >> > >
>> > >> >
>> > >> >
>> > >> > --
>> > >> > ========================
>> > >> > Okada Haruki
>> > >> > ocadar...@gmail.com
>> > >> > ========================
>> > >> >
>> > >>
>> > >
>> > >
>> > > --
>> > > LinkedIn: http://www.linkedin.com/in/claudewarren
>> > >
>> >
>> >
>> > --
>> > LinkedIn: http://www.linkedin.com/in/claudewarren
>> >
>>
>
>
> --
> LinkedIn: http://www.linkedin.com/in/claudewarren
>


-- 
LinkedIn: http://www.linkedin.com/in/claudewarren
