----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33049/#review86418 -----------------------------------------------------------
I had collected my comments on this older revision. I have not looked at the latest, so can you incorporate relevant comments into the newest revision. MockTime: why was this moved out of the test package? We also had an offline discussion on perhaps adding a maxDelay option for quotas. i.e., this would perhaps be useful for consumers so that they can set their consumer timeout period appropriately. However, this is debatable since that is really a separate config - i.e., how long a consumer is willing to wait before giving up. If it is throttled more than that period then it should in fact stop. core/src/main/scala/kafka/server/ClientQuotaMetrics.scala <https://reviews.apache.org/r/33049/#comment138373> typo core/src/main/scala/kafka/server/ClientQuotaMetrics.scala <https://reviews.apache.org/r/33049/#comment138374> param is not a KafkaConfig core/src/main/scala/kafka/server/ClientQuotaMetrics.scala <https://reviews.apache.org/r/33049/#comment138377> We should probably expose a metric on the current size of the delay queue core/src/main/scala/kafka/server/ClientQuotaMetrics.scala <https://reviews.apache.org/r/33049/#comment138382> This is for the consumer as well right? core/src/main/scala/kafka/server/ClientQuotaMetrics.scala <https://reviews.apache.org/r/33049/#comment138381> recordAndMaybeThrottle ? core/src/main/scala/kafka/server/ClientQuotaMetrics.scala <https://reviews.apache.org/r/33049/#comment138383> We generally prefer not using tuples. In this case since it is private it is probably okay, but again, reconsider. http://kafka.apache.org/coding-guide.html If you absolutely must use tuples in a local context use something like: `val (quotaSensor, throttleTimeSensor) = (returnedValue._1, returnedValue._2)` core/src/main/scala/kafka/server/ClientQuotaMetrics.scala <https://reviews.apache.org/r/33049/#comment138378> Spacing. We generally do "a: b", not "a : b". I saw a few other occurrences. core/src/main/scala/kafka/server/ClientQuotaMetrics.scala <https://reviews.apache.org/r/33049/#comment138379> Spacing: a * b core/src/main/scala/kafka/server/ClientQuotaMetrics.scala <https://reviews.apache.org/r/33049/#comment138407> We should expose server-side metrics on per-client * API key throttle rate right? core/src/main/scala/kafka/server/ClientQuotaMetrics.scala <https://reviews.apache.org/r/33049/#comment138380> Should we set this to debug? and just rely on server metrics to know the rate at which requests are getting throttled? core/src/main/scala/kafka/server/ClientQuotaMetrics.scala <https://reviews.apache.org/r/33049/#comment138385> Use: `= overriddenQuota.getOrElse(clientId, defaultQuota)` core/src/main/scala/kafka/server/ClientQuotaMetrics.scala <https://reviews.apache.org/r/33049/#comment138375> This should ideally return an immutable map. core/src/main/scala/kafka/server/ClientQuotaMetrics.scala <https://reviews.apache.org/r/33049/#comment138376> You can use the collectio.map pattern to build the immutable map. It will also make this code more concise without losing clarity. core/src/main/scala/kafka/server/KafkaApis.scala <https://reviews.apache.org/r/33049/#comment138386> Yes this looks like an inadvertent ide-suggested import. core/src/main/scala/kafka/server/KafkaApis.scala <https://reviews.apache.org/r/33049/#comment138389> opening brace should be on same line core/src/main/scala/kafka/server/KafkaApis.scala <https://reviews.apache.org/r/33049/#comment138390> same core/src/main/scala/kafka/server/KafkaApis.scala <https://reviews.apache.org/r/33049/#comment138391> same core/src/main/scala/kafka/server/KafkaApis.scala <https://reviews.apache.org/r/33049/#comment138392> same core/src/main/scala/kafka/server/KafkaApis.scala <https://reviews.apache.org/r/33049/#comment138393> `.recordAndMaybeThrottle` would look better I think core/src/main/scala/kafka/server/KafkaApis.scala <https://reviews.apache.org/r/33049/#comment138394> Cannot throttle producer request... core/src/main/scala/kafka/server/KafkaApis.scala <https://reviews.apache.org/r/33049/#comment138395> `if (fetchRequest.isFromFollower)` core/src/main/scala/kafka/server/KafkaApis.scala <https://reviews.apache.org/r/33049/#comment138396> `cfg: KafkaConfig` core/src/main/scala/kafka/server/KafkaApis.scala <https://reviews.apache.org/r/33049/#comment138387> Can probably drop this comment. core/src/main/scala/kafka/server/KafkaConfig.scala <https://reviews.apache.org/r/33049/#comment138398> There is some inconsistency between configs here. (default vs bytes.per.second). How about BytesPerSecondOverrides and BytesPerSecondDefaults? core/src/main/scala/kafka/server/KafkaServer.scala <https://reviews.apache.org/r/33049/#comment138399> For simple declarations such as these, prefer dropping the type declarations. i.e., `val jmxPrefix = "kafka.server"` core/src/main/scala/kafka/server/KafkaServer.scala <https://reviews.apache.org/r/33049/#comment138400> same core/src/main/scala/kafka/server/KafkaServer.scala <https://reviews.apache.org/r/33049/#comment138401> Can we consider migrating the server to use the Time interface in clients and just use that uniformly throughout? core/src/main/scala/kafka/server/ThrottledRequest.scala <https://reviews.apache.org/r/33049/#comment138402> `override def` core/src/main/scala/kafka/server/ThrottledRequest.scala <https://reviews.apache.org/r/33049/#comment138403> `override def` core/src/main/scala/kafka/utils/ShutdownableThread.scala <https://reviews.apache.org/r/33049/#comment138404> typo or just: "is repeatedly invoked until..." core/src/test/scala/integration/kafka/api/QuotasTest.scala <https://reviews.apache.org/r/33049/#comment138405> Remove this line core/src/test/scala/integration/kafka/api/QuotasTest.scala <https://reviews.apache.org/r/33049/#comment138406> Consumer quotas don't seem to be tested in this? - Joel Koshy On June 3, 2015, 12:16 a.m., Aditya Auradkar wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/33049/ > ----------------------------------------------------------- > > (Updated June 3, 2015, 12:16 a.m.) > > > Review request for kafka, Joel Koshy and Jun Rao. > > > Bugs: KAFKA-2084 > https://issues.apache.org/jira/browse/KAFKA-2084 > > > Repository: kafka > > > Description > ------- > > Updated patch for quotas. This patch does the following: > 1. Add per-client metrics for both producer and consumers > 2. Add configuration for quotas > 3. Compute delay times in the metrics package and return the delay times in > QuotaViolationException > 4. Add a DelayQueue in KafkaApi's that can be used to throttle any type of > request. Implemented request throttling for produce and fetch requests. > 5. Added unit and integration test cases. > 6. This doesn't include a system test. There is a separate ticket for that > 7. Fixed KAFKA-2191 - (Included fix from : > https://reviews.apache.org/r/34418/ ) > > > Diffs > ----- > > clients/src/main/java/org/apache/kafka/common/metrics/Quota.java > d82bb0c055e631425bc1ebbc7d387baac76aeeaa > > clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java > a451e5385c9eca76b38b425e8ac856b2715fcffe > clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java > ca823fd4639523018311b814fde69b6177e73b97 > clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java > 98429da34418f7f1deba1b5e44e2e6025212edb3 > clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java > 544e120594de78c43581a980b1e4087b4fb98ccb > clients/src/test/java/org/apache/kafka/common/utils/MockTime.java > core/src/main/scala/kafka/server/ClientQuotaMetrics.scala PRE-CREATION > core/src/main/scala/kafka/server/KafkaApis.scala > 387e387998fc3a6c9cb585dab02b5f77b0381fbf > core/src/main/scala/kafka/server/KafkaConfig.scala > 9efa15ca5567b295ab412ee9eea7c03eb4cdc18b > core/src/main/scala/kafka/server/KafkaServer.scala > e66710d2368334ece66f70d55f57b3f888262620 > core/src/main/scala/kafka/server/ReplicaManager.scala > 59c9bc3ac3a8afc07a6f8c88c5871304db588d17 > core/src/main/scala/kafka/server/ThrottledRequest.scala PRE-CREATION > core/src/main/scala/kafka/utils/ShutdownableThread.scala > fc226c863095b7761290292cd8755cd7ad0f155c > core/src/test/scala/integration/kafka/api/QuotasTest.scala PRE-CREATION > core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest.scala > PRE-CREATION > core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala > 8014a5a6c362785539f24eb03d77278434614fe6 > core/src/test/scala/unit/kafka/server/ThrottledRequestExpirationTest.scala > PRE-CREATION > > Diff: https://reviews.apache.org/r/33049/diff/ > > > Testing > ------- > > > Thanks, > > Aditya Auradkar > >