apovzner commented on a change in pull request #8977:
URL: https://github.com/apache/kafka/pull/8977#discussion_r454148540



##########
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+import java.util.List;
+
+/**
+ * A {@link SampledStat} that mimics the behavior of a Token Bucket and is meant to
+ * be used in conjunction with a {@link Rate} and a {@link org.apache.kafka.common.metrics.Quota}.
+ *
+ * The {@link TokenBucket} considers each sample as the amount of credits spent during the sample's
+ * window while giving back credits based on the defined quota.
+ *
+ * At time T, it computes the total O as follows:
+ * - O(T) = max(0, O(T-1) - Q * (W(T) - W(T-1))) + S(T)
+ * Where:
+ * - Q is the defined Quota or 0 if undefined
+ * - W is the time of the sample or now if undefined
+ * - S is the value of the sample or 0 if undefined
+ *
+ * Example with 3 samples with a Quota = 2:
+ * - S1 at T+0s => 4
+ * - S2 at T+2s => 2
+ * - S3 at T+4s => 6
+ *
+ * The total at T+6s is computed as follows:
+ * - T0 => Total at T+0s => S1 = 4
+ * - T1 => Total at T+2s => max(0, T0 - Q * dT) + S2 = max(0, 4 - 2 * 2) + 2 = 2
+ * - T2 => Total at T+4s => max(0, T1 - Q * dT) + S3 = max(0, 2 - 2 * 2) + 6 = 6
+ * - T3 => Total at T+6s => max(0, T2 - Q * dT) = max(0, 6 - 2 * 2) = 2
+ */
+public class TokenBucket extends SampledStat {
+
+    private final TimeUnit unit;
+
+    public TokenBucket() {
+        this(TimeUnit.SECONDS);
+    }
+
+    public TokenBucket(TimeUnit unit) {
+        super(0);
+        this.unit = unit;
+    }
+
+    @Override
+    protected void update(Sample sample, MetricConfig config, double value, long now) {
+        sample.value += value;
+    }
+
+    @Override

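The recurrence in the Javadoc can be checked with a small standalone sketch. The class `TokenBucketRecurrence` and its `total` helper are hypothetical and not part of the PR. The sketch applies `O(T) = max(0, O(T-1) - Q * (W(T) - W(T-1))) + S(T)` over the recorded samples, then replenishes once more up to "now" with S = 0, which reproduces the Javadoc's Q = 2 example.

```java
/**
 * Hypothetical standalone sketch of the TokenBucket Javadoc recurrence:
 *   O(T) = max(0, O(T-1) - Q * (W(T) - W(T-1))) + S(T)
 */
final class TokenBucketRecurrence {

    /**
     * @param timesSec sample times W in seconds, in increasing order
     * @param values   sample values S, aligned with timesSec
     * @param quota    Q, credits given back per second
     * @param nowSec   current time; the final step uses W = now and S = 0
     */
    static double total(double[] timesSec, double[] values, double quota, double nowSec) {
        double total = 0.0;
        for (int i = 0; i < timesSec.length; i++) {
            // The first sample has no predecessor, so nothing is given back yet.
            double elapsed = i == 0 ? 0.0 : timesSec[i] - timesSec[i - 1];
            // Give back Q credits per elapsed second without going below zero,
            // then add the credits spent in this sample.
            total = Math.max(0.0, total - quota * elapsed) + values[i];
        }
        if (timesSec.length > 0) {
            // Undefined sample at "now": only give credits back.
            double elapsed = nowSec - timesSec[timesSec.length - 1];
            total = Math.max(0.0, total - quota * elapsed);
        }
        return total;
    }

    public static void main(String[] args) {
        double[] times = {0, 2, 4};
        double[] values = {4, 2, 6};
        // Javadoc example with Q = 2: totals 4, 2, 6 at T+0s, T+2s, T+4s, then 2 at T+6s.
        System.out.println(total(times, values, 2.0, 6.0)); // prints 2.0
    }
}
```

The `max(0, ...)` is applied before adding S(T), which is why T2 is 6 rather than 4.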
Review comment:
Let me respond to the "backfilling" algorithm part first; I still need to think about the "combining samples" algorithm. The term "backfilling" is probably misleading for the algorithm I described, which is different from the one you described.
   
The algorithm that you described would record a value into the sample corresponding to the current time, even if older samples still have unused credits (quota - sample.value). I am assuming 1-second samples. This breaks the token bucket properties because it allows us to expire credits that should have been used while we were using "newer" credits. (Credits being "newer" or "older" is an artifact of our using samples, so if we implement a token bucket this way, we also need to be careful to use the oldest credits first.) Example (6 samples, quota = 5, which corresponds to a maximum burst of 30):
- Record 2 at time T.
- Record 3 at time T + 3.
- Record 30 at time T + 6. At time T + 6, we would first expire the oldest sample, and in doing so we would expire 3 unused credits (5 - 2) that could have been used when we recorded a value at time T + 3.
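Here is a minimal sketch of the behavior in this example. It relies on simplifying assumptions: samples are aligned to whole seconds, a value always goes into the sample for the current second, and only samples that received a value are tracked. `NaiveSampledBucket` and its methods are hypothetical names, not Kafka APIs. The sketch counts the credits (quota - sample.value) that were still unused when a sample was purged.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical sketch: every value goes into the sample for the current second,
 * and samples that fall out of the window are purged. Counts the credits that
 * were still unused (quota - value) when a sample was purged.
 */
final class NaiveSampledBucket {
    private final int numSamples;
    private final double quota;
    // Each entry is {second, value}; the oldest sample is at the head.
    private final Deque<double[]> samples = new ArrayDeque<>();
    private double expiredUnusedCredits = 0.0;

    NaiveSampledBucket(int numSamples, double quota) {
        this.numSamples = numSamples;
        this.quota = quota;
    }

    void record(double value, long nowSec) {
        // Purge samples that are numSamples or more seconds old.
        while (!samples.isEmpty() && samples.peekFirst()[0] <= nowSec - numSamples) {
            double[] purged = samples.pollFirst();
            expiredUnusedCredits += Math.max(0.0, quota - purged[1]);
        }
        // Record into the sample for the current second, creating it if needed,
        // regardless of unused credits left in older samples.
        double[] current = samples.peekLast();
        if (current == null || current[0] != nowSec) {
            current = new double[] {nowSec, 0.0};
            samples.addLast(current);
        }
        current[1] += value;
    }

    double expiredUnusedCredits() {
        return expiredUnusedCredits;
    }

    public static void main(String[] args) {
        NaiveSampledBucket bucket = new NaiveSampledBucket(6, 5.0);
        bucket.record(2, 0);   // T
        bucket.record(3, 3);   // T + 3: new sample, although sample T still has 3 spare credits
        bucket.record(30, 6);  // T + 6: sample T is purged first
        System.out.println(bucket.expiredUnusedCredits()); // prints 3.0
    }
}
```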
   
I want to take a step back so we are on the same page. Because of the existing code, we are implementing the token bucket somewhat backwards, which makes it much more confusing than the token bucket algorithm itself, which is simple.
   
A token bucket accumulates tokens at a constant rate R, one token every 1/R seconds, up to a maximum burst size B. Let's say that 1 token gives permission to do 1 unit of work. Without a burst size, we would have to do a unit of work within 1/R seconds (before the next token is created). The burst size essentially means that we keep the permission to do a unit of work for up to B / R seconds. When we reach the burst size and stop accumulating tokens, we can think of it as the oldest token expiring and the newly created token replacing it.
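For reference, a classic token bucket with rate R and burst B can be sketched as follows. `ClassicTokenBucket` is a hypothetical class. Starting with a full bucket and tracking tokens as a continuous `double` are assumptions of this sketch. Capping `tokens` at `burst` corresponds to the "oldest token expires" behavior described above.

```java
/**
 * Hypothetical sketch of a classic token bucket: tokens accrue at ratePerSec (R)
 * up to burst (B); a request for n units succeeds only if n tokens are available.
 * Starting with a full bucket is an assumption of this sketch.
 */
final class ClassicTokenBucket {
    private final double ratePerSec;
    private final double burst;
    private double tokens;
    private double lastRefillSec;

    ClassicTokenBucket(double ratePerSec, double burst, double nowSec) {
        this.ratePerSec = ratePerSec;
        this.burst = burst;
        this.tokens = burst;
        this.lastRefillSec = nowSec;
    }

    private void refill(double nowSec) {
        double elapsed = Math.max(0.0, nowSec - lastRefillSec);
        // Tokens above the burst size are dropped: the oldest token "expires"
        // and the newly created one takes its place.
        tokens = Math.min(burst, tokens + elapsed * ratePerSec);
        lastRefillSec = Math.max(lastRefillSec, nowSec);
    }

    boolean tryAcquire(double units, double nowSec) {
        refill(nowSec);
        if (tokens < units) {
            return false;
        }
        tokens -= units;
        return true;
    }

    double available(double nowSec) {
        refill(nowSec);
        return tokens;
    }

    public static void main(String[] args) {
        // R = 5 tokens/s and B = 30, matching quota 5 with 6 one-second samples.
        ClassicTokenBucket bucket = new ClassicTokenBucket(5.0, 30.0, 0.0);
        System.out.println(bucket.tryAcquire(30.0, 0.0)); // true: uses the whole burst
        System.out.println(bucket.tryAcquire(1.0, 0.0));  // false: no tokens left
        System.out.println(bucket.available(2.0));        // 10.0 after 2 seconds
        System.out.println(bucket.available(100.0));      // 30.0: capped at the burst
    }
}
```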
   
Adapting this to SampledStat, we accumulate tokens in chunks of Q units (with the default sample length of 1 second) instead of one token every 1/R seconds, and we also expire them in chunks of Q units (when we expire the oldest sample). The current implementation records a value into a sample based on its timestamp: a sample is considered complete (Sample.isComplete) once the current time falls outside the sample's time window.
   
The algorithm I was proposing is to not "complete" a sample and instead keep it current, even if it is already in the past (as long as it has not been purged from the set), until we fill that sample up to sample.value = Q (that is, until we have used all of its credits). This ensures that we use the oldest credits that have not yet expired first. In other words, if we purge a sample whose value < Q, there should be no newer samples with recorded values.
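Here is a sketch of the proposed oldest-first accounting. It assumes 1-second samples aligned to whole seconds, and it assumes that every second inside the window contributes Q credits whether or not anything was recorded in it. `OldestFirstBucket`, `record`, `valueAt` and `expiredUnusedCredits` are hypothetical names. How to handle values that exceed the available credits is deliberately simplistic here: the excess is added to the current second's sample. Replaying the earlier example (2 at T, 3 at T + 3, 30 at T + 6) expires no unused credits, because the 3 recorded at T + 3 is charged to the sample at T.

```java
import java.util.TreeMap;

/**
 * Hypothetical sketch of oldest-first accounting with 1-second samples aligned
 * to whole seconds. Every second inside the window (the last numSamples seconds,
 * but not before startSec) holds quota credits. Values exceeding the available
 * credits are added to the current second's sample (an assumption).
 */
final class OldestFirstBucket {
    private final int numSamples;
    private final double quota;
    private final long startSec;
    private final TreeMap<Long, Double> used = new TreeMap<>(); // second -> recorded value
    private long purgedBeforeSec; // every second < purgedBeforeSec has been purged
    private double expiredUnusedCredits = 0.0;

    OldestFirstBucket(int numSamples, double quota, long startSec) {
        this.numSamples = numSamples;
        this.quota = quota;
        this.startSec = startSec;
        this.purgedBeforeSec = startSec;
    }

    void record(double value, long nowSec) {
        long windowStart = Math.max(startSec, nowSec - numSamples + 1);
        // Purge seconds that left the window, counting credits that were never used.
        for (long s = purgedBeforeSec; s < windowStart; s++) {
            Double v = used.remove(s);
            expiredUnusedCredits += Math.max(0.0, quota - (v == null ? 0.0 : v));
        }
        purgedBeforeSec = Math.max(purgedBeforeSec, windowStart);
        // Spend the oldest unexpired credits first, never charging a future second.
        double remaining = value;
        for (long s = windowStart; s <= nowSec && remaining > 0; s++) {
            double v = used.getOrDefault(s, 0.0);
            double take = Math.min(remaining, Math.max(0.0, quota - v));
            if (take > 0) {
                used.put(s, v + take);
                remaining -= take;
            }
        }
        // Not enough credits: keep the excess on the current sample.
        if (remaining > 0) {
            used.merge(nowSec, remaining, Double::sum);
        }
    }

    double valueAt(long sec) {
        return used.getOrDefault(sec, 0.0);
    }

    double expiredUnusedCredits() {
        return expiredUnusedCredits;
    }

    public static void main(String[] args) {
        OldestFirstBucket bucket = new OldestFirstBucket(6, 5.0, 0L);
        bucket.record(2, 0);   // T: sample T holds 2
        bucket.record(3, 3);   // T + 3: charged to sample T, which is now full (5)
        bucket.record(30, 6);  // T + 6: sample T is purged while full; seconds 1..6 get 5 each
        System.out.println(bucket.expiredUnusedCredits()); // prints 0.0
        System.out.println(bucket.valueAt(6));             // prints 5.0
    }
}
```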
   
If we have to record a value even when doing so would violate the quota, we need to generalize the condition for when a sample is considered complete, and we need to make sure that we don't give out credits that we do not have (based on the rate and burst).
   
Suppose we have filled all 6 samples up to the quota: [5, 5, 5, 5, 5, 5], and we have to record 3 at a time that corresponds to the last sample. So we record 3 into the last sample: [5, 5, 5, 5, 5, 8]. Then we record 3 at a time that corresponds to a new sample (because we have no credits left from the past) and expire the oldest one: [5, 5, 5, 5, 8, 3]. If the next value is recorded at a time that corresponds to the next sample, we should not treat 3 < 5 (Q) as having spare credits. So a sample should be considered complete when the current time has advanced past the sample AND the sum of values over all samples in the sample set >= (Q * MetricConfig.samples). In other words, the sum of values has reached the burst size (the maximum number of credits).
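The completeness condition and the trace above can be sketched as follows. `BurstCompleteness` is hypothetical. The sample set is modeled as a full array of `numSamples` values, and whether time has advanced past the newest sample is passed in as a flag instead of being derived from timestamps. The sketch only covers the transitions in the trace; it does not combine the condition with oldest-first filling.

```java
import java.util.Arrays;

/** Hypothetical sketch of the proposed completeness condition on a full sample set. */
final class BurstCompleteness {

    /**
     * Complete when time has moved past the sample AND the whole set holds at
     * least a full burst, i.e. sum >= quota * numSamples.
     */
    static boolean isComplete(boolean timeAdvancedPastSample, double[] sampleValues,
                              double quota, int numSamples) {
        double sum = Arrays.stream(sampleValues).sum();
        return timeAdvancedPastSample && sum >= quota * numSamples;
    }

    /**
     * Records value into the newest sample or, if that sample is complete, into a
     * new sample while expiring the oldest one. Returns the updated set.
     */
    static double[] record(double[] samples, double value, boolean timeAdvancedPastSample,
                           double quota) {
        double[] next;
        if (isComplete(timeAdvancedPastSample, samples, quota, samples.length)) {
            next = new double[samples.length];
            System.arraycopy(samples, 1, next, 0, samples.length - 1); // expire the oldest
            next[next.length - 1] = value;                              // start a new sample
        } else {
            next = samples.clone();
            next[next.length - 1] += value; // keep the newest sample current
        }
        return next;
    }

    public static void main(String[] args) {
        double quota = 5.0;
        double[] s = {5, 5, 5, 5, 5, 5};
        s = record(s, 3, false, quota); // same sample time
        System.out.println(Arrays.toString(s)); // [5.0, 5.0, 5.0, 5.0, 5.0, 8.0]
        s = record(s, 3, true, quota);  // next sample time, sum 33 >= 30
        System.out.println(Arrays.toString(s)); // [5.0, 5.0, 5.0, 5.0, 8.0, 3.0]
        // Sum is 31 >= 30, so the trailing 3 < Q does not count as spare credits.
        System.out.println(isComplete(true, s, quota, s.length)); // true
    }
}
```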



