Updated thoughts.

1. subscribe(String topic, int... partitions) and unsubscribe(String topic,
   int... partitions) should be subscribe(TopicPartition... topicPartitions)
   and unsubscribe(TopicPartition... topicPartitions).
2. Does it make sense to provide a convenience method to subscribe to topics
   at a particular offset directly? E.g. subscribe(TopicPartitionOffset...
   offsets)
3. The javadoc makes no mention of what would happen if positions() is called
   with a TopicPartitionOffset to which the Consumer is not subscribed.
4. The javadoc makes no mention of what would happen if positions() is called
   with two different offsets for a single TopicPartition.
5. The javadoc shows the lastCommittedOffsets() return type as
   List<TopicPartitionOffset>. This should be either Map<TopicPartition, Long>
   or Map<TopicPartition, TopicPartitionOffset>.
6. It seems like #4 can be avoided by using Map<TopicPartition, Long> or
   Map<TopicPartition, TopicPartitionOffset> as the argument type.
7. To address #3, maybe we can return the List<TopicPartitionOffset> that are
   invalid.

On Tue, Feb 11, 2014 at 12:04 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> Pradeep,
>
> To be clear, we want to get feedback on the APIs from the javadoc
> <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>
> since the wiki will be slightly behind on the APIs.
>
> 1. Regarding consistency, do you have specific feedback on which APIs
>    should have different arguments/return types?
> 2. lastCommittedOffsets() does what you said in the javadoc.
>
> Thanks,
> Neha
>
> On Tue, Feb 11, 2014 at 11:45 AM, Pradeep Gollakota <pradeep...@gmail.com> wrote:
>
> > Hi Jay,
> >
> > I apologize for derailing the conversation about the consumer API. We
> > should start a new discussion about hierarchical topics, if we want to
> > keep talking about it.
> > My final thought on the matter is that hierarchical topics is still an
> > important feature to have in Kafka, because it gives us flexibility to do
> > namespace level access controls.
> >
> > Getting back to the topic of the Consumer API:
> >
> > 1. Any thoughts on consistency for method arguments and return types?
> > 2. The lastCommittedOffsets() method returns a List<TopicPartitionOffset>
> >    whereas the confluence page suggested a Map<TopicPartition, Long>. I
> >    would think that a Map is the more appropriate return type.
> >
> > On Tue, Feb 11, 2014 at 8:04 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >
> > > Hey Pradeep,
> > >
> > > That wiki is fairly old and it predated more flexible subscription
> > > mechanisms. In the high-level consumer you currently have wildcard
> > > subscription and in the new proposed interface you can actually
> > > subscribe based on any logic you want to create a "union" of streams.
> > > Personally I think this gives you everything you would want with a
> > > hierarchy and more actual flexibility (since you can define groupings
> > > however you want). What do you think?
> > >
> > > -Jay
> > >
> > > On Mon, Feb 10, 2014 at 3:37 PM, Pradeep Gollakota <pradeep...@gmail.com> wrote:
> > >
> > > > WRT hierarchical topics, I'm referring to KAFKA-1175
> > > > <https://issues.apache.org/jira/browse/KAFKA-1175>.
> > > > I would just like to think through the implications for the Consumer
> > > > API if and when we do implement hierarchical topics. For example, in
> > > > the proposal
> > > > <https://cwiki.apache.org/confluence/display/KAFKA/Hierarchical+Topics#>
> > > > written by Jay, he says that initially wildcard subscriptions are not
> > > > going to be supported. But does that mean that they will be supported
> > > > in v2? If that's the case, that would change the semantics of the
> > > > Consumer API.
> > > >
> > > > As to having classes for Topic, PartitionId, etc. it looks like I was
> > > > referring to the TopicPartition and TopicPartitionOffset classes (I
> > > > didn't realize these were already there). I was only looking at the
> > > > confluence page which shows List[(String, Int, Long)] instead of
> > > > List[TopicPartitionOffset] (as is shown in the javadoc). However, I
> > > > did notice that we're not being consistent in the Java version. E.g.
> > > > we have commit(TopicPartitionOffset... offsets) and
> > > > lastCommittedOffsets(TopicPartition... partitions) on the one hand. On
> > > > the other hand we have subscribe(String topic, int... partitions). I
> > > > agree that creating a class for TopicId today would probably not make
> > > > too much sense. But with hierarchical topics, I may change my mind.
> > > > This is exactly what was done in the HBase API in 0.96 when namespaces
> > > > were added. The 0.96 HBase API introduced a class called 'TableName'
> > > > to represent the namespace and table name.
> > > >
> > > > On Mon, Feb 10, 2014 at 3:08 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> > > >
> > > > > Thanks for the feedback.
> > > > >
> > > > > Mattijs -
> > > > >
> > > > > - Constructors link to
> > > > >   http://kafka.apache.org/documentation.html#consumerconfigs for
> > > > >   valid configurations, which lists zookeeper.connect rather than
> > > > >   metadata.broker.list, the value for BROKER_LIST_CONFIG in
> > > > >   ConsumerConfig.
> > > > > Fixed it to just point to ConsumerConfig for now until we finalize
> > > > > the new configs
> > > > > - Docs for poll(long) mention consumer.commit(true), which I can't
> > > > >   find in the Consumer docs. For a simple consumer setup, that call
> > > > >   is something that would make a lot of sense.
> > > > > Missed changing the examples to use consumer.commit(true, offsets).
> > > > > The suggestions by Jay would change it to commit(offsets) and
> > > > > commitAsync(offsets), which will hopefully make it easier to
> > > > > understand those commit APIs.
> > > > > - Love the addition of MockConsumer, awesome for unittesting :)
> > > > > I'm not quite satisfied with what it does as of right now, but we
> > > > > will surely improve it as we start writing the consumer.
> > > > >
> > > > > Jay -
> > > > >
> > > > > 1. ConsumerRebalanceCallback
> > > > >    a. Makes sense. Renamed to onPartitionsRevoked
> > > > >    b. Ya, it will be good to make it forward compatible with Java 8
> > > > >       capabilities. We can change it to PartitionsAssignedCallback
> > > > >       and PartitionsRevokedCallback or RebalanceBeginCallback and
> > > > >       RebalanceEndCallback?
> > > > >    c. Ya, I thought about that but then didn't name it just
> > > > >       RebalanceCallback since there could be a conflict with a
> > > > >       controller side rebalance callback if/when we have one.
> > > > >       However, you can argue that at that time we can name it
> > > > >       ControllerRebalanceCallback instead of polluting a user facing
> > > > >       API. So agree with you here.
> > > > > 2. Ya, that is a good idea. Changed to subscribe(String topic,
> > > > >    int...partitions).
> > > > > 3. lastCommittedOffset() is not necessarily a local access since the
> > > > >    consumer can potentially ask for the last committed offsets of
> > > > >    partitions that the consumer does not consume and maintain the
> > > > >    offsets for. That's the reason it is batched right now.
> > > > > 4. Yes, look at
> > > > >    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html#AUTO_OFFSET_RESET_CONFIG
> > > > > 5. Sure, but that is not part of the consumer API right? I think
> > > > >    you're suggesting looking at OffsetRequest to see if it would do
> > > > >    that properly?
> > > > > 6. Good point. Changed to poll(long timeout, TimeUnit) and poll with
> > > > >    a negative timeout will poll indefinitely?
> > > > > 7. Good point. Changed to commit(...) and commitAsync(...)
> > > > > 8. To commit the current position for all partitions owned by the
> > > > >    consumer, you can use commit(). If you don't use group management,
> > > > >    then commit(customListOfPartitions)
> > > > > 9. Forgot to include unsubscribe. Done now.
> > > > > 10. positions() can be called at any time and affects the next fetch
> > > > >     on the next poll(). Fixed the places that said "starting fetch
> > > > >     offsets"
> > > > > 11. Can we not look that up by going through the messages returned
> > > > >     and getting the offset from the ConsumerRecord?
> > > > >
> > > > > One thing that I really found helpful for the API design was writing
> > > > > out actual code for different scenarios against the API. I think it
> > > > > might be good to do that for this too--i.e. enumerate the various
> > > > > use cases and code that use case up to see how it looks
> > > > > The javadocs include examples for almost all possible scenarios of
> > > > > consumer usage that I could come up with. Will add more to the
> > > > > javadocs as I get more feedback from our users. The advantage of
> > > > > having the examples in the javadoc itself is that the usage is
> > > > > self-explanatory to new users.
> > > > >
> > > > > Pradeep -
> > > > >
> > > > > 2. Changed to poll(long, TimeUnit) and a negative value for the
> > > > >    timeout would block in the poll forever until there is new data
> > > > > 3. We don't have hierarchical topics support. Would you mind
> > > > >    explaining what you meant?
> > > > > 4. I'm not so sure that we need a class to express a topic which is
> > > > >    a string and a separate class for just partition id.
> > > > >    We do have a class for TopicPartition which uniquely identifies a
> > > > >    partition of a topic
> > > > >
> > > > > Thanks,
> > > > > Neha
> > > > >
> > > > > On Mon, Feb 10, 2014 at 12:36 PM, Pradeep Gollakota <pradeep...@gmail.com> wrote:
> > > > >
> > > > > > Couple of very quick thoughts.
> > > > > >
> > > > > > 1. +1 about renaming commit(...) and commitAsync(...)
> > > > > > 2. I'd also like to extend the above for the poll() method as
> > > > > >    well. poll() and pollWithTimeout(long, TimeUnit)?
> > > > > > 3. Have you guys given any thought around how this API would be
> > > > > >    used with hierarchical topics?
> > > > > > 4. Would it make sense to add classes such as TopicId,
> > > > > >    PartitionId, etc? Seems like it would be easier to read code
> > > > > >    with these classes as opposed to string and longs.
> > > > > >
> > > > > > - Pradeep
> > > > > >
> > > > > > On Mon, Feb 10, 2014 at 12:20 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > > >
> > > > > > > A few items:
> > > > > > > 1. ConsumerRebalanceCallback
> > > > > > >    a. onPartitionsRevoked would be a better name.
> > > > > > >    b. We should discuss the possibility of splitting this into
> > > > > > >       two interfaces. The motivation would be that in Java 8
> > > > > > >       single method interfaces can directly take methods which
> > > > > > >       might be more intuitive.
> > > > > > >    c. If we stick with a single interface I would prefer the
> > > > > > >       name RebalanceCallback as it's more concise
> > > > > > > 2. Should subscribe(String topic, int partition) be
> > > > > > >    subscribe(String topic, int...partition)?
> > > > > > > 3. Is lastCommittedOffset call just a local access? If so it
> > > > > > >    would be more convenient not to batch it.
> > > > > > > 4. How are we going to handle the earliest/latest starting
> > > > > > >    position functionality we currently have? Does that remain a
> > > > > > >    config?
> > > > > > > 5. Do we need to expose the general ability to get known
> > > > > > >    positions from the log? E.g. the functionality in the
> > > > > > >    OffsetRequest...? That would make the ability to change
> > > > > > >    position a little easier.
> > > > > > > 6. Should poll(java.lang.Long timeout) be poll(long timeout,
> > > > > > >    TimeUnit unit)? Is it Long because it allows null? If so
> > > > > > >    should we just add a poll() that polls indefinitely?
> > > > > > > 7. I recommend we remove the boolean parameter from commit as it
> > > > > > >    is really hard to read code that has boolean parameters
> > > > > > >    without named arguments. Can we make it something like
> > > > > > >    commit(...) and commitAsync(...)?
> > > > > > > 8. What about the common case where you just want to commit the
> > > > > > >    current position for all partitions?
> > > > > > > 9. How do you unsubscribe?
> > > > > > > 10. You say in a few places that positions() only impacts the
> > > > > > >     starting position, but surely that isn't the case, right?
> > > > > > >     Surely it controls the fetch position for that partition and
> > > > > > >     can be called at any time? Otherwise it is a pretty weird
> > > > > > >     api, right?
> > > > > > > 11. How do I get my current position? Not the committed position
> > > > > > >     but the offset of the next message that will be given to me?
> > > > > > >
> > > > > > > One thing that I really found helpful for the API design was
> > > > > > > writing out actual code for different scenarios against the API.
> > > > > > > I think it might be good to do that for this too--i.e. enumerate
> > > > > > > the various use cases and code that use case up to see how it
> > > > > > > looks. I'm not sure if it would be useful to collect these kinds
> > > > > > > of scenarios from people. I know they have sporadically popped
> > > > > > > up on the mailing list.
> > > > > > >
> > > > > > > -Jay
> > > > > > >
> > > > > > > On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> > > > > > >
> > > > > > > > As mentioned in previous emails, we are also working on a
> > > > > > > > re-implementation of the consumer. I would like to use this
> > > > > > > > email thread to discuss the details of the public API. I would
> > > > > > > > also like us to be picky about this public api now so it is as
> > > > > > > > good as possible and we don't need to break it in the future.
> > > > > > > >
> > > > > > > > The best way to get a feel for the API is actually to take a
> > > > > > > > look at the javadoc
> > > > > > > > <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>,
> > > > > > > > the hope is to get the api docs good enough so that it is
> > > > > > > > self-explanatory.
> > > > > > > > You can also take a look at the configs here
> > > > > > > > <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html>
> > > > > > > >
> > > > > > > > Some background info on implementation:
> > > > > > > >
> > > > > > > > At a high level the primary difference in this consumer is
> > > > > > > > that it removes the distinction between the "high-level" and
> > > > > > > > "low-level" consumer. The new consumer API is non blocking and
> > > > > > > > instead of returning a blocking iterator, the consumer
> > > > > > > > provides a poll() API that returns a list of records. We think
> > > > > > > > this is better compared to the blocking iterators since it
> > > > > > > > effectively decouples the threading strategy used for
> > > > > > > > processing messages from the consumer. It is worth noting that
> > > > > > > > the consumer is entirely single threaded and runs in the user
> > > > > > > > thread. The advantage is that it can be easily rewritten in
> > > > > > > > less multi-threading-friendly languages. The consumer batches
> > > > > > > > data and multiplexes I/O over TCP connections to each of the
> > > > > > > > brokers it communicates with, for high throughput. The
> > > > > > > > consumer also allows long poll to reduce the end-to-end
> > > > > > > > message latency for low throughput data.
> > > > > > > >
> > > > > > > > The consumer provides a group management facility that
> > > > > > > > supports the concept of a group with multiple consumer
> > > > > > > > instances (just like the current consumer). This is done
> > > > > > > > through a custom heartbeat and group management protocol
> > > > > > > > transparent to the user. At the same time, it allows users the
> > > > > > > > option to subscribe to a fixed set of partitions and not use
> > > > > > > > group management at all. The offset management strategy
> > > > > > > > defaults to Kafka based offset management and the API provides
> > > > > > > > a way for the user to use a customized offset store to manage
> > > > > > > > the consumer's offsets.
> > > > > > > >
> > > > > > > > A key difference in this consumer also is the fact that it
> > > > > > > > does not depend on zookeeper at all.
> > > > > > > >
> > > > > > > > More details about the new consumer design are here
> > > > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design>
> > > > > > > >
> > > > > > > > Please take a look at the new API
> > > > > > > > <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>
> > > > > > > > and give us any thoughts you may have.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Neha
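
In the spirit of Jay's suggestion to write code against the API, here is a rough sketch of points 6 and 7 from the updated thoughts at the top of this message. It is not the proposed KafkaConsumer: PositionsSketch, SketchConsumer and the single-partition position() getter are hypothetical names made up for illustration, TopicPartition is a minimal stand-in for the class in the javadoc, and returning the rejected partitions (rather than TopicPartitionOffset objects) is an assumption made to keep the example short. It only shows that a Map argument makes two offsets for one partition unrepresentable, and that unsubscribed partitions can be reported back to the caller.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

class PositionsSketch {

    /** Minimal stand-in for the TopicPartition class shown in the javadoc. */
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override public boolean equals(Object o) {
            if (!(o instanceof TopicPartition)) return false;
            TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override public int hashCode() { return Objects.hash(topic, partition); }

        @Override public String toString() { return topic + "-" + partition; }
    }

    /** Hypothetical consumer stub: tracks subscriptions and fetch positions only. */
    static final class SketchConsumer {
        private final Set<TopicPartition> subscribed = new HashSet<>();
        private final Map<TopicPartition, Long> positions = new HashMap<>();

        void subscribe(TopicPartition... topicPartitions) {
            for (TopicPartition tp : topicPartitions) subscribed.add(tp);
        }

        void unsubscribe(TopicPartition... topicPartitions) {
            for (TopicPartition tp : topicPartitions) {
                subscribed.remove(tp);
                positions.remove(tp);
            }
        }

        /**
         * Sets fetch positions. Because the argument is a Map, each partition
         * carries exactly one offset (point 6). Partitions the consumer is not
         * subscribed to are skipped and returned to the caller (point 7).
         */
        List<TopicPartition> positions(Map<TopicPartition, Long> offsets) {
            List<TopicPartition> rejected = new ArrayList<>();
            for (Map.Entry<TopicPartition, Long> e : offsets.entrySet()) {
                if (subscribed.contains(e.getKey())) {
                    positions.put(e.getKey(), e.getValue());
                } else {
                    rejected.add(e.getKey());
                }
            }
            return rejected;
        }

        /** Returns the position set for a partition, or null if none was set. */
        Long position(TopicPartition tp) {
            return positions.get(tp);
        }
    }

    public static void main(String[] args) {
        SketchConsumer consumer = new SketchConsumer();
        TopicPartition p0 = new TopicPartition("events", 0);
        TopicPartition p1 = new TopicPartition("events", 1);
        consumer.subscribe(p0);

        Map<TopicPartition, Long> request = new LinkedHashMap<>();
        request.put(p0, 5L);
        request.put(p0, 42L); // second put replaces the first: one offset per partition
        request.put(p1, 7L);  // p1 is not subscribed

        System.out.println("rejected: " + consumer.positions(request));
        System.out.println("position of " + p0 + ": " + consumer.position(p0));
    }
}
```

With a varargs or List argument the API has to pick a rule for duplicates (first wins, last wins, or throw); with a Map the caller resolves that before the call is ever made.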