Hey guys,

I wanted to kick off a quick discussion of metrics with respect to the new
producer and consumer (and potentially the server).

At a high level I think there are three approaches we could take:
1. Plain vanilla JMX
2. Use Coda Hale (AKA Yammer) Metrics
3. Do our own metrics (with JMX as one output)

1. Has the advantage that JMX is the most commonly used Java monitoring
interface and plugs in reasonably well to most metrics systems. JMX is
included in the JDK so it doesn't impose any additional dependencies on
clients. It has the disadvantage that plain vanilla JMX is a pain to use:
we would need a bunch of helper code for maintaining counters to make it
reasonable.
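
For a sense of the boilerplate involved, here is roughly what exposing even
a single counter looks like with plain JMX (just a sketch, nothing
Kafka-specific):

   // plain JMX needs an interface/implementation pair per mbean...
   public interface MessageCountMBean {
     long getMessageCount();
   }

   public class MessageCount implements MessageCountMBean {
     private final AtomicLong count = new AtomicLong();
     public void increment() { count.incrementAndGet(); }
     public long getMessageCount() { return count.get(); }
   }

   // ...plus explicit registration with the platform MBean server
   MessageCount counter = new MessageCount();
   MBeanServer server = ManagementFactory.getPlatformMBeanServer();
   server.registerMBean(counter,
       new ObjectName("kafka.producer:type=MessageCount"));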

2. Coda Hale metrics is pretty good and broadly used. It supports JMX
output as well as direct output to many other types of systems. The primary
downside we have had with Coda Hale has to do with client library
incompatibilities. We are currently on the older, more widely used version;
the newer version is a rewrite of the APIs and is incompatible. Originally
the two were totally incompatible and people had to choose one or the
other. I think that has been improved so the new version now lives in a
totally different package, but even then, if you use Kafka and we are on a
different version than you, you end up with both versions, which is going
to be pretty inconvenient.

3. Doing our own has the downside of potentially reinventing the wheel,
and potentially needing to work out any bugs in our code. The upside would
depend on how good the reinvention is. As it happens I did a quick
(~900 LOC) version of a metrics library that is under kafka.common.metrics.
I think it has some advantages over the Yammer metrics package for our
usage beyond just not causing incompatibilities. I will describe this code
so we can discuss the pros and cons. Although I favor this approach I have
no emotional attachment and wouldn't be too sad if I ended up deleting it.
Here are javadocs for this code, though I haven't written much
documentation yet since I might end up deleting it:

Here is a quick overview of this library.

There are three main public interfaces:
  Metrics - This is a repository of metrics being tracked.
  Metric - A single, named numerical value being measured (e.g. a counter).
  Sensor - Records values and updates zero or more metrics.

So let's say we want to track four values about message sizes:
specifically, say we want to record the average, the maximum, the total
rate of bytes being sent, and a count of messages. Then we would do
something like this:

   // setup code
   Metrics metrics = new Metrics(); // this is a global "singleton"
   Sensor sensor = metrics.sensor("kafka.producer.message.sizes");
   sensor.add("kafka.producer.message-size.avg", new Avg());
   sensor.add("kafka.producer.message-size.max", new Max());
   sensor.add("kafka.producer.bytes-sent-per-sec", new Rate());
   sensor.add("kafka.producer.message-count", new Count());

   // now when we get a message we do this
   sensor.record(messageSize);

The above code creates the global metrics repository, creates a single
Sensor, and defines four named metrics that are updated by that Sensor.

Like Yammer Metrics (YM) I allow you to plug in "reporters", including a
JMX reporter. Unlike the Coda Hale JMX reporter, the reporter I have keys
off the metric names, not the Sensor names, which I think is an
improvement--I just use the convention that the last portion of the name is
the attribute name, the second-to-last is the mbean name, and the rest is
the package. So in the above example there is a message-size mbean that has
avg and max attributes and a producer mbean that has bytes-sent-per-sec and
message-count attributes. This is nice because you can logically group the
values reported irrespective of where in the program they are
computed--that is, an mbean can logically group attributes computed off
different sensors. This means you can report values by logical subsystem.
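
As a concrete illustration of how that convention splits a name (this just
shows the mapping; the actual reporter code may differ):

   // "kafka.producer.message-size.avg" maps to:
   //   attribute: "avg"             (last segment)
   //   mbean:     "message-size"    (second-to-last segment)
   //   package:   "kafka.producer"  (everything before that)
   String name = "kafka.producer.message-size.avg";
   int last = name.lastIndexOf('.');
   int secondLast = name.lastIndexOf('.', last - 1);
   String attribute = name.substring(last + 1);
   String mbean = name.substring(secondLast + 1, last);
   String pkg = name.substring(0, secondLast);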

I also allow the concept of hierarchical Sensors, which I think is a good
convenience. I have noticed a common pattern in systems where you need to
roll up the same values along different dimensions. A simple example is
metrics about qps, data rate, etc. on the broker. These we want to capture
in aggregate, but also broken down by topic id. You can do this purely by
defining the sensor hierarchy:

   Sensor allSizes = metrics.sensor("kafka.producer.sizes");
   Sensor topicSizes = metrics.sensor("kafka.producer." + topic + ".sizes",
                                      allSizes);

Now each actual update will go to the appropriate topicSizes sensor (based
on the topic name), but allSizes metrics will get updated too. I also
support multiple parents for each sensor as well as multiple layers of
hierarchy, so you can define a more elaborate DAG of sensors. An example of
how this would be useful is if you wanted to record your metrics broken
down by topic AND client id as well as the global aggregate.
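
For example, a sketch of what that two-dimensional rollup might look like
(assuming, as above, that parent sensors are passed to metrics.sensor();
the exact signature may differ):

   Sensor allSizes = metrics.sensor("kafka.producer.sizes");
   Sensor topicSizes = metrics.sensor("kafka.producer." + topic + ".sizes",
                                      allSizes);
   Sensor clientSizes = metrics.sensor("kafka.producer." + clientId + ".sizes",
                                       allSizes);

   // a sensor with two parents: updates roll up by topic, by client id,
   // and into the global aggregate
   Sensor topicClientSizes =
       metrics.sensor("kafka.producer." + topic + "." + clientId + ".sizes",
                      topicSizes, clientSizes);

   topicClientSizes.record(messageSize);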

Each metric can take a configurable Quota value which allows us to limit
the maximum value of that metric. This is intended for use on the server as
part of our Quota implementation. The way this works is that you record
metrics as usual:
   mySensor.record(42.0);
However if this event occurrence causes one of the metrics to exceed its
maximum allowable value (the quota) this call will throw a
QuotaViolationException. The nice thing about this is that it means we can
define quotas on anything we capture metrics for, which I think is pretty
cool.
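
As a rough illustration of how this might look in use (the config and
Quota method names here are my guesses, not necessarily the actual API):

   // hypothetical: attach an upper bound to a rate metric on the server
   Sensor produceRate = metrics.sensor("kafka.server.produce.rate");
   produceRate.add("kafka.server.bytes-in-per-sec", new Rate(),
                   new MetricConfig().quota(Quota.lessThan(10 * 1024 * 1024)));

   try {
     produceRate.record(requestSizeInBytes);
   } catch (QuotaViolationException e) {
     // the client exceeded its allowed rate; delay or reject the request
   }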

Another question is how to handle windowing of the values. Metrics want to
record the "current" value, but the definition of current is inherently
nebulous. One of the obvious gotchas is that if you define "current" to be
a number of events you can end up measuring an arbitrarily long window of
time if the event rate is low (e.g. you think you are getting 50
messages/sec because that was the rate yesterday, when the events stopped).

Here is how I approach this. All the metrics use the same windowing
approach. We define a single window by a length of time or a number of
values (you can use either or both--if both, the window ends when *either*
the time bound or the event bound is hit). The typical problem with hard
window boundaries is that at the beginning of the window you have no data
and the first few values are too small a sample to be valid. (Consider
keeping an avg where the first value in the window happens to be very
high: if you check the avg at that exact time you will conclude the avg is
very high, but on a sample size of one.) One simple fix would be to always
report the last complete window, however that is not appropriate here
because (1) we want to drive quotas off it so it needs to be current, and
(2) since this is for monitoring you care more about the current state. The
ideal solution would be a backwards-looking sliding window from the
present, but many statistics are very hard to compute in that model without
retaining all the values, which would be hopelessly inefficient. My
solution is to keep a configurable number of windows (the default is two)
and combine them for the estimate. So in the two-sample case, depending on
when you ask, you have between one and two complete samples' worth of data
to base the answer on. Provided the sample window is large enough to give a
valid result, this satisfies both of my criteria: incorporating the most
recent data and having reasonable variance at all times.

Another approach is to use an exponential weighting scheme to combine all
history but emphasize the recent past. I have not done this as it has a lot
of issues for practical operational metrics. I'd be happy to elaborate on
this if anyone cares...

The window size for metrics has a global default which can be overridden at
either the sensor or individual metric level.
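
For example, configuring the windowing might look something like this (the
config class and method names are illustrative, not necessarily the final
API):

   // hypothetical: a 30 second / 1000 event window, keeping 2 samples,
   // set as the global default and then overridden for one sensor
   MetricConfig defaults = new MetricConfig().timeWindow(30, TimeUnit.SECONDS)
                                             .eventWindow(1000)
                                             .samples(2);
   Metrics metrics = new Metrics(defaults);

   Sensor latency = metrics.sensor("kafka.producer.request-latency",
       new MetricConfig().timeWindow(5, TimeUnit.SECONDS).samples(6));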

In addition to these time series values the user can directly expose a
value of their choosing, JMX-style, by implementing the Measurable
interface and registering it. E.g.
  metrics.addMetric("my.metric", new Measurable() {
    public double measure(MetricConfig config, long now) {
       return calculateValueToExpose();
    }
  });
This is useful for exposing things like the accumulator's free memory.

The set of metrics is extensible: new metrics can be added by just
implementing the appropriate interfaces and registering with a sensor (a
sketch follows the list below). I implement the following metrics:
  total - the sum of all values from the given sensor
  count - a windowed count of values from the sensor
  avg - the sample average within the windows
  max - the max over the windows
  min - the min over the windows
  rate - the rate in the windows (e.g. the total or count divided by the
elapsed time)
  percentiles - a collection of percentiles computed over the window
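
Here is a sketch of what a custom stat might look like (the interface name
and method signatures are my guesses based on the Measurable example above,
not a confirmed API):

   // hypothetical: a stat tracking the sum of squared values, assuming an
   // interface with record() and measure() methods
   public class SumOfSquares implements MeasurableStat {
     private double total = 0.0;
     public void record(MetricConfig config, double value, long now) {
       total += value * value;
     }
     public double measure(MetricConfig config, long now) {
       return total;
     }
   }

   // register it with a sensor just like a built-in metric
   sensor.add("kafka.producer.message-size.sum-of-squares", new SumOfSquares());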

My approach to percentiles is a little different from the Yammer metrics
package. My complaint about the Yammer approach is that it uses rather
expensive sampling and a fair amount of memory to get a reasonable sample,
which is problematic for per-topic measurements.

Instead I use a fixed range for the histogram (e.g. 0.0 to 30000.0) which
directly allows you to specify the desired memory use. Any value below the
minimum is recorded as -Infinity and any value above the maximum as
+Infinity. I think this is okay as all metrics have an expected range
except for latency, which can be arbitrarily large, but for very high
latency there is no need to model it exactly (e.g. 30+ seconds really is
effectively infinite). Within the range, values are recorded in buckets
which can be either fixed width or increasing width. The increasing width
is analogous to the idea of significant figures, that is if your value is
in the range 0-10 you might want to be accurate to within 1ms, but if it is
20000 there is no need to be so accurate. I implemented a linear bucket
size where the Nth bucket has width proportional to N. An exponential
bucket size would also be sensible and could likely be derived directly
from the floating point representation of the value.
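
For the linear scheme, mapping a value to a bucket index might look roughly
like this (a sketch, not the actual implementation):

   // bucket n has width proportional to n, so the cumulative boundary of
   // the first n buckets is unit * n * (n + 1) / 2, with unit chosen so
   // that all the buckets exactly cover [0, max]
   static int linearBucketIndex(double value, double max, int buckets) {
     if (value < 0.0)
       return -1;              // recorded as -Infinity
     if (value > max)
       return buckets;         // recorded as +Infinity
     double unit = 2.0 * max / (buckets * (buckets + 1.0));
     // invert unit * n * (n + 1) / 2 = value for n
     int n = (int) Math.floor((-1.0 + Math.sqrt(1.0 + 8.0 * value / unit)) / 2.0);
     return Math.min(n, buckets - 1);
   }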

I'd like to get some feedback on this metrics code and make a decision on
whether we want to use it before I actually go ahead and add all the
instrumentation in the code (otherwise I'll have to redo it if we switch
approaches). So the next topic of discussion will be which actual metrics
to add.

-Jay
