I uploaded an updated version of the new consumer client (
https://issues.apache.org/jira/browse/KAFKA-1760). This is now almost
feature complete, and has pretty reasonable testing and metrics. I think it
is ready for review and could be checked in once 0.8.2 is out.

For those who haven't been following, this is meant to be a new consumer
client, analogous to the new producer in 0.8.2, and intended to replace the
existing "high level" and "simple" Scala consumers.

This still needs the server-side implementation of partition assignment
and group management to be fully functional. I have just stubbed this out
in the server to allow the client to be implemented and tested, but actual
usage will require the real server-side support. However, the client that
exists now is a fully functional replacement for the "simple consumer" and
is vastly easier to use correctly, since it handles all the broker
discovery and failover internally.

It would be great if people could take a look at this code, and
particularly at the public APIs, which have several small changes from the
original proposal.

Summary

What's there:
1. Simple consumer functionality
2. Offset commit and fetch
3. Ability to change position with seek
4. Ability to commit all or just some offsets
5. Co-ordinator discovery, failure detection, heartbeat, and fail-over
6. Co-ordinator partition assignment
7. Logging
8. Metrics
9. Integration tests including tests that simulate random broker failures
10. Integration into the consumer performance test
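To make the seek/commit semantics in the list above concrete, here is a toy in-memory model (plain Java, no broker involved; the class and method names are mine for illustration, not the actual client API) of how position, seek, and committing all vs. some offsets interact:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the consumer's position/commit bookkeeping. Not the real
// client API -- just an illustration of the semantics: seek() moves the
// fetch position, and a commit can cover all partitions or just some.
public class ToyConsumer {
    private final Map<String, Long> position = new HashMap<>();  // next offset to fetch
    private final Map<String, Long> committed = new HashMap<>(); // last committed offset

    public void seek(String partition, long offset) {
        position.put(partition, offset);
    }

    public long position(String partition) {
        return position.getOrDefault(partition, 0L);
    }

    // Commit the current position for just the given partitions.
    public void commit(String... partitions) {
        for (String p : partitions)
            committed.put(p, position(p));
    }

    // Commit the current position for every partition we have one for.
    public void commitAll() {
        committed.putAll(position);
    }

    public long committed(String partition) {
        return committed.getOrDefault(partition, 0L);
    }

    public static void main(String[] args) {
        ToyConsumer c = new ToyConsumer();
        c.seek("topic-0", 42);
        c.seek("topic-1", 7);
        c.commit("topic-0");                        // commit only topic-0
        System.out.println(c.committed("topic-0")); // 42
        System.out.println(c.committed("topic-1")); // 0 (not committed yet)
        c.commitAll();
        System.out.println(c.committed("topic-1")); // 7
    }
}
```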

Limitations:
1. There could be some lingering bugs in the group management support; it
is hard to test fully with just the stub support on the server, so I think
we'll need to get the server side working to do better.
2. I haven't implemented wild-card subscriptions yet.
3. No integration with console consumer yet

Performance

I did some performance comparison with the old consumers over localhost on
my laptop. Localhost usually isn't good for testing, but in this case it
is useful: with near-infinite bandwidth it does a good job of exposing
inefficiencies that a slower network would hide. These numbers probably
aren't representative of what you would get over a real network, but they
help bring out the relative efficiencies.
Here are the results:
- Old high-level consumer: 213 MB/sec
- New consumer: 225 MB/sec
- Old simple consumer: 242 MB/sec

It may be hard to get this client all the way up to the simple consumer's
numbers, since the simple consumer does very little beyond allocating and
wrapping the byte buffers it reads off the network.

The big thing that shows up in profiling is the buffer allocation for
reading data. So one speed-up would be to pool these.
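A pool along those lines could be as simple as a free list of fixed-size buffers; a minimal sketch (plain Java, not the actual planned implementation):

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

// Minimal sketch of a buffer pool: reuse fixed-size ByteBuffers instead
// of allocating a fresh one for every network read, which is what shows
// up in profiling. Single-threaded for simplicity.
public class BufferPool {
    private final int bufferSize;
    private final ArrayDeque<ByteBuffer> free = new ArrayDeque<>();

    public BufferPool(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    // Hand out a pooled buffer if one is free, else allocate a new one.
    public ByteBuffer allocate() {
        ByteBuffer buf = free.pollFirst();
        return buf != null ? buf : ByteBuffer.allocate(bufferSize);
    }

    // Return a buffer to the pool for reuse.
    public void release(ByteBuffer buf) {
        buf.clear(); // reset position/limit before reuse
        free.addFirst(buf);
    }

    public static void main(String[] args) {
        BufferPool pool = new BufferPool(16 * 1024);
        ByteBuffer a = pool.allocate();
        pool.release(a);
        ByteBuffer b = pool.allocate();
        System.out.println(a == b); // true: the released buffer is reused
    }
}
```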

Some things to discuss

1. What should the behavior of consumer.position() and consumer.committed()
be immediately after initialization (prior to calling poll())? Currently
these methods don't just fetch the current value from memory: if the
position isn't in memory the client will try to fetch it from the server,
and if no position is found there it will use the auto-offset reset policy
to pick one. I think this is the right thing to do, because otherwise you
can't guarantee how many calls to poll() would be required before
initialization is complete. But it is kind of weird.
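The lookup order described in point 1 amounts to a three-step fallback, roughly like this (plain Java; the names and the in-memory "server" map are illustrative stand-ins, not the client's internals):

```java
import java.util.HashMap;
import java.util.Map;

// Illustration of the fallback order for position(): the in-memory value
// first, then a (simulated) fetch of the committed offset from the
// server, and finally the auto-offset reset policy if nothing is found.
public class PositionLookup {
    private final Map<String, Long> inMemory = new HashMap<>();
    private final Map<String, Long> serverCommitted = new HashMap<>(); // stand-in for a broker fetch
    private final long resetOffset; // e.g. 0 for an "earliest" reset policy

    public PositionLookup(long resetOffset) {
        this.resetOffset = resetOffset;
    }

    public long position(String partition) {
        Long p = inMemory.get(partition);
        if (p != null)
            return p;                   // 1. cached in memory
        Long c = serverCommitted.get(partition);
        if (c != null) {
            inMemory.put(partition, c); // 2. fetched from the server
            return c;
        }
        inMemory.put(partition, resetOffset);
        return resetOffset;             // 3. auto-offset reset policy
    }

    public static void main(String[] args) {
        PositionLookup lookup = new PositionLookup(0L);
        lookup.serverCommitted.put("t-0", 100L);
        System.out.println(lookup.position("t-0")); // 100, from the "server"
        System.out.println(lookup.position("t-1")); // 0, from the reset policy
    }
}
```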
2. Overall code structure improvements. NIO network clients like this tend
to be very imperative in nature. I'm not sure that's bad, but if anyone has
ideas for improving the code I'd love to hear them.

-Jay
