Ah, gotcha. -Jay
On Wed, Feb 12, 2014 at 8:40 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> Jay,
>
> Well, none of these really address the common case, which is to commit all partitions. For these I was thinking just commit(); The advantage of this simpler method is that you don't need to bother about partitions -- you just consume the messages given to you and then commit them.
>
> This is already what the commit() API is supposed to do. Here is the javadoc -
>
> * Synchronously commits the specified offsets for the specified list of topics and partitions to Kafka. If no partitions are specified,
> * commits offsets for the subscribed list of topics and partitions to Kafka.
>
> public void commit(TopicPartitionOffset... offsets);
>
> Could you take another look at the javadoc <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>? I've uploaded changes from the previous discussions and included some of your review suggestions.
>
> On Wed, Feb 12, 2014 at 8:32 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>
> > Imran,
> >
> > Sorry, I am probably missing something basic, but I'm not sure how a multi-threaded consumer would work. I can imagine it's either:
> >
> > a) I just have one thread poll kafka. If I want to process msgs in multiple threads, then I deal w/ that after polling, e.g. stick them into a blocking queue or something, and have more threads that read from the queue.
> >
> > b) each thread creates its own KafkaConsumer. They are all registered the same way, and I leave it to kafka to figure out what data to give to each one.
> >
> > We designed the new consumer API to not require multi-threading on purpose. The reason this is better than the existing ZookeeperConsumerConnector is that it lets the user pick whatever threading model they prefer and load-balance message processing amongst those threads. For example, you might want more threads dedicated to a certain high-throughput partition compared to other partitions. In option a) above, you can create your own thread pool and hand over the messages returned by poll using a blocking queue or any other approach. Option b) would work as well, but the user has to figure out which topics each KafkaConsumer subscribes to.
> >
> > (a) certainly makes things simple, but I worry about throughput -- is that just as good as having one thread trying to consume each partition?
> >
> > (b) makes it a bit of a pain to figure out how many threads to use. I assume there is no point in using more threads than there are partitions, so first you've got to figure out how many partitions there are in each topic. Might be nice if there were some util functions to simplify this.
> >
> > The user can pick the number of threads. That is still better, as only the user knows how slow/fast the message processing of her application is.
> >
> > Also, since the initial call to subscribe doesn't give the partition assignment, does that mean the first call to poll() will always call the ConsumerRebalanceCallback?
> >
> > Assuming you choose to use group management (by using subscribe(topics)), poll() will invoke the ConsumerRebalanceCallback on every single rebalance attempt. Improved the javadoc <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html> to explain that. Could you give that another look?
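To make option a) above concrete, below is a minimal sketch of a single polling thread feeding a pool of worker threads through a blocking queue. It is written against the draft API in the javadoc linked above, so the exact signatures are assumptions that may change (the varargs subscribe, the poll timeout, the shape of ConsumerRecord), and process(), config and numWorkers stand in for application-specific pieces.

// Option a): one thread owns the KafkaConsumer, workers process records.
// Sketch against the draft 0.9 consumer javadoc; signatures may differ.
final BlockingQueue<ConsumerRecord> queue = new LinkedBlockingQueue<ConsumerRecord>(10000);
ExecutorService workers = Executors.newFixedThreadPool(numWorkers);   // numWorkers chosen by the application

for (int i = 0; i < numWorkers; i++) {
    workers.submit(new Runnable() {
        @Override
        public void run() {
            try {
                while (true)
                    process(queue.take());   // process() is application-specific
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    });
}

// The consumer itself stays single threaded, as described above.
KafkaConsumer consumer = new KafkaConsumer(config);   // config: a ConsumerConfig as in the draft javadoc
consumer.subscribe("topic1", "topic2");               // varargs subscribe assumed
try {
    while (true) {
        for (ConsumerRecord record : consumer.poll(100L))   // timeout semantics per the draft javadoc
            queue.put(record);
        consumer.commit();   // no-arg commit: all subscribed partitions, per the javadoc quoted above
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}

Note that committing right after enqueueing means an offset can be committed before a worker has finished processing the corresponding record; committing only after workers report completion is the stricter alternative, at the cost of extra bookkeeping.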
> > If I'm on the right track, I'd like to expand this example, showing how each "MyConsumer" can keep track of its partitions & offsets, even in the face of rebalances. As Jay said, I think a minimal code example could really help us see the utility & faults of the api.
> >
> > Sure, please look at the javadoc <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>. I've tried to include code examples there. Please help in improving those or adding more. Looks like we should add some multi-threading examples. I avoided adding those since there are many ways of handling the message processing and it would not be feasible to list all of them. If we list one, people might think that is the only recommended approach.
> >
> > With that said, here is an example of using Option b) above -
> >
> > List<MyConsumer> consumers = new ArrayList<MyConsumer>();
> > List<String> topics = new ArrayList<String>();
> > // populate topics, one topic per consumer thread
> > assert(topics.size() == numThreads);
> >
> > for (int i = 0; i < numThreads; i++) {
> >     MyConsumer c = new MyConsumer();
> >     c.subscribe(topics.get(i));
> >     consumers.add(c);
> > }
> > // poll each consumer in a separate thread.
> > for (int i = 0; i < numThreads; i++) {
> >     final MyConsumer consumer = consumers.get(i);
> >     executorService.submit(new Runnable() {
> >         @Override
> >         public void run() {
> >             // assumes ProcessMessagesTask loops over consumer.poll() when run
> >             new ProcessMessagesTask(consumer).run();
> >         }
> >     });
> > }
> >
> > Let me know what you think.
> >
> > Thanks,
> > Neha
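To expand the example in the direction asked about above -- each consumer keeping track of its partitions and offsets across rebalances -- here is one possible sketch of a ConsumerRebalanceCallback. Only the onPartitionsRevoked name is taken from this thread; onPartitionsAssigned and the parameter types (a Consumer handle plus the affected TopicPartitions) are assumptions about the draft API, not its final shape.

import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Sketch only: method names and parameters are assumptions about the draft API.
public class TrackingRebalanceCallback implements ConsumerRebalanceCallback {
    // Partitions currently owned by this consumer instance.
    private final Set<TopicPartition> owned = new HashSet<TopicPartition>();

    @Override
    public void onPartitionsAssigned(Consumer consumer, Collection<TopicPartition> partitions) {
        owned.addAll(partitions);
        // If offsets live in an external store, this is the place to look them
        // up and reposition the consumer before the first poll() returns data.
    }

    @Override
    public void onPartitionsRevoked(Consumer consumer, Collection<TopicPartition> partitions) {
        // Commit whatever has been processed so far, so the next owner of these
        // partitions resumes from the right place.
        consumer.commit();
        owned.removeAll(partitions);
    }
}

Committing inside onPartitionsRevoked keeps the committed offsets in line with what has actually been processed at the moment ownership moves, and the owned set gives each MyConsumer a view of its current assignment.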
> > On Tue, Feb 11, 2014 at 3:54 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >
> >> Comments inline:
> >>
> >> On Mon, Feb 10, 2014 at 2:31 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>
> >> > Hello Jay,
> >> >
> >> > Thanks for the detailed comments.
> >> >
> >> > 1. Yeah, we could discuss that a bit more.
> >> >
> >> > 2. Since subscribe() is incremental, adding one topic-partition is OK, and personally I think it is cleaner than subscribe(String topic, int... partition)?
> >>
> >> I am not too particular. Have you actually tried this? I think writing actual sample code is important.
> >>
> >> > 3. Originally I was thinking about two interfaces:
> >> >
> >> > getOffsets() // offsets for all partitions that I am consuming now
> >> >
> >> > getOffset(topic-partition) // offset of the specified topic-partition; will throw an exception if it is not currently consumed.
> >> >
> >> > What do you think about these?
> >>
> >> The naming needs to distinguish the committed offset position versus the fetch offset position. Also, we aren't using the getX convention.
> >>
> >> > 4. Yes, that remains a config.
> >>
> >> Does that make sense given that you change your position via an api now?
> >>
> >> > 5. Agree.
> >> >
> >> > 6. If the timeout value is null then it will "logically" return immediately with whatever data is available. I think an indefinite poll() function could be replaced with just while (true) poll(some-time)?
> >>
> >> That is fine, but we should provide a no-arg poll for that; poll(null) isn't clear. We should add the TimeUnit as per the post-Java 5 convention, as that makes the call more readable, e.g. poll(5) vs poll(5, TimeUnit.MILLISECONDS).
> >>
> >> > 7. I am open to either approach.
> >>
> >> Cool.
> >>
> >> > 8. I was thinking about two interfaces for the commit functionality:
> >> >
> >> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
> >> >
> >> > Do those sound better?
> >>
> >> Well, none of these really address the common case, which is to commit all partitions. For these I was thinking just commit(); The advantage of this simpler method is that you don't need to bother about partitions -- you just consume the messages given to you and then commit them.
> >>
> >> > 9. Currently I think about un-subscribe as "close and re-subscribe", and would like to hear people's opinion about it.
> >>
> >> Hmm, I think it is a little weird if there is a subscribe which can be called at any time but no unsubscribe. Would this be hard to do?
> >>
> >> > 10. Yes. Position() is an API function, and as an API it means it can be called at any time and will change the next fetch starting offset.
> >>
> >> Cool.
> >>
> >> > 11. The ConsumerRecord would have the offset info of the message. Is that what you want?
> >>
> >> But that is only after I have gotten a message. I'm not sure if that covers all cases or not.
> >>
> >> > About use cases: great point. I will add some more examples of using the API functions in the wiki pages.
> >> >
> >> > Guozhang
> >> >
> >> > On Mon, Feb 10, 2014 at 12:20 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >> >
> >> > > A few items:
> >> > > 1. ConsumerRebalanceCallback
> >> > >    a. onPartitionsRevoked would be a better name.
> >> > >    b. We should discuss the possibility of splitting this into two interfaces. The motivation would be that in Java 8 single-method interfaces can directly take methods, which might be more intuitive.
> >> > >    c. If we stick with a single interface I would prefer the name RebalanceCallback as it's more concise.
> >> > > 2. Should subscribe(String topic, int partition) be subscribe(String topic, int... partition)?
> >> > > 3. Is the lastCommittedOffset call just a local access? If so it would be more convenient not to batch it.
> >> > > 4. How are we going to handle the earliest/latest starting position functionality we currently have? Does that remain a config?
> >> > > 5. Do we need to expose the general ability to get known positions from the log? E.g. the functionality in the OffsetRequest...? That would make the ability to change position a little easier.
> >> > > 6. Should poll(java.lang.Long timeout) be poll(long timeout, TimeUnit unit)? Is it Long because it allows null? If so, should we just add a poll() that polls indefinitely?
> >> > > 7. I recommend we remove the boolean parameter from commit, as it is really hard to read code that has boolean parameters without named arguments. Can we make it something like commit(...) and commitAsync(...)?
> >> > > 8. What about the common case where you just want to commit the current position for all partitions?
> >> > > 9. How do you unsubscribe?
> >> > > 10. You say in a few places that positions() only impacts the starting position, but surely that isn't the case, right? Surely it controls the fetch position for that partition and can be called at any time? Otherwise it is a pretty weird api, right?
> >> > > 11. How do I get my current position? Not the committed position, but the offset of the next message that will be given to me?
> >> > >
> >> > > One thing that I really found helpful for the API design was writing out actual code for different scenarios against the API. I think it might be good to do that for this too -- i.e. enumerate the various use cases and code each use case up to see how it looks. I'm not sure if it would be useful to collect these kinds of scenarios from people. I know they have sporadically popped up on the mailing list.
> >> > >
> >> > > -Jay
> >> > >
> >> > > On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> >> > >
> >> > > > As mentioned in previous emails, we are also working on a re-implementation of the consumer. I would like to use this email thread to discuss the details of the public API. I would also like us to be picky about this public api now so it is as good as possible and we don't need to break it in the future.
> >> > > >
> >> > > > The best way to get a feel for the API is actually to take a look at the javadoc <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>; the hope is to get the api docs good enough so that they are self-explanatory. You can also take a look at the configs here <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html>.
> >> > > >
> >> > > > Some background info on implementation:
> >> > > >
> >> > > > At a high level, the primary difference in this consumer is that it removes the distinction between the "high-level" and "low-level" consumer. The new consumer API is non-blocking: instead of returning a blocking iterator, the consumer provides a poll() API that returns a list of records. We think this is better than the blocking iterators since it effectively decouples the threading strategy used for processing messages from the consumer. It is worth noting that the consumer is entirely single threaded and runs in the user thread. The advantage is that it can be easily rewritten in less multi-threading-friendly languages. The consumer batches data and multiplexes I/O over TCP connections to each of the brokers it communicates with, for high throughput. The consumer also allows long poll to reduce the end-to-end message latency for low-throughput data.
> >> > > >
> >> > > > The consumer provides a group management facility that supports the concept of a group with multiple consumer instances (just like the current consumer). This is done through a custom heartbeat and group management protocol that is transparent to the user. At the same time, it allows users the option to subscribe to a fixed set of partitions and not use group management at all.
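As a small illustration of that last point -- subscribing to a fixed set of partitions and skipping group management entirely -- here is a sketch against the draft API, assuming the subscribe(String topic, int partition) overload and the no-arg commit() discussed earlier in the thread; the topic name, partition numbers, config and process() are placeholders.

// Fixed-partition mode: no group management, no rebalance callback involved.
KafkaConsumer consumer = new KafkaConsumer(config);   // config: a ConsumerConfig
consumer.subscribe("events", 0);                      // topic name and partition numbers are just examples
consumer.subscribe("events", 1);
while (true) {
    for (ConsumerRecord record : consumer.poll(100L)) // timeout semantics per the draft javadoc
        process(record);                              // application-specific
    consumer.commit();                                // commits offsets for the two subscribed partitions
}

In this mode the application decides which process owns which partitions, which is exactly the trade-off described above.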
> >> > > > The offset management strategy defaults to Kafka-based offset management, and the API provides a way for the user to use a customized offset store to manage the consumer's offsets.
> >> > > >
> >> > > > Another key difference is that this consumer does not depend on zookeeper at all.
> >> > > >
> >> > > > More details about the new consumer design are here <https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design>.
> >> > > >
> >> > > > Please take a look at the new API <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html> and give us any thoughts you may have.
> >> > > >
> >> > > > Thanks,
> >> > > > Neha
> >> >
> >> > --
> >> > -- Guozhang
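Finally, to put Jay's points 6, 7 and 8 from earlier in the thread into code: roughly what a consumption loop would look like if those proposals were adopted -- a poll that takes an explicit TimeUnit, commitAsync() in place of a boolean flag on commit, and the no-arg commit() for the "commit everything" case. None of these signatures are settled; this is only a sketch of the proposals under discussion, with shuttingDown, config and process() as application placeholders.

// Sketch of the proposed (not final) API shape from the discussion above.
KafkaConsumer consumer = new KafkaConsumer(config);
consumer.subscribe("topic1");
try {
    while (!shuttingDown) {                                                     // shuttingDown: application flag
        List<ConsumerRecord> records = consumer.poll(100, TimeUnit.MILLISECONDS); // point 6: explicit TimeUnit
        for (ConsumerRecord record : records)
            process(record);                                                    // application-specific
        consumer.commitAsync();                                                 // point 7: instead of commit(false)
    }
} finally {
    consumer.commit();    // point 8: synchronously commit current position for all partitions
    consumer.close();     // assumes the draft consumer has a close(); not confirmed in this thread
}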