I uploaded an updated version of the new consumer client (https://issues.apache.org/jira/browse/KAFKA-1760). This is now almost feature complete, and has pretty reasonable testing and metrics. I think it is ready for review and could be checked in once 0.8.2 is out.
For those who haven't been following, this is meant to be a new consumer client, like the new producer in 0.8.2, and is intended to replace the existing "high level" and "simple" Scala consumers.

This still needs the server-side implementation of partition assignment and group management to be fully functional. I have just stubbed that out in the server to allow the client to be implemented and tested, but actual usage will require the real server-side support. However, the client that exists now is a fully functional replacement for the "simple consumer" that is vastly easier to use correctly, as it internally handles all the discovery and failover. It would be great if people could take a look at this code, and particularly at the public APIs, which have several small changes from the original proposal (a rough usage sketch is appended at the end of this mail).

Summary

What's there:
1. Simple consumer functionality
2. Offset commit and fetch
3. Ability to change position with seek
4. Ability to commit all or just some offsets
5. Controller discovery, failure detection, heartbeat, and fail-over
6. Controller partition assignment
7. Logging
8. Metrics
9. Integration tests, including tests that simulate random broker failures
10. Integration into the consumer performance test

Limitations:
1. There could be some lingering bugs in the group management support; it is hard to test fully with just the stub support on the server, so I think we'll need to get the server side working to do better.
2. I haven't implemented wildcard subscriptions yet.
3. No integration with the console consumer yet.

Performance

I did some performance comparison with the old consumer over localhost on my laptop. Usually localhost isn't good for testing, but in this case it is useful because it has near-infinite bandwidth, so it does a good job of exposing inefficiencies that would be hidden by a slower network. These numbers probably aren't representative of what you would get over a real network, but they help bring out the relative efficiencies. Here are the results:
- Old high-level consumer: 213 MB/sec
- New consumer: 225 MB/sec
- Old simple consumer: 242 MB/sec

It may be hard to get this client up to the same point as the simple consumer, since the simple consumer does very little beyond allocating and wrapping byte buffers that it reads off the network. The big thing that shows up in profiling is the buffer allocation for reading data, so one speed-up would be to pool these buffers.

Some things to discuss

1. What should the behavior of consumer.position() and consumer.committed() be immediately after initialization (prior to calling poll())? Currently these methods just return the current value from memory, but if the position isn't in memory the consumer will try to fetch it from the server, and if no committed position is found it will use the auto-offset reset policy to pick one. I think this is the right thing to do, because otherwise you can't guarantee how many calls to poll() would be required before initialization is complete. But it is kind of weird.
2. Overall code structure improvement. These NIO network clients tend to be very imperative in nature. I'm not sure this is bad, but if anyone has ideas on improving the code I'd love to hear them.

-Jay
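P.S. For anyone who wants a concrete feel for the "simple consumer replacement" usage before digging into the patch, here is a minimal sketch. It assumes the public API roughly follows the shape of the proposal (a KafkaConsumer with assign/seek/position/committed/poll/commitSync and byte-array deserializers); the exact names, signatures, and config keys in the patch under review may differ, so treat this purely as an illustration of the intended pattern, not as the checked-in API.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class NewConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Only a bootstrap list is needed; leader discovery and fail-over happen inside the client.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        TopicPartition tp = new TopicPartition("my-topic", 0); // hypothetical topic/partition

        // "Simple consumer" style: take a specific partition rather than relying on group management.
        consumer.assign(Collections.singletonList(tp));

        // Change position explicitly (feature 3 in the list above).
        consumer.seek(tp, 42L);

        // Discussion point 1: calling these before poll() may trigger a fetch from the server,
        // or fall back to the auto-offset reset policy if no committed offset exists.
        long position = consumer.position(tp);
        OffsetAndMetadata committed = consumer.committed(tp);
        System.out.println("position=" + position + " committed=" + committed);

        while (true) {
            // Poll for records; no per-broker fetch plumbing in user code.
            ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
            for (ConsumerRecord<byte[], byte[]> record : records) {
                // ... process record ...
            }
            // Commit just some offsets (feature 4): here only this partition's current position.
            consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(consumer.position(tp))));
        }
    }
}

The point of the sketch is the contrast with the old SimpleConsumer: there is no manual leader lookup, no handling of NotLeaderForPartition on broker failure, and offset management is part of the same client.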