Hey guys, I wanted to kick off a quick discussion of metrics with respect to the new producer and consumer (and potentially the server).
At a high level I think there are three approaches we could take:

1. Plain vanilla JMX
2. Use Coda Hale (AKA Yammer) Metrics
3. Do our own metrics (with JMX as one output)

Option 1 has the advantage that JMX is the most commonly used Java monitoring interface and plugs in reasonably to most metrics systems. JMX is included in the JDK, so it doesn't impose any additional dependencies on clients. It has the disadvantage that plain vanilla JMX is a pain to use; we would need a bunch of helper code for maintaining counters to make this reasonable.

Option 2: Coda Hale metrics is pretty good and broadly used. It supports JMX output as well as direct output to many other types of systems. The primary downside we have had with Coda Hale has to do with library incompatibilities in client applications. We are currently on an older, more popular version; the newer version is a rewrite of the APIs and is incompatible. Originally the two were totally incompatible and people had to choose one or the other. I think that has been improved so that the new version now lives in a different package, but even in that case you end up with both versions on the classpath if you use Kafka and we are on a different version than you, which is going to be pretty inconvenient.

Option 3, doing our own, has the downside of potentially reinventing the wheel and needing to work out any bugs in our code. The upside depends on how good the reinvention is. As it happens I did a quick (~900 loc) version of a metrics library that is under kafka.common.metrics. I think it has some advantages over the Yammer metrics package for our usage beyond just not causing incompatibilities. I will describe this code so we can discuss the pros and cons. Although I favor this approach I have no emotional attachment and wouldn't be too sad if I ended up deleting it. Here are javadocs for this code, though I haven't written much documentation yet since I might end up deleting it:

Here is a quick overview of this library. There are three main public interfaces:

Metrics - a repository of metrics being tracked
Metric - a single, named numerical value being measured (i.e. a counter)
Sensor - a thing that records values and updates zero or more metrics

So let's say we want to track several things about message sizes: specifically the average, the maximum, the total rate of bytes being sent, and a count of messages. Then we would do something like this:

  // setup code
  Metrics metrics = new Metrics(); // this is a global "singleton"
  Sensor sensor = metrics.sensor("kafka.producer.message.sizes");
  sensor.add("kafka.producer.message-size.avg", new Avg());
  sensor.add("kafka.producer.message-size.max", new Max());
  sensor.add("kafka.producer.bytes-sent-per-sec", new Rate());
  sensor.add("kafka.producer.message-count", new Count());

  // now when we get a message we do this
  sensor.record(messageSize);

The above code creates the global metrics repository, creates a single Sensor, and defines four named metrics that are updated by that Sensor.

Like Yammer Metrics (YM) I allow you to plug in "reporters", including a JMX reporter. Unlike the Coda Hale JMX reporter, the reporter I have keys off the metric names, not the Sensor names, which I think is an improvement. I just use the convention that the last portion of the name is the attribute name, the second to last is the mbean name, and the rest is the package. So in the above example there is a kafka.producer.message-size mbean that has avg and max attributes and a kafka.producer mbean that has bytes-sent-per-sec and message-count attributes.
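To make that convention concrete, here is a tiny sketch (purely my illustration, not the actual reporter code) of how a metric name splits into package, mbean, and attribute:

  // Split "kafka.producer.message-size.avg" into package / mbean / attribute
  String name = "kafka.producer.message-size.avg";
  int attrDot = name.lastIndexOf('.');
  int beanDot = name.lastIndexOf('.', attrDot - 1);
  String attribute = name.substring(attrDot + 1);       // "avg"
  String mbean = name.substring(beanDot + 1, attrDot);  // "message-size"
  String pkg = name.substring(0, beanDot);              // "kafka.producer"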
Keying the mbeans off metric names is nice because you can logically group the values reported irrespective of where in the program they are computed--that is, an mbean can logically group attributes computed off different sensors. This means you can report values by logical subsystem.

I also allow the concept of hierarchical Sensors, which I think is a good convenience. I have noticed a common pattern in systems where you need to roll up the same values along different dimensions. A simple example is metrics about qps, data rate, etc. on the broker. These we want to capture in aggregate, but also broken down by topic id. You can do this purely by defining the sensor hierarchy:

  Sensor allSizes = metrics.sensor("kafka.producer.sizes");
  Sensor topicSizes = metrics.sensor("kafka.producer." + topic + ".sizes", allSizes);

Now each actual update will go to the appropriate topicSizes sensor (based on the topic name), but the allSizes metrics will get updated too. I also support multiple parents for each sensor as well as multiple layers of hierarchy, so you can define a more elaborate DAG of sensors. An example of how this would be useful is if you wanted to record your metrics broken down by topic AND client id as well as the global aggregate.

Each metric can take a configurable Quota value, which allows us to enforce a maximum allowed value for that metric. This is intended for use on the server as part of our quota implementation. The way this works is that you record metrics as usual:

  mySensor.record(42.0);

However, if this event occurrence causes one of the metrics to exceed its maximum allowable value (the quota), the call will throw a QuotaViolationException. The nice thing about this is that we can define quotas on anything we capture metrics for, which I think is pretty cool.
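To make the quota behavior concrete, here is a rough sketch of how this could look in use. Attaching the Quota via the add() call is just my illustration--the exact configuration API may differ--but the record()/QuotaViolationException behavior is as described above:

  // Hypothetical example: cap the producer byte rate at ~1MB/sec.
  // (Passing the Quota to add() is illustrative, not necessarily the real signature.)
  Sensor bytesSent = metrics.sensor("kafka.producer.bytes-sent");
  bytesSent.add("kafka.producer.bytes-sent-per-sec", new Rate(), new Quota(1024 * 1024));

  try {
      bytesSent.record(messageSize);
  } catch (QuotaViolationException e) {
      // the byte-rate quota was exceeded; the server can throttle or reject the request
  }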
Another question is how to handle windowing of the values. Metrics want to record the "current" value, but the definition of current is inherently nebulous. One of the obvious gotchas is that if you define "current" to be a number of events, you can end up measuring an arbitrarily long window of time if the event rate is low (e.g. you think you are getting 50 messages/sec because that was the rate yesterday when events stopped).

Here is how I approach this. All the metrics use the same windowing approach. We define a single window by a length of time or a number of values (you can use either or both--if both, the window ends when *either* the time bound or the event bound is hit). The typical problem with hard window boundaries is that at the beginning of the window you have no data and the first few samples are too small to be valid. (Consider keeping an avg where the first value in the window happens to be very high: if you check the avg at that exact time you will conclude the avg is very high, but on a sample size of one.) One simple fix would be to always report the last complete window, but that is not appropriate here because (1) we want to drive quotas off it so it needs to be current, and (2) since this is for monitoring you care more about the current state. The ideal solution would be a backwards-looking sliding window from the present, but many statistics are very hard to compute in this model without retaining all the values, which would be hopelessly inefficient. My solution is to keep a configurable number of windows (default is two) and combine them for the estimate. So in the two-sample case, depending on when you ask, you have between one and two complete samples' worth of data to base the answer on. Provided the sample window is large enough to get a valid result, this satisfies both of my criteria: incorporating the most recent data and having reasonable variance at all times. Another approach is to use an exponential weighting scheme to combine all history but emphasize the recent past. I have not done this as it has a lot of issues for practical operational metrics; I'd be happy to elaborate on this if anyone cares.

The window size for metrics has a global default which can be overridden at either the sensor or the individual metric level.

In addition to these time series values, the user can directly expose some method of their choosing JMX-style by implementing the Measurable interface and registering that value. E.g.:

  metrics.addMetric("my.metric", new Measurable() {
      public double measure(MetricConfig config, long now) {
          return calculateValueToExpose();
      }
  });

This is useful for exposing things like the accumulator's free memory.

The set of metrics is extensible; new metrics can be added by implementing the appropriate interfaces and registering with a sensor. I implement the following metrics:

  total - the sum of all values recorded by the sensor
  count - a windowed count of values from the sensor
  avg - the sample average within the windows
  max - the max over the windows
  min - the min over the windows
  rate - the rate in the windows (e.g. the total or count divided by the elapsed time)
  percentiles - a collection of percentiles computed over the window

My approach to percentiles is a little different from the Yammer metrics package. My complaint about the Yammer approach is that it uses rather expensive sampling and kind of a lot of memory to get a reasonable sample, which is problematic for per-topic measurements. Instead I use a fixed range for the histogram (e.g. 0.0 to 30000.0), which directly allows you to specify the desired memory use. Any value below the minimum is recorded as -Infinity and any value above the maximum as +Infinity. I think this is okay, as all metrics have an expected range except for latency, which can be arbitrarily large; but for very high latency there is no need to model it exactly (e.g. 30 seconds or more really is effectively infinite). Within the range, values are recorded in buckets which can be either fixed width or increasing width. The increasing width is analogous to the idea of significant figures: if your value is in the range 0-10 you might want to be accurate to within 1ms, but if it is 20000 there is no need to be so accurate. I implemented a linear bucket scheme where the Nth bucket has width proportional to N. An exponential bucket size would also be sensible and could likely be derived directly from the floating point representation of the value.

I'd like to get some feedback on this metrics code and make a decision on whether we want to use it before I actually go ahead and add all the instrumentation in the code (otherwise I'll have to redo it if we switch approaches). So the next topic of discussion will be which actual metrics to add.

-Jay