> On Aug. 14, 2015, 1:57 a.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java, lines 69-77
> > <https://reviews.apache.org/r/33049/diff/25/?file=1039169#file1039169line69>
> >
> > This is probably not the right place to throw QuotaViolationException.
> > Rate.measure() can be called when the JMX value is polled and we don't want
> > to throw an exception there.
> >
> > I was thinking that another way to do this is to perhaps add the logic
> > in Quota.acceptable(). We can probably change it to be
> >
> > // if the check passes, return -1; otherwise return a delay to make the check pass.
> > int Quota.check(KafkaMetrics, long now).
> >
> > In the implementation, we can then check if the KafkaMetrics passed in
> > has a measurable that's an instance of Rate. If so, we will compute the
> > delay for rate (the window should be elapsedCurrent and elapsedPrior as
> > computed in Rate.measure()).
>
> Aditya Auradkar wrote:
>     Rate/Measurable stat does not expose the windows currently. So we cannot
>     access elapsedCurrent and elapsedPrior from the KafkaMetric object in Sensor.
>     Couple of options:
>     1. We can simply record it in Rate.record() which is called from
>        Sensor.record
>     2. We can check quotas in Sensor.java but not track the precise window
>        boundaries.

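To make the delay computation under discussion concrete, here is a minimal sketch of the "-1 or delay" check proposed above for a rate quota. It is only an illustration, not the code in the patch: the class `QuotaDelaySketch`, the method `delayMs`, and its parameters are hypothetical names, and it assumes the rate is averaged over a single window of `windowMs` milliseconds, so that waiting `d` ms with no new traffic lowers the rate to `observedRate * windowMs / (windowMs + d)`.

```java
/**
 * Hypothetical sketch (not part of the patch): computes how long a client
 * would need to be delayed for its observed rate to fall back to the quota.
 */
public final class QuotaDelaySketch {
    private QuotaDelaySketch() {}

    /**
     * @param observedRate rate measured over the window (units per second)
     * @param quotaBound   upper bound allowed by the quota (same units)
     * @param windowMs     length of the measurement window in milliseconds (assumed known)
     * @return -1 if the check passes; otherwise the delay in ms that makes it pass
     */
    public static long delayMs(double observedRate, double quotaBound, long windowMs) {
        if (quotaBound <= 0 || windowMs <= 0) {
            throw new IllegalArgumentException("quotaBound and windowMs must be positive");
        }
        if (observedRate <= quotaBound) {
            return -1L; // within quota, no delay needed
        }
        // Solve observedRate * windowMs / (windowMs + delay) == quotaBound for delay.
        double delay = windowMs * (observedRate - quotaBound) / quotaBound;
        // Round up so the computed delay is never shorter than required.
        return (long) Math.ceil(delay);
    }

    public static void main(String[] args) {
        // 150 units/sec observed against a bound of 100 over a 10 s window.
        System.out.println(delayMs(150.0, 100.0, 10_000L)); // prints 5000
    }
}
```

Rounding up keeps the delay slightly pessimistic, and a helper like this could live outside the metrics package since it needs only the measured value, the bound, and a window length.
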
Discussed offline with Aditya: I actually think the earlier approach with a slightly pessimistic delay in `Sensor` would be sufficient. Option 1 also sounds good, but it is slightly odd that we would then throw the exception from `Rate` directly as well as from `Sensor.checkQuotas` (for other types).

After thinking this through, I think it is worthwhile to consider a small change to the implementation (https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas#KIP-13-Quotas-DelaytimeinQuotaViolationException) to _not_ include the delay time in `QuotaViolationException`. The delay currently applies only to `Rate` and does not fit very well with some other types (e.g., `Max`). Instead, we could compute the delay directly in `ClientQuotaManager` and leave `Sensor.checkQuotas` as is.

- Joel

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33049/#review95369
-----------------------------------------------------------

On Aug. 14, 2015, 2:20 a.m., Aditya Auradkar wrote:
>
> (Updated Aug. 14, 2015, 2:20 a.m.)
>
>
> Review request for kafka, Joel Koshy and Jun Rao.
>
>
> Bugs: KAFKA-2084
>     https://issues.apache.org/jira/browse/KAFKA-2084
>
>
> Repository: kafka
>
>
> Description
> -------
>
> Updated patch for quotas. This patch does the following:
> 1. Add per-client metrics for both producers and consumers
> 2. Add configuration for quotas
> 3. Compute delay times in the metrics package and return the delay times in
>    QuotaViolationException
> 4. Add a DelayQueue in KafkaApis that can be used to throttle any type of
>    request. Implemented request throttling for produce and fetch requests.
> 5. Added unit and integration test cases for both producer and consumer
> 6. This doesn't include a system test. There is a separate ticket for that
> 7. Fixed KAFKA-2191 (included fix from https://reviews.apache.org/r/34418/)
>
> Addressed comments from Joel and Jun
>
>
> Diffs
> -----
>
>   build.gradle 1b67e628c2fca897177c12b6afad9a8700fffd1f
>   clients/src/main/java/org/apache/kafka/common/metrics/Quota.java d82bb0c055e631425bc1ebbc7d387baac76aeeaa
>   clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java a451e5385c9eca76b38b425e8ac856b2715fcffe
>   clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java ca823fd4639523018311b814fde69b6177e73b97
>   clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java 98429da34418f7f1deba1b5e44e2e6025212edb3
>   clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java 544e120594de78c43581a980b1e4087b4fb98ccb
>   core/src/main/scala/kafka/server/ClientQuotaManager.scala PRE-CREATION
>   core/src/main/scala/kafka/server/KafkaApis.scala 7ea509c2c41acc00430c74e025e069a833aac4e7
>   core/src/main/scala/kafka/server/KafkaConfig.scala a06f0bd40e2f90972b44b80a106f98f3d50e5e2b
>   core/src/main/scala/kafka/server/KafkaServer.scala 84d4730ac634f9a5bf12a656e422fea03ad72da8
>   core/src/main/scala/kafka/server/ReplicaManager.scala 2e0bbcd6e4f0e38997ea18202b249ee3553640ec
>   core/src/main/scala/kafka/server/ThrottledResponse.scala PRE-CREATION
>   core/src/main/scala/kafka/utils/ShutdownableThread.scala fc226c863095b7761290292cd8755cd7ad0f155c
>   core/src/test/scala/integration/kafka/api/QuotasTest.scala PRE-CREATION
>   core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala PRE-CREATION
>   core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala e26a7306a6ea3104b3fa3df60006c0a473bfb2cc
>   core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala PRE-CREATION
>
> Diff: https://reviews.apache.org/r/33049/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Aditya Auradkar