If people don't have any more thoughts on this, I will go ahead and submit a review board to https://issues.apache.org/jira/browse/KAFKA-1328.
Thanks,
Neha

On Mon, Mar 24, 2014 at 5:39 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> I took some time to write some example code using the new consumer APIs to cover a range of use cases. This exercise was very useful (thanks for the suggestion, Jay!) since I found several improvements to the APIs to make them more usable. Here are some of the changes <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/> I made -
>
> 1. Added usage examples to the KafkaConsumer javadoc <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>. I find it useful for the examples to be in the javadoc vs some wiki. Please go through these examples and suggest improvements. The goal would be to document a limited set of examples that cover every major use case.
>
> 2. All APIs that either accept or return offsets are changed to Map<TopicPartition,Long> instead of TopicPartitionOffset... In all the examples that I wrote, it was much easier to deal with offsets and pass them around in the consumer APIs if they were maps instead of lists.
>
> 3. Due to the above change, I had to introduce commit() <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html#commit%28%29> and commitAsync() APIs explicitly, in addition to commit(Map<TopicPartition,Long> offsets) and commitAsync(Map<TopicPartition,Long> offsets), since the no-argument case would not be covered automatically with Map as the input parameter to the commit APIs.
>
> 4. Offset rewind logic is funky with group management. I took a stab at it and wrote examples to cover the various offset rewind use cases I could think of. I'm not so sure I like it, so I encourage people to take a look at the examples and provide feedback.
> This feedback is very critical in finalizing the consumer APIs, as we might have to add/change APIs to make offset rewind intuitive and easy to use. (Please see the 3rd and 4th examples here <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>.)
>
> Once I have feedback on the above, I will go ahead and submit a review board for the new APIs and javadoc.
>
> Thanks,
> Neha
>
> On Mon, Mar 24, 2014 at 5:29 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>
>> Hey Chris,
>>
>> Really sorry for the late reply; I wonder how this fell through the cracks. Anyhow, thanks for the great feedback! Here are my comments -
>>
>> 1. Why is the config String->Object instead of String->String?
>>
>> This is probably more of a comment on the new config management that we adopted in the new clients. I think it is more convenient to write
>>   configs.put("a", 42);
>> instead of
>>   configs.put("a", Integer.toString(42));
>>
>> 2. Are these Java docs correct?
>>
>>   KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
>>   A consumer is instantiated by providing a set of key-value pairs as configuration and a ConsumerRebalanceCallback implementation.
>>
>> There is no ConsumerRebalanceCallback parameter.
>>
>> Fixed.
>>
>> 3. Would like to have a method:
>>
>>   poll(long timeout, java.util.concurrent.TimeUnit timeUnit, TopicPartition... topicAndPartitionsToPoll)
>>
>> I see I can effectively do this by just fiddling with subscribe and unsubscribe before each poll. Is this a low-overhead operation? Can I just unsubscribe from everything after each poll, then re-subscribe to a topic the next iteration? I would probably be doing this in a fairly tight loop.
>>
>> The subscribe and unsubscribe will be very lightweight in-memory operations, so it shouldn't be a problem to just use those APIs directly. Let me know if you think otherwise.
>>
>> 4.
>> The behavior of AUTO_OFFSET_RESET_CONFIG is overloaded. I think there are use cases for decoupling "what to do when no offset exists" from "what to do when I'm out of range". I might want to start from smallest the first time I run, but fail if I ever get offset out of range.
>>
>> How about adding a third option, "disable", to "auto.offset.reset"? What this says is to never automatically reset the offset, either when one is not found or when the offset falls out of range. Presumably, you would use this when you want to control the offsets yourself with custom rewind/replay logic, so that Kafka does not accidentally reset the offset to something else.
>>
>> I'm not so sure when you would want to make the distinction between startup and the offset falling out of range. Presumably, if you don't trust Kafka to reset the offset, then you can always turn this off and use commit/commitAsync and seek() to set the consumer to the right offset on startup and every time your consumer falls out of range.
>>
>> Does that make sense?
>>
>> 5. ENABLE_JMX could use Java docs, even though it's fairly self-explanatory.
>>
>> Fixed.
>>
>> 6. Clarity about whether FETCH_BUFFER_CONFIG is per-topic/partition, or across all topic/partitions, is useful. I believe it's per-topic/partition, right? That is, setting it to 2 megs with two TopicAndPartitions would result in 4 megs worth of data coming in per fetch, right?
>>
>> Good point, clarified that. Take a look again to see if it makes sense now.
>>
>> 7. What does the consumer do if METADATA_FETCH_TIMEOUT_CONFIG times out? Retry, or throw an exception?
>>
>> Throw a TimeoutException. Clarified that in the docs <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html>.
>>
>> 8.
>> Does RECONNECT_BACKOFF_MS_CONFIG apply to both metadata requests and fetch requests?
>>
>> Applies to all requests. Clarified that in the docs.
>>
>> 9. What does SESSION_TIMEOUT_MS default to?
>>
>> Defaults are largely TODO, but session.timeout.ms currently defaults to 1000.
>>
>> 10. Is this consumer thread-safe?
>>
>> It should be. Updated the docs <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html> to clarify that.
>>
>> 11. How do you use a different offset management strategy? Your email implies that it's pluggable, but I don't see how. "The offset management strategy defaults to Kafka based offset management and the API provides a way for the user to use a customized offset store to manage the consumer's offsets."
>>
>> 12. If I wish to decouple the consumer from the offset checkpointing, is it OK to use Joel's offset management stuff directly, rather than through the consumer's commit API?
>>
>> For #11 and #12, I updated the docs <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html> to include actual usage examples. Could you take a look and see if it answers your questions?
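To make the Map<TopicPartition,Long> offset representation and the custom-offset-store idea from #11/#12 concrete, here is a rough, self-contained sketch. Note that TopicPartition, OffsetStore, and InMemoryOffsetStore below are hand-rolled stand-ins for illustration only, not the actual client classes:

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for the client's TopicPartition. Value-based equals/hashCode on
// (topic, partition) is what makes Map<TopicPartition,Long> a convenient
// offset container.
record TopicPartition(String topic, int partition) {}

// Hypothetical external offset store, mirroring the commit(Map) shape of the
// proposed commit APIs.
interface OffsetStore {
    void commit(Map<TopicPartition, Long> offsets);
    Long fetch(TopicPartition tp);
}

class InMemoryOffsetStore implements OffsetStore {
    private final Map<TopicPartition, Long> store = new HashMap<>();
    public void commit(Map<TopicPartition, Long> offsets) { store.putAll(offsets); }
    public Long fetch(TopicPartition tp) { return store.get(tp); }
}

public class OffsetMapSketch {
    public static void main(String[] args) {
        // Offsets for several partitions travel through the API as one map.
        Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(new TopicPartition("events", 0), 42L);
        offsets.put(new TopicPartition("events", 1), 17L);

        OffsetStore store = new InMemoryOffsetStore();
        store.commit(offsets);

        // Looking up one partition's position is a plain map lookup, rather
        // than a scan over a TopicPartitionOffset... list.
        System.out.println(store.fetch(new TopicPartition("events", 0)));
    }
}
```

The appeal of the map shape is exactly this last line: updating or reading a single partition's position is a constant-time map operation, and merging offsets from several sources is just putAll.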
>>
>> Thanks,
>> Neha
>>
>> On Mon, Mar 3, 2014 at 10:28 AM, Chris Riccomini <criccom...@linkedin.com> wrote:
>>
>>> Hey Guys,
>>>
>>> Also, for reference, we'll be looking to implement new Samza consumers which have these APIs:
>>>
>>> http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/SystemConsumer.html
>>>
>>> http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/checkpoint/CheckpointManager.html
>>>
>>> Question (3) below is a result of Samza's SystemConsumers poll allowing specific topic/partitions to be specified.
>>>
>>> The split between consumer and checkpoint manager is the reason for question (12) below.
>>>
>>> Cheers,
>>> Chris
>>>
>>> On 3/3/14 10:19 AM, "Chris Riccomini" <criccom...@linkedin.com> wrote:
>>>
>>> > Hey Guys,
>>> >
>>> > Sorry for the late follow-up. Here are my questions/thoughts on the API:
>>> >
>>> > 1. Why is the config String->Object instead of String->String?
>>> >
>>> > 2. Are these Java docs correct?
>>> >
>>> >   KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
>>> >   A consumer is instantiated by providing a set of key-value pairs as configuration and a ConsumerRebalanceCallback implementation.
>>> >
>>> > There is no ConsumerRebalanceCallback parameter.
>>> >
>>> > 3. Would like to have a method:
>>> >
>>> >   poll(long timeout, java.util.concurrent.TimeUnit timeUnit, TopicPartition... topicAndPartitionsToPoll)
>>> >
>>> > I see I can effectively do this by just fiddling with subscribe and unsubscribe before each poll. Is this a low-overhead operation?
>>> > Can I just unsubscribe from everything after each poll, then re-subscribe to a topic the next iteration? I would probably be doing this in a fairly tight loop.
>>> >
>>> > 4. The behavior of AUTO_OFFSET_RESET_CONFIG is overloaded. I think there are use cases for decoupling "what to do when no offset exists" from "what to do when I'm out of range". I might want to start from smallest the first time I run, but fail if I ever get offset out of range.
>>> >
>>> > 5. ENABLE_JMX could use Java docs, even though it's fairly self-explanatory.
>>> >
>>> > 6. Clarity about whether FETCH_BUFFER_CONFIG is per-topic/partition, or across all topic/partitions, is useful. I believe it's per-topic/partition, right? That is, setting it to 2 megs with two TopicAndPartitions would result in 4 megs worth of data coming in per fetch, right?
>>> >
>>> > 7. What does the consumer do if METADATA_FETCH_TIMEOUT_CONFIG times out? Retry, or throw an exception?
>>> >
>>> > 8. Does RECONNECT_BACKOFF_MS_CONFIG apply to both metadata requests and fetch requests?
>>> >
>>> > 9. What does SESSION_TIMEOUT_MS default to?
>>> >
>>> > 10. Is this consumer thread-safe?
>>> >
>>> > 11. How do you use a different offset management strategy? Your email implies that it's pluggable, but I don't see how. "The offset management strategy defaults to Kafka based offset management and the API provides a way for the user to use a customized offset store to manage the consumer's offsets."
>>> >
>>> > 12. If I wish to decouple the consumer from the offset checkpointing, is it OK to use Joel's offset management stuff directly, rather than through the consumer's commit API?
>>> >
>>> > Cheers,
>>> > Chris
>>> >
>>> > On 2/10/14 10:54 AM, "Neha Narkhede" <neha.narkh...@gmail.com> wrote:
>>> >
>>> >> As mentioned in previous emails, we are also working on a re-implementation of the consumer.
>>> >> I would like to use this email thread to discuss the details of the public API. I would also like us to be picky about this public API now so that it is as good as possible and we don't need to break it in the future.
>>> >>
>>> >> The best way to get a feel for the API is actually to take a look at the javadoc <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>; the hope is to get the API docs good enough that they are self-explanatory. You can also take a look at the configs here <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html>.
>>> >>
>>> >> Some background info on implementation:
>>> >>
>>> >> At a high level, the primary difference in this consumer is that it removes the distinction between the "high-level" and "low-level" consumer. The new consumer API is non-blocking: instead of returning a blocking iterator, the consumer provides a poll() API that returns a list of records. We think this is better than the blocking iterators since it effectively decouples the threading strategy used for processing messages from the consumer. It is worth noting that the consumer is entirely single-threaded and runs in the user thread. The advantage is that it can be easily rewritten in less multi-threading-friendly languages. The consumer batches data and multiplexes I/O over the TCP connections to each of the brokers it communicates with, for high throughput. The consumer also allows long poll to reduce the end-to-end message latency for low-throughput data.
>>> >>
>>> >> The consumer provides a group management facility that supports the concept of a group with multiple consumer instances (just like the current consumer).
>>> >> This is done through a custom heartbeat and group management protocol that is transparent to the user. At the same time, it allows users the option to subscribe to a fixed set of partitions and not use group management at all. The offset management strategy defaults to Kafka-based offset management, and the API provides a way for the user to use a customized offset store to manage the consumer's offsets.
>>> >>
>>> >> A key difference in this consumer is also that it does not depend on ZooKeeper at all.
>>> >>
>>> >> More details about the new consumer design are here <https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design>.
>>> >>
>>> >> Please take a look at the new API <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html> and give us any thoughts you may have.
>>> >>
>>> >> Thanks,
>>> >> Neha
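For readers skimming the thread, the non-blocking poll() pattern described in the design summary above, and Chris's subscribe/poll/unsubscribe question in (3), can be sketched with a toy consumer. ToyConsumer below is a hypothetical in-memory stand-in that only mimics the shape of the proposed API (cheap subscribe/unsubscribe bookkeeping, poll() returning a batch of records); it is not the real client:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy stand-in with the shape of the proposed API: subscribe/unsubscribe are
// pure in-memory bookkeeping, and poll() never blocks -- it just returns
// whatever is buffered for the current subscriptions.
class ToyConsumer {
    private final Map<String, List<String>> buffered = new HashMap<>();
    private final List<String> subscriptions = new ArrayList<>();

    // Simulates data having arrived from the brokers for a topic.
    void feed(String topic, String record) {
        buffered.computeIfAbsent(topic, t -> new ArrayList<>()).add(record);
    }
    void subscribe(String topic) { subscriptions.add(topic); }
    void unsubscribe(String topic) { subscriptions.remove(topic); }

    // Non-blocking poll: drain buffered records for subscribed topics only.
    List<String> poll() {
        List<String> out = new ArrayList<>();
        for (String t : subscriptions) {
            List<String> b = buffered.remove(t);
            if (b != null) out.addAll(b);
        }
        return out;
    }
}

public class PollLoopSketch {
    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer();
        consumer.feed("a", "a-0");
        consumer.feed("b", "b-0");

        // Chris's pattern from question (3): subscribe, poll, unsubscribe on
        // each iteration of a tight loop, steering which topic gets polled.
        for (String topic : List.of("a", "b")) {
            consumer.subscribe(topic);
            for (String record : consumer.poll()) {
                System.out.println(record);
            }
            consumer.unsubscribe(topic);
        }
    }
}
```

Since subscribe/unsubscribe are only in-memory operations in the proposed design, doing this per iteration should be cheap, as Neha notes above; the caller, not the consumer, decides how the returned batch is threaded and processed.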