I took some time to write example code using the new consumer APIs to cover a range of use cases. This exercise was very useful (thanks for the suggestion, Jay!) since it surfaced several improvements that make the APIs more usable. Here are some of the changes <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/> I made -
1. Added usage examples to the KafkaConsumer javadoc
<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>.
I find it more useful for the examples to be in the javadoc than on some
wiki. Please go through these examples and suggest improvements. The goal
is to document a limited set of examples that covers every major use case.

2. All APIs that either accept or return offsets now use
Map<TopicPartition,Long> instead of TopicPartitionOffset... In all the
examples I wrote, it was much easier to deal with offsets and pass them
around in the consumer APIs as maps rather than lists (a sketch follows
below).

3. Due to the above change, I had to explicitly introduce a no-argument
commit()
<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html#commit%28%29>
and commitAsync(), in addition to commit(Map<TopicPartition,Long> offsets)
and commitAsync(Map<TopicPartition,Long> offsets), since once the commit
APIs take a Map as input, the no-argument case is no longer covered
automatically (as it was with varargs).

4. Offset rewind logic is funky with group management. I took a stab at it
and wrote examples to cover the various offset rewind use cases I could
think of. I'm not so sure I like it, so I encourage people to take a look
at the examples and provide feedback. This feedback is critical for
finalizing the consumer APIs, since we might have to add or change APIs to
make offset rewind intuitive and easy to use. (Please see the 3rd and 4th
examples here
<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>.)

Once I have feedback on the above, I will go ahead and submit a Review
Board request for the new APIs and javadoc.
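To make #2 and #3 concrete, here is a rough sketch of a consume-and-commit
loop against the Map-based APIs. Treat it as illustrative only: the method
names follow the draft javadoc linked above, but the poll() return type and
the record handling are assumptions on my part, and the signatures may
still change based on feedback.

    import java.util.HashMap;
    import java.util.Map;
    import kafka.clients.consumer.KafkaConsumer; // package path taken from the draft javadoc URLs

    Map<String, Object> configs = new HashMap<String, Object>();
    configs.put("group.id", "test-group");       // config names per ConsumerConfig in the draft javadoc
    KafkaConsumer consumer = new KafkaConsumer(configs);
    consumer.subscribe("foo");                   // topic-level subscription, with group management

    Map<TopicPartition, Long> consumed = new HashMap<TopicPartition, Long>();
    while (true) {
        // Wait up to 100 ms for data and return whatever records are ready.
        Map<String, ConsumerRecords> records = consumer.poll(100); // return type is an assumption
        // Process the records, tracking the last offset handled for each
        // partition in `consumed` (record accessors elided; see the javadoc examples).
        consumer.commit(consumed);    // synchronous, Map-based commit from #2
        // commitAsync(consumed) and the no-argument commit() are the other variants from #3.
    }

One nicety of the map form is that the same Map<TopicPartition,Long> can be
carried between seek(), commit(), and an external offset store without any
conversion.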
Thanks,
Neha

On Mon, Mar 24, 2014 at 5:29 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> Hey Chris,
>
> Really sorry for the late reply; I wonder how this fell through the
> cracks. Anyhow, thanks for the great feedback! Here are my comments -
>
> 1. Why is the config String->Object instead of String->String?
>
> This is probably more feedback about the new config management that we
> adopted in the new clients. I think it is more convenient to write
>
>     configs.put("a", 42);
>
> instead of
>
>     configs.put("a", Integer.toString(42));
>
> 2. Are these Java docs correct?
>
>     KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
>     A consumer is instantiated by providing a set of key-value pairs as
>     configuration and a ConsumerRebalanceCallback implementation
>
> There is no ConsumerRebalanceCallback parameter.
>
> Fixed.
>
> 3. Would like to have a method:
>
>     poll(long timeout, java.util.concurrent.TimeUnit timeUnit,
>          TopicPartition... topicAndPartitionsToPoll)
>
> I see I can effectively do this by just fiddling with subscribe and
> unsubscribe before each poll. Is this a low-overhead operation? Can I
> just unsubscribe from everything after each poll, then re-subscribe to a
> topic the next iteration? I would probably be doing this in a fairly
> tight loop.
>
> The subscribe and unsubscribe calls will be very lightweight in-memory
> operations, so it shouldn't be a problem to just use those APIs directly.
> Let me know if you think otherwise.
>
> 4. The behavior of AUTO_OFFSET_RESET_CONFIG is overloaded. I think there
> are use cases for decoupling "what to do when no offset exists" from
> "what to do when I'm out of range". I might want to start from smallest
> the first time I run, but fail if I ever get an offset out of range.
>
> How about adding a third option, "disable", to "auto.offset.reset"? What
> this says is: never automatically reset the offset, whether none is found
> or the offset falls out of range. Presumably, you would want this when
> you control the offsets yourself and use custom rewind/replay logic to
> reset the consumer's offset, so that Kafka does not accidentally reset
> the offset to something else.
>
> I'm not so sure when you would want to make the distinction between
> startup and the offset falling out of range. Presumably, if you don't
> trust Kafka to reset the offset, then you can always turn this off and
> use commit/commitAsync and seek() to set the consumer to the right offset
> on startup and every time your consumer falls out of range.
>
> Does that make sense?
>
> 5. ENABLE_JMX could use Java docs, even though it's fairly
> self-explanatory.
>
> Fixed.
>
> 6. Clarity about whether FETCH_BUFFER_CONFIG is per-topic/partition, or
> across all topic/partitions, would be useful. I believe it's
> per-topic/partition, right? That is, setting it to 2 megs with two
> TopicAndPartitions would result in 4 megs worth of data coming in per
> fetch, right?
>
> Good point; clarified that. Take a look again to see if it makes sense
> now.
>
> 7. What does the consumer do if METADATA_FETCH_TIMEOUT_CONFIG times out?
> Retry, or throw an exception?
>
> Throw a TimeoutException. Clarified that in the docs
> <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html>.
>
> 8. Does RECONNECT_BACKOFF_MS_CONFIG apply to both metadata requests and
> fetch requests?
>
> It applies to all requests. Clarified that in the docs.
>
> 9. What does SESSION_TIMEOUT_MS default to?
>
> Defaults are largely TODO, but session.timeout.ms currently defaults to
> 1000.
>
> 10. Is this consumer thread-safe?
>
> It should be. Updated the docs
> <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>
> to clarify that.
>
> 11. How do you use a different offset management strategy? Your email
> implies that it's pluggable, but I don't see how. "The offset management
> strategy defaults to Kafka based offset management and the API provides a
> way for the user to use a customized offset store to manage the
> consumer's offsets."
>
> 12. If I wish to decouple the consumer from the offset checkpointing, is
> it OK to use Joel's offset management stuff directly, rather than through
> the consumer's commit API?
>
> For #11 and #12, I updated the docs
> <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>
> to include actual usage examples. Could you take a look and see if they
> answer your questions?
>
> Thanks,
> Neha
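Tying #4, #11, and #12 above together, here is a rough sketch of the
custom-offset-store pattern with automatic reset disabled. The "disable"
option is only a proposal at this point, the Map-based seek() follows from
change #2 in my previous email, and loadOffsets()/saveOffsets()/lastOffsets()
are hypothetical stand-ins for a user-supplied offset store; imports are as
in the earlier sketch.

    Map<String, Object> configs = new HashMap<String, Object>();
    configs.put("group.id", "test-group");
    configs.put("auto.offset.reset", "disable"); // proposed third option: never auto-reset
    KafkaConsumer consumer = new KafkaConsumer(configs);
    consumer.subscribe("foo");

    // On startup, rewind every partition to the offsets checkpointed externally.
    Map<TopicPartition, Long> checkpointed = loadOffsets(); // hypothetical custom store
    consumer.seek(checkpointed);                            // assumes seek() takes the Map form

    while (true) {
        Map<String, ConsumerRecords> records = consumer.poll(100); // return type is an assumption
        // Process the records, then checkpoint the consumed offsets to the
        // custom store instead of calling the Kafka-based commit().
        saveOffsets(lastOffsets(records));                  // hypothetical helpers
    }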
>
> On Mon, Mar 3, 2014 at 10:28 AM, Chris Riccomini <criccom...@linkedin.com> wrote:
>
>> Hey Guys,
>>
>> Also, for reference, we'll be looking to implement new Samza consumers
>> which have these APIs:
>>
>> http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/SystemConsumer.html
>>
>> http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/checkpoint/CheckpointManager.html
>>
>> Question (3) below is a result of having Samza's SystemConsumer poll
>> allow specific topic/partitions to be specified.
>>
>> The split between consumer and checkpoint manager is the reason for
>> question (12) below.
>>
>> Cheers,
>> Chris
>>
>> On 3/3/14 10:19 AM, "Chris Riccomini" <criccom...@linkedin.com> wrote:
>>
>> >Hey Guys,
>> >
>> >Sorry for the late follow-up. Here are my questions/thoughts on the API:
>> >
>> >1. Why is the config String->Object instead of String->String?
>> >
>> >2. Are these Java docs correct?
>> >
>> >    KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
>> >    A consumer is instantiated by providing a set of key-value pairs as
>> >    configuration and a ConsumerRebalanceCallback implementation
>> >
>> >There is no ConsumerRebalanceCallback parameter.
>> >
>> >3. Would like to have a method:
>> >
>> >    poll(long timeout, java.util.concurrent.TimeUnit timeUnit,
>> >         TopicPartition... topicAndPartitionsToPoll)
>> >
>> >I see I can effectively do this by just fiddling with subscribe and
>> >unsubscribe before each poll. Is this a low-overhead operation? Can I
>> >just unsubscribe from everything after each poll, then re-subscribe to
>> >a topic the next iteration? I would probably be doing this in a fairly
>> >tight loop.
>> >
>> >4. The behavior of AUTO_OFFSET_RESET_CONFIG is overloaded. I think
>> >there are use cases for decoupling "what to do when no offset exists"
>> >from "what to do when I'm out of range". I might want to start from
>> >smallest the first time I run, but fail if I ever get an offset out of
>> >range.
>> >
>> >5. ENABLE_JMX could use Java docs, even though it's fairly
>> >self-explanatory.
>> >
>> >6. Clarity about whether FETCH_BUFFER_CONFIG is per-topic/partition, or
>> >across all topic/partitions, would be useful. I believe it's
>> >per-topic/partition, right? That is, setting it to 2 megs with two
>> >TopicAndPartitions would result in 4 megs worth of data coming in per
>> >fetch, right?
>> >
>> >7. What does the consumer do if METADATA_FETCH_TIMEOUT_CONFIG times
>> >out? Retry, or throw an exception?
>> >
>> >8. Does RECONNECT_BACKOFF_MS_CONFIG apply to both metadata requests and
>> >fetch requests?
>> >
>> >9. What does SESSION_TIMEOUT_MS default to?
>> >
>> >10. Is this consumer thread-safe?
>> >
>> >11. How do you use a different offset management strategy? Your email
>> >implies that it's pluggable, but I don't see how. "The offset
>> >management strategy defaults to Kafka based offset management and the
>> >API provides a way for the user to use a customized offset store to
>> >manage the consumer's offsets."
>> >
>> >12. If I wish to decouple the consumer from the offset checkpointing,
>> >is it OK to use Joel's offset management stuff directly, rather than
>> >through the consumer's commit API?
>> >
>> >Cheers,
>> >Chris
>> >
>> >On 2/10/14 10:54 AM, "Neha Narkhede" <neha.narkh...@gmail.com> wrote:
>> >
>> >>As mentioned in previous emails, we are also working on a
>> >>re-implementation of the consumer. I would like to use this email
>> >>thread to discuss the details of the public API. I would also like us
>> >>to be picky about this public API now, so it is as good as possible
>> >>and we don't need to break it in the future.
>> >>
>> >>The best way to get a feel for the API is to take a look at the
>> >>javadoc
>> >><http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>;
>> >>the hope is to get the API docs good enough that they are
>> >>self-explanatory. You can also take a look at the configs here
>> >><http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html>.
>> >>
>> >>Some background info on the implementation:
>> >>
>> >>At a high level, the primary difference in this consumer is that it
>> >>removes the distinction between the "high-level" and "low-level"
>> >>consumer. The new consumer API is non-blocking: instead of returning a
>> >>blocking iterator, the consumer provides a poll() API that returns a
>> >>list of records. We think this is better than the blocking iterators,
>> >>since it effectively decouples the threading strategy used for
>> >>processing messages from the consumer. It is worth noting that the
>> >>consumer is entirely single-threaded and runs in the user thread. The
>> >>advantage is that it can be easily rewritten in less
>> >>multi-threading-friendly languages. The consumer batches data and
>> >>multiplexes I/O over the TCP connections to each of the brokers it
>> >>communicates with, for high throughput. The consumer also allows long
>> >>poll to reduce the end-to-end message latency for low-throughput data.
>> >>
>> >>The consumer provides a group management facility that supports the
>> >>concept of a group with multiple consumer instances (just like the
>> >>current consumer). This is done through a custom heartbeat and group
>> >>management protocol that is transparent to the user. At the same time,
>> >>it allows users the option to subscribe to a fixed set of partitions
>> >>and not use group management at all. The offset management strategy
>> >>defaults to Kafka-based offset management, and the API provides a way
>> >>for the user to use a customized offset store to manage the consumer's
>> >>offsets.
>> >>
>> >>Another key difference in this consumer is that it does not depend on
>> >>zookeeper at all.
>> >>
>> >>More details about the new consumer design are here
>> >><https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design>.
>> >>
>> >>Please take a look at the new API
>> >><http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>
>> >>and give us any thoughts you may have.
>> >>
>> >>Thanks,
>> >>Neha
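For anyone skimming to the end of the thread, here is a minimal sketch of
the poll() loop described in the design summary above, subscribing to a
fixed partition so that group management stays out of the picture. This
follows the draft javadoc rather than a finalized API: the TopicPartition
subscribe overload and the poll() return type are assumptions, and the
broker connection configs are elided.

    Map<String, Object> configs = new HashMap<String, Object>();
    // Broker connection and other settings go here; see ConsumerConfig in the draft javadoc.
    KafkaConsumer consumer = new KafkaConsumer(configs);

    // Subscribing to a specific partition bypasses group management entirely.
    consumer.subscribe(new TopicPartition("foo", 0)); // assumed overload, per the design summary

    while (true) {
        // Long poll: wait up to 1000 ms for data, then hand back whatever is ready.
        // The consumer is single-threaded; the processing threading strategy is up to the caller.
        Map<String, ConsumerRecords> records = consumer.poll(1000); // return type is an assumption
        // process(records) ...
    }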