Jay,

Not a Kafka dev (yet), but I really like having Coda metrics in Kafka since it simplifies the integration with Dropwizard services, which I use a ton of. Using Coda metrics in Kafka means that they should automatically be surfaced on the Dropwizard metrics REST endpoint as well as JMX and any other reporter I have configured.
A separate thread might be good to discuss the shortcomings of Coda metrics - whether that means a push to upgrade Kafka to a newer release of metrics (preferably aligned with either DW 6.2 or 7.0) or an enhancement to the metrics library itself.

Thx,
Clark

On Thu, Feb 6, 2014 at 12:51 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

Hey guys,

I wanted to kick off a quick discussion of metrics with respect to the new producer and consumer (and potentially the server).

At a high level I think there are three approaches we could take:
1. Plain vanilla JMX
2. Use Coda Hale (AKA Yammer) Metrics
3. Do our own metrics (with JMX as one output)

1. Has the advantage that JMX is the most commonly used java thing and plugs in reasonably to most metrics systems. JMX is included in the JDK, so it doesn't impose any additional dependencies on clients. It has the disadvantage that plain vanilla JMX is a pain to use. We would need a bunch of helper code for maintaining counters to make this reasonable.

2. Coda Hale metrics is pretty good and broadly used. It supports JMX output as well as direct output to many other types of systems. The primary downside we have had with Coda Hale has to do with client and library incompatibilities. We are currently on an older, more popular version. The newer version is a rewrite of the APIs and is incompatible. Originally these were totally incompatible and people had to choose one or the other. I think that has been improved so that the new version now lives in a totally different package. But even in this case you end up with both versions if you use Kafka and we are on a different version than you, which is going to be pretty inconvenient.

3. Doing our own has the downside of potentially reinventing the wheel and potentially needing to work out any bugs in our code. The upsides would depend on how good the reinvention was. As it happens I did a quick (~900 loc) version of a metrics library that is under kafka.common.metrics. I think it has some advantages over the Yammer metrics package for our usage beyond just not causing incompatibilities. I will describe this code so we can discuss the pros and cons. Although I favor this approach I have no emotional attachment and wouldn't be too sad if I ended up deleting it. Here are javadocs for this code, though I haven't written much documentation yet since I might end up deleting it:

Here is a quick overview of this library.

There are three main public interfaces:
Metrics - This is a repository of metrics being tracked.
Metric - A single, named numerical value being measured (e.g. a counter).
Sensor - This is a thing that records values and updates zero or more metrics.

So let's say we want to track four values about message sizes; specifically, say we want to record the average, the maximum, the total rate of bytes being sent, and a count of messages.
Then we would do something like this:

// setup code
Metrics metrics = new Metrics(); // this is a global "singleton"
Sensor sensor = metrics.sensor("kafka.producer.message.sizes");
sensor.add("kafka.producer.message-size.avg", new Avg());
sensor.add("kafka.producer.message-size.max", new Max());
sensor.add("kafka.producer.bytes-sent-per-sec", new Rate());
sensor.add("kafka.producer.message-count", new Count());

// now when we get a message we do this
sensor.record(messageSize);

The above code creates the global metrics repository, creates a single Sensor, and defines four named metrics that are updated by that Sensor.

Like Yammer Metrics (YM) I allow you to plug in "reporters", including a JMX reporter. Unlike the Coda Hale JMX reporter, the reporter I have keys off the metric names rather than the Sensor names, which I think is an improvement--I just use the convention that the last portion of the name is the attribute name, the second to last is the mbean name, and the rest is the package. So in the above example there is a message-size mbean that has avg and max attributes and a producer mbean that has bytes-sent-per-sec and message-count attributes. This is nice because you can logically group the values reported irrespective of where in the program they are computed--that is, an mbean can logically group attributes computed off different sensors. This means you can report values by logical subsystem.

I also allow the concept of hierarchical Sensors, which I think is a good convenience. I have noticed a common pattern in systems where you need to roll up the same values along different dimensions. A simple example is metrics about qps, data rate, etc. on the broker. These we want to capture in aggregate, but also broken down by topic-id. You can do this purely by defining the sensor hierarchy:

Sensor allSizes = metrics.sensor("kafka.producer.sizes");
Sensor topicSizes = metrics.sensor("kafka.producer." + topic + ".sizes", allSizes);

Now each actual update will go to the appropriate topicSizes sensor (based on the topic name), but the allSizes metrics will get updated too. I also support multiple parents for each sensor as well as multiple layers of hierarchy, so you can define a more elaborate DAG of sensors. An example of how this would be useful is if you wanted to record your metrics broken down by topic AND client id as well as the global aggregate.

Each metric can take a configurable Quota value which allows us to limit the maximum value of that sensor. This is intended for use on the server as part of our Quota implementation. The way this works is that you record metrics as usual:

mySensor.record(42.0)

However, if this event occurrence causes one of the metrics to exceed its maximum allowable value (the quota), this call will throw a QuotaViolationException. The cool thing about this is that it means we can define quotas on anything we capture metrics for.
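To make the quota idea concrete, here is a rough sketch of how attaching a quota to a metric might look with this API; the MetricConfig/Quota plumbing shown here is illustrative and not a settled signature:

// rough sketch -- the quota-attachment API shown here is illustrative, not final
Sensor requestRate = metrics.sensor("kafka.server.client.requests");
requestRate.add("kafka.server.client.request-rate",
                new Rate(),
                new MetricConfig().quota(new Quota(1000.0, true))); // assumed: upper bound of 1000 requests/sec

try {
    requestRate.record(1.0); // record one request
} catch (QuotaViolationException e) {
    // the rate metric exceeded its quota; the server can throttle or reject the client
}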
Another question is how to handle windowing of the values. Metrics want to record the "current" value, but the definition of current is inherently nebulous. One of the obvious gotchas is that if you define "current" to be a number of events, you can end up measuring an arbitrarily long window of time if the event rate is low (e.g. you think you are getting 50 messages/sec because that was the rate yesterday before events stopped).

Here is how I approach this. All the metrics use the same windowing approach. We define a single window by a length of time or a number of values (you can use either or both--if both, the window ends when *either* the time bound or the event bound is hit). The typical problem with hard window boundaries is that at the beginning of the window you have no data and the first few samples are too small to be a valid sample. (Consider if you were keeping an avg and the first value in the window happens to be very, very high; if you check the avg at this exact time you will conclude the avg is very high, but on a sample size of one.) One simple fix would be to always report the last complete window, however this is not appropriate here because (1) we want to drive quotas off it, so it needs to be current, and (2) since this is for monitoring, you kind of care more about the current state. The ideal solution here would be to define a backwards-looking sliding window from the present, but many statistics are actually very hard to compute in this model without retaining all the values, which would be hopelessly inefficient. My solution is to keep a configurable number of windows (the default is two) and combine them for the estimate. So in the two-sample case, depending on when you ask, you have between one and two complete samples' worth of data to base the answer on. Provided the sample window is large enough to get a valid result, this satisfies both of my criteria of incorporating the most recent data and having reasonable variance at all times.

Another approach is to use an exponential weighting scheme to combine all history but emphasize the recent past. I have not done this as it has a lot of issues for practical operational metrics. I'd be happy to elaborate on this if anyone cares...

The window size for metrics has a global default which can be overridden at either the sensor or the individual metric level.

In addition to these time series values, the user can directly expose some method of their choosing, JMX-style, by implementing the Measurable interface and registering that value. E.g.:

metrics.addMetric("my.metric", new Measurable() {
    public double measure(MetricConfig config, long now) {
        return calculateValueToExpose();
    }
});

This is useful for exposing things like the accumulator free memory.

The set of metrics is extensible; new metrics can be added by just implementing the appropriate interfaces and registering with a sensor. I implement the following metrics:
total - the sum of all values from the given sensor
count - a windowed count of values from the sensor
avg - the sample average within the windows
max - the max over the windows
min - the min over the windows
rate - the rate in the windows (e.g. the total or count divided by the elapsed time)
percentiles - a collection of percentiles computed over the window

My approach to percentiles is a little different from the Yammer metrics package. My complaint about the Yammer metrics approach is that it uses rather expensive sampling and kind of a lot of memory to get a reasonable sample. This is problematic for per-topic measurements.

Instead I use a fixed range for the histogram (e.g. 0.0 to 30000.0), which directly allows you to specify the desired memory use. Any value below the minimum is recorded as -Infinity and any value above the maximum as +Infinity. I think this is okay as all metrics have an expected range except latency, which can be arbitrarily large; but for very high latency there is no need to model it exactly (e.g. 30 seconds+ really is effectively infinite). Within the range, values are recorded in buckets which can be either fixed width or increasing width. The increasing width is analogous to the idea of significant figures: if your value is in the range 0-10 you might want to be accurate to within 1ms, but if it is 20000 there is no need to be so accurate. I implemented a linear bucket size where the Nth bucket has width proportional to N. An exponential bucket size would also be sensible and could likely be derived directly from the floating point representation of the value.
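Pulling the windowing and percentile pieces together, a registration might look roughly like the following; the constructor arguments and helper names (timeWindow, eventWindow, samples, Percentiles, BucketSizing) are illustrative sketches of the draft API, not settled signatures:

// rough sketch -- names and signatures are illustrative, not final
MetricConfig latencyConfig = new MetricConfig()
    .timeWindow(30, TimeUnit.SECONDS)  // a sample window ends after 30 seconds...
    .eventWindow(10000)                // ...or after 10,000 events, whichever comes first
    .samples(2);                       // keep two windows and combine them for the estimate

Sensor latency = metrics.sensor("kafka.producer.request-latency", latencyConfig);
latency.add("kafka.producer.request-latency.avg", new Avg());
latency.add("kafka.producer.request-latency.max", new Max());
latency.add(new Percentiles(1000,                 // bytes of memory to spend on the histogram
                            0.0, 30000.0,         // fixed range; values outside map to -/+Infinity
                            BucketSizing.LINEAR,  // Nth bucket width proportional to N
                            new Percentile("kafka.producer.request-latency.p50", 50.0),
                            new Percentile("kafka.producer.request-latency.p99", 99.0)));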
I'd like to get some feedback on this metrics code and make a decision on whether we want to use it before I actually go ahead and add all the instrumentation in the code (otherwise I'll have to redo it if we switch approaches). So the next topic of discussion will be which actual metrics to add.

-Jay