Jay/Robert - I think what Robert is saying is that we need to think through the offset API to enable "batch processing" of topic data. Think of a process that periodically kicks off to compute a data summary or do a data load or something like that. I think what we need to support this is an api to fetch the last offset from the server for a partition. Something like

    long lastOffset(TopicPartition tp)

and for symmetry

    long firstOffset(TopicPartition tp)
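A minimal sketch of how the proposed pair could drive a bounded load, assuming hypothetical `firstOffset`/`lastOffset` methods on a consumer. The `FakeConsumer` below is an illustrative in-memory stand-in, not the real client:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for the consumer API under discussion.
// firstOffset/lastOffset are the proposed additions; nothing here
// is the real Kafka client.
public class BoundedLoadSketch {

    static class FakeConsumer {
        private final List<String> log;   // the partition's messages
        private long position = 0;        // next fetch offset

        FakeConsumer(List<String> log) { this.log = log; }

        long firstOffset() { return 0; }            // proposed API
        long lastOffset()  { return log.size(); }   // proposed API

        // Returns at most 2 records per call, like a small poll() batch.
        List<String> poll() {
            List<String> batch = new ArrayList<>();
            for (int i = 0; i < 2 && position < log.size(); i++) {
                batch.add(log.get((int) position++));
            }
            return batch;
        }

        long position() { return position; }
    }

    // Drain the partition up to lastOffset, then stop: the "batch job" shape.
    static long consumeAll(FakeConsumer consumer) {
        long end = consumer.lastOffset();
        long processed = 0;
        while (consumer.position() < end) {
            processed += consumer.poll().size();
        }
        return processed;
    }

    public static void main(String[] args) {
        FakeConsumer c = new FakeConsumer(List.of("a", "b", "c", "d", "e"));
        System.out.println(consumeAll(c)); // prints 5
    }
}
```

The key property is that the job terminates: it snapshots the end offset once, so records produced after the job starts are left for the next run.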
Likely this would have to be batched. A fixed range of data load can be done using the existing APIs as follows. This assumes you know the end offset, which can be currentOffset + n (n being the number of messages in the load):

    long startOffset = consumer.position(partition);
    long endOffset = startOffset + n;
    while (consumer.position(partition) <= endOffset) {
        List<ConsumerRecord> messages = consumer.poll(timeout, TimeUnit.MILLISECONDS);
        process(messages, endOffset); // processes messages until endOffset
    }

Does that make sense?

On Tue, Feb 25, 2014 at 9:49 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> Thanks for the review, Jun. Here are some comments -
>
> 1. The use of ellipsis: this may make passing a list of items from a collection to the api a bit harder. Suppose that you have a list of topics stored in
>
>     ArrayList<String> topics;
>
> If you want to subscribe to all topics in one call, you will have to do:
>
>     String[] topicArray = new String[topics.size()];
>     consumer.subscribe(topics.toArray(topicArray));
>
> A similar argument can be made for the arguably more common use case of subscribing to a single topic. In these cases, the user is required to write more code to create a single-item collection and pass it in. Since subscription is extremely lightweight, invoking it multiple times also seems like a workable solution, no?
>
> 2. It would be good to document that the following apis are mutually exclusive. Also, if the partition-level subscription is specified, there is no group management. Finally, unsubscribe() can only be used to cancel subscriptions with the same pattern. For example, you can't unsubscribe at the partition level if the subscription is done at the topic level.
>
>     subscribe(java.lang.String... topics)
>     subscribe(java.lang.String topic, int... partitions)
>
> Makes sense.
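The ellipsis point above can be shown with a stub; the `subscribe` overload mirrors the signature quoted in the thread, while `StubConsumer` is an invented stand-in rather than the real client:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Stub illustrating the varargs subscribe() signature discussed above.
// StubConsumer is an invented stand-in, not the real client.
public class SubscribeSketch {

    static class StubConsumer {
        final Set<String> topics = new HashSet<>();

        void subscribe(String... ts) {   // topic-level subscription, varargs form
            for (String t : ts) topics.add(t);
        }
    }

    public static void main(String[] args) {
        StubConsumer consumer = new StubConsumer();

        // Single topic: varargs makes this call trivial.
        consumer.subscribe("events");

        // A collection, however, must first be copied into an array.
        List<String> stored = new ArrayList<>(List.of("logs", "metrics"));
        consumer.subscribe(stored.toArray(new String[0]));

        System.out.println(consumer.topics.size()); // prints 3
    }
}
```

This is the trade-off Jun raises: varargs optimizes the literal-topic call site at the cost of an extra `toArray` copy whenever the topics already live in a collection.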
> Made the suggested improvements to the docs: http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/Consumer.html#subscribe%28java.lang.String...%29
>
> 3. commit(): The following comment in the doc should probably say "commit offsets for partitions assigned to this consumer".
>
>     If no partitions are specified, commits offsets for the subscribed list of topics and partitions to Kafka.
>
> Could you give more context on this suggestion? Here is the entire doc -
>
>     Synchronously commits the specified offsets for the specified list of topics and partitions to Kafka. If no partitions are specified, commits offsets for the subscribed list of topics and partitions.
>
> The hope is to convey that if no partitions are specified, offsets will be committed for the subscribed list of partitions. One improvement could be to explicitly state that the offsets returned on the last poll will be committed. I updated this to -
>
>     Synchronously commits the specified offsets for the specified list of topics and partitions to Kafka. If no offsets are specified, commits offsets returned on the last {@link #poll(long, TimeUnit) poll()} for the subscribed list of topics and partitions.
>
> 4. There is inconsistency in specifying partitions. Sometimes we use TopicPartition and some other times we use String and int (see examples below).
>
>     void onPartitionsAssigned(Consumer consumer, TopicPartition... partitions)
>     public void subscribe(java.lang.String topic, int... partitions)
>
> Yes, this was discussed previously. I think generally the consensus seems to be to use the higher-level classes everywhere. Made those changes.
>
> What's the use case of position()? Isn't that just the nextOffset() on the last message returned from poll()?
>
> Yes, except in the case where a rebalance is triggered and poll() is not yet invoked.
> Here, you would use position() to get the new fetch position for the specific partition. Even if this is not a common use case, IMO it is much easier to use position() to get the fetch offset than invoking nextOffset() on the last message. This also keeps the APIs symmetric, which is nice.

On Mon, Feb 24, 2014 at 7:06 PM, Withers, Robert <robert.with...@dish.com> wrote:

>> That's wonderful. Thanks for kafka.
>>
>> Rob
>>
>> On Feb 24, 2014, at 9:58 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>
>> Hi Robert,
>>
>> Yes, you can check out the callback functions in the new API
>>
>>     onPartitionDeassigned
>>     onPartitionAssigned
>>
>> and see if they meet your needs.
>>
>> Guozhang
>>
>> On Mon, Feb 24, 2014 at 8:18 AM, Withers, Robert <robert.with...@dish.com> wrote:
>>
>> Jun,
>>
>> Are you saying it is possible to get events from the high-level consumer regarding various state machine changes? For instance, can we get a notification when a rebalance starts and ends, when a partition is assigned/unassigned, when an offset is committed on a partition, when a leader changes, and so on? I call this OOB traffic, since these are not the core messages streaming, but side-band events, yet they are still potentially useful to consumers.
>>
>> Thank you,
>> Robert
>>
>> Robert Withers
>> Staff Analyst/Developer
>> o: (720) 514-8963
>> c: (571) 262-1873
>>
>> -----Original Message-----
>> From: Jun Rao [mailto:jun...@gmail.com]
>> Sent: Sunday, February 23, 2014 4:19 PM
>> To: users@kafka.apache.org
>> Subject: Re: New Consumer API discussion
>>
>> Robert,
>>
>> For the push-oriented api, you can potentially implement your own MessageHandler with those methods. In the main loop of our new consumer api, you can just call those methods based on the events you get.
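The assignment callbacks mentioned above could be wired up along these lines; the interface name and the coordinator that fires them are assumptions for illustration, not the actual client API:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of partition assignment callbacks like those
// mentioned above. All names here are invented for the example.
public class RebalanceCallbackSketch {

    interface RebalanceListener {
        void onPartitionAssigned(int partition);
        void onPartitionDeassigned(int partition);
    }

    // Stand-in coordinator that notifies the listener as ownership changes.
    static class FakeCoordinator {
        private final RebalanceListener listener;
        private final List<Integer> owned = new ArrayList<>();

        FakeCoordinator(RebalanceListener listener) { this.listener = listener; }

        void rebalance(List<Integer> newAssignment) {
            for (int p : new ArrayList<>(owned)) {          // revoked partitions
                if (!newAssignment.contains(p)) {
                    owned.remove(Integer.valueOf(p));
                    listener.onPartitionDeassigned(p);
                }
            }
            for (int p : newAssignment) {                   // newly assigned partitions
                if (!owned.contains(p)) {
                    owned.add(p);
                    listener.onPartitionAssigned(p);
                }
            }
        }
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        FakeCoordinator c = new FakeCoordinator(new RebalanceListener() {
            public void onPartitionAssigned(int p)   { events.add("assigned:" + p); }
            public void onPartitionDeassigned(int p) { events.add("deassigned:" + p); }
        });
        c.rebalance(List.of(0, 1));   // first assignment
        c.rebalance(List.of(1, 2));   // partition 0 revoked, 2 assigned
        System.out.println(events);   // [assigned:0, assigned:1, deassigned:0, assigned:2]
    }
}
```

This is the shape of Robert's "OOB traffic": the client learns about ownership changes through callbacks rather than by inspecting consumed messages.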
>>
>> Also, we already have an api to get the first and the last offset of a partition (getOffsetBefore).
>>
>> Thanks,
>>
>> Jun
>>
>> On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert <robert.with...@dish.com> wrote:
>>
>> This is a good idea, too. I would modify it to include stream marking; then you can have:
>>
>>     long end = consumer.lastOffset(tp);
>>     consumer.setMark(end);
>>     while (consumer.beforeMark()) {
>>         process(consumer.pollToMark());
>>     }
>>
>> or
>>
>>     long end = consumer.lastOffset(tp);
>>     consumer.setMark(end);
>>     for (Object msg : consumer.iteratorToMark()) {
>>         process(msg);
>>     }
>>
>> I actually have 4 suggestions, then:
>>
>>     * pull: stream marking
>>     * pull: finite streams, bound by time range (up-to-now, yesterday) or offset
>>     * pull: async api
>>     * push: KafkaMessageSource, for a push model, with msg and OOB events. Build one in either individual or chunk mode and have a listener for each msg or a listener for a chunk of msgs. Make it composable and policy driven (chunked, range, commitOffsets policy, retry policy, transactional)
>>
>> Thank you,
>> Robert
>>
>> On Feb 22, 2014, at 11:21 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>
>> I think what Robert is saying is that we need to think through the offset API to enable "batch processing" of topic data. Think of a process that periodically kicks off to compute a data summary or do a data load or something like that. I think what we need to support this is an api to fetch the last offset from the server for a partition. Something like
>>
>>     long lastOffset(TopicPartition tp)
>>
>> and for symmetry
>>
>>     long firstOffset(TopicPartition tp)
>>
>> Likely this would have to be batched. Essentially we should add this use case to our set of code examples to write and think through.
>>
>> The usage would be something like
>>
>>     long end = consumer.lastOffset(tp);
>>     while (consumer.position < end)
>>         process(consumer.poll());
>>
>> -Jay
>>
>> On Sat, Feb 22, 2014 at 1:52 AM, Withers, Robert <robert.with...@dish.com> wrote:
>>
>> Jun,
>>
>> I was originally thinking a non-blocking read from a distributed stream should distinguish between "no local messages, but a fetch is occurring" versus "you have drained the stream". The reason this may be valuable to me is so I can write consumers that read all known traffic and then terminate. You caused me to reconsider, and I think I am conflating 2 things. One is a sync/async api, while the other is whether to have an infinite or finite stream. Is it possible to build a finite KafkaStream on a range of messages?
>>
>> Perhaps a SimpleConsumer would do just fine, and then I could start off getting the writeOffset from zookeeper and tell it to read a specified range per partition. I've done this and forked a simple consumer runnable for each partition, for one of our analyzers. The great thing about the high-level consumer is the rebalance, so I can fork however many stream readers I want and you just figure it out for me. In that way you offer us control over the resource consumption within a pull model. This is best to regulate message pressure, they say.
>>
>> Combining that high-level rebalance ability with a ranged partition drain could be really nice: build the stream with an ending position and it is a finite stream, but retain the high-level rebalance. With a finite stream, you would know the difference between the 2 async scenarios: fetch-in-progress versus end-of-stream. With an infinite stream, you never get end-of-stream.
>>
>> Aside from a high-level consumer over a finite range within each partition, the other feature I can think of is more complicated.
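Robert's stream-marking idea can be sketched with an invented wrapper; `setMark`, `beforeMark`, and `pollToMark` are hypothetical names from this thread, and the in-memory consumer is a stand-in:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the proposed stream-marking calls (setMark/beforeMark/pollToMark).
// Everything here is an in-memory stand-in for illustration.
public class StreamMarkSketch {

    static class MarkedConsumer {
        private final List<String> log;
        private long position = 0;
        private long mark = Long.MAX_VALUE;   // default: infinite stream

        MarkedConsumer(List<String> log) { this.log = log; }

        long lastOffset()       { return log.size(); }
        void setMark(long mark) { this.mark = mark; }
        boolean beforeMark()    { return position < mark; }

        // Poll up to 2 records, never reading past the mark.
        List<String> pollToMark() {
            List<String> batch = new ArrayList<>();
            while (batch.size() < 2 && position < mark && position < log.size()) {
                batch.add(log.get((int) position++));
            }
            return batch;
        }
    }

    // The finite-stream loop from the thread: mark the end, drain, terminate.
    static long drainToMark(MarkedConsumer consumer) {
        long end = consumer.lastOffset();
        consumer.setMark(end);
        long processed = 0;
        while (consumer.beforeMark()) {
            processed += consumer.pollToMark().size();
        }
        return processed;
    }

    public static void main(String[] args) {
        MarkedConsumer c = new MarkedConsumer(List.of("m1", "m2", "m3"));
        System.out.println(drainToMark(c)); // prints 3
    }
}
```

The mark makes the end-of-stream condition explicit: `beforeMark()` returning false is "drained", distinct from "fetch in progress" in a real async client.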
>> A high-level consumer has state machine changes that the client cannot access, to my knowledge. Our use of kafka has us invoke a message handler with each message we consume from the KafkaStream, so we convert a pull model to a push model. Including the idea of receiving notifications from state machine changes, what would be really nice is to have a KafkaMessageSource that is an eventful push model. If it were thread-safe, then we could register listeners for various events:
>>
>>     * opening-stream
>>     * closing-stream
>>     * message-arrived
>>     * end-of-stream/no-more-messages-in-partition (for finite streams)
>>     * rebalance started
>>     * partition assigned
>>     * partition unassigned
>>     * rebalance finished
>>     * partition-offset-committed
>>
>> Perhaps that is just our use, but instead of a pull-oriented KafkaStream, is there any sense in your providing a push-oriented KafkaMessageSource publishing OOB messages?
>>
>> thank you,
>> Robert
>>
>> On Feb 21, 2014, at 5:59 PM, Jun Rao <jun...@gmail.com> wrote:
>>
>> Robert,
>>
>> Could you explain why you want to distinguish btw FetchingInProgressException and NoMessagePendingException? The nextMsgs() method that you want is exactly what poll() does.
>>
>> Thanks,
>>
>> Jun
>>
>> On Wed, Feb 19, 2014 at 8:45 AM, Withers, Robert <robert.with...@dish.com> wrote:
>>
>> I am not clear on why the consumer stream should be positionable, especially if it is limited to the in-memory fetched messages. Could someone explain to me, please? I really like the idea of committing the offset specifically on those partitions with changed read offsets, only.
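The eventful "KafkaMessageSource" proposed above might look like the following sketch; the class and listener names are invented, and the source is driven by an in-memory batch rather than a broker:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch of a push-oriented "KafkaMessageSource" as proposed above.
// Names and shape are invented for illustration; this is not a real API.
public class MessageSourceSketch {

    static class MessageSource {
        private final List<Consumer<String>> onMessage = new ArrayList<>();
        private final List<Runnable> onEndOfStream = new ArrayList<>();

        void addMessageListener(Consumer<String> l) { onMessage.add(l); }
        void addEndOfStreamListener(Runnable l)     { onEndOfStream.add(l); }

        // Pump a finite batch through the listeners, then signal end-of-stream.
        void run(List<String> messages) {
            for (String m : messages) {
                for (Consumer<String> l : onMessage) l.accept(m);
            }
            for (Runnable l : onEndOfStream) l.run();
        }
    }

    public static void main(String[] args) {
        MessageSource source = new MessageSource();
        List<String> seen = new ArrayList<>();
        source.addMessageListener(seen::add);
        source.addEndOfStreamListener(() -> seen.add("<end-of-stream>"));
        source.run(List.of("a", "b"));
        System.out.println(seen); // [a, b, <end-of-stream>]
    }
}
```

The listener lists generalize naturally to the other events Robert enumerates (rebalance started/finished, offset committed, and so on), one list per event type.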
>>
>> 2 items I would like to see added to the KafkaStream are:
>>
>>     * a non-blocking next(), which throws exceptions (a FetchingInProgressException and a NoMessagePendingException, or something) to differentiate between fetching and no messages left.
>>     * a nextMsgs() method which returns all locally available messages and kicks off a fetch for the next chunk.
>>
>> If you are trying to add transactional features, then formally define a DTP capability and pull in other server frameworks to share the implementation. Should it be XA/Open? How about a new peer-to-peer DTP protocol?
>>
>> Thank you,
>>
>> Robert
>>
>> Robert Withers
>> Staff Analyst/Developer
>> o: (720) 514-8963
>> c: (571) 262-1873
>>
>> -----Original Message-----
>> From: Jay Kreps [mailto:jay.kr...@gmail.com]
>> Sent: Sunday, February 16, 2014 10:13 AM
>> To: users@kafka.apache.org
>> Subject: Re: New Consumer API discussion
>>
>> +1 I think those are good. It is a little weird that changing the fetch point is not batched but changing the commit point is, but I suppose there is no helping that.
>>
>> -Jay
>>
>> On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>
>> Jay,
>>
>> That makes sense. position/seek deal with changing the consumer's in-memory data, so there is no remote rpc there.
>> For some reason, I got committed and seek mixed up in my head at that time :)
>>
>> So we still end up with
>>
>>     long position(TopicPartition tp)
>>     void seek(TopicPartitionOffset p)
>>     Map<TopicPartition, Long> committed(TopicPartition tp);
>>     void commit(TopicPartitionOffset...);
>>
>> Thanks,
>> Neha
>>
>> On Friday, February 14, 2014, Jay Kreps <jay.kr...@gmail.com> wrote:
>>
>> Oh, interesting. So I am assuming the following implementation:
>>
>> 1. We have an in-memory fetch position which controls the next fetch offset.
>> 2. Changing this has no effect until you poll again, at which point your fetch request will be from the newly specified offset.
>> 3. We then have an in-memory but also remotely stored committed offset.
>> 4. Calling commit has the effect of saving the fetch position as both the in-memory committed position and in the remote store.
>> 5. Auto-commit is the same as periodically calling commit on all positions.
>>
>> So batching on commit as well as getting the committed position makes sense, but batching the fetch position wouldn't, right? I think you are actually thinking of a different approach.
>>
>> -Jay
>>
>> On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>
>> I think you are saying both, i.e.
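The fetch-position / committed-offset split in Jay's five points can be modeled in a few lines; the state class below is an invented in-memory illustration, with a map standing in for the remote offset store:

```java
import java.util.HashMap;
import java.util.Map;

// In-memory model of the fetch-position / committed-offset split described
// in points 1-5 above. The remote store is faked with a map.
public class PositionModelSketch {

    static class FakeConsumerState {
        long fetchPosition = 0;          // in-memory, controls the next fetch
        long committedInMemory = -1;     // last committed offset, cached locally
        final Map<String, Long> remoteStore = new HashMap<>();

        // Point 2: seek only changes local state; it takes effect on next poll.
        void seek(long offset) { fetchPosition = offset; }

        // Simulate a poll advancing past `records` messages.
        void poll(int records) { fetchPosition += records; }

        // Point 4: commit saves the fetch position both locally and remotely.
        void commit(String partition) {
            committedInMemory = fetchPosition;
            remoteStore.put(partition, fetchPosition);
        }

        // Cached value if present, otherwise a (simulated) remote lookup.
        long committed(String partition) {
            if (committedInMemory >= 0) return committedInMemory;
            return remoteStore.getOrDefault(partition, -1L);
        }
    }

    public static void main(String[] args) {
        FakeConsumerState state = new FakeConsumerState();
        state.poll(10);        // consume 10 records
        state.commit("t-0");   // committed offset becomes 10
        state.seek(5);         // rewind the fetch position only
        System.out.println(state.fetchPosition + " " + state.committed("t-0")); // 5 10
    }
}
```

This makes the batching argument concrete: `seek` and `position` touch only local fields, while `commit`/`committed` may need a round trip, which is why only the latter pair benefits from batching.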
>> if you have committed on a partition it returns you that value, but if you haven't it does a remote lookup?
>>
>> Correct.
>>
>> The other argument for making committed batched is that commit() is batched, so there is symmetry.
>>
>> position() and seek() are always in-memory changes (I assume), so there is no need to batch them.
>>
>> I'm not as sure as you are about that assumption being true. Basically, in my example above, the batching argument for committed() also applies to position(), since one purpose of fetching a partition's offset is to use it to set the position of the consumer to that offset. Since that might lead to a remote OffsetRequest call, I think we probably would be better off batching it.
>>
>> Another option for naming would be position/reposition instead of position/seek.
>>
>> I think position/seek is better since it aligns with Java file APIs.
>>
>> I also think your suggestion about ConsumerPosition makes sense.
>>
>> Thanks,
>> Neha
>>
>> On Feb 13, 2014 9:22 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
>>
>> Hey Neha,
>>
>> I actually wasn't proposing the name TopicOffsetPosition, that was just a typo. I meant TopicPartitionOffset, and I was just referencing what was in the javadoc.
>> So to restate my proposal without the typo, using just the existing classes (that naming is a separate question):
>>
>>     long position(TopicPartition tp)
>>     void seek(TopicPartitionOffset p)
>>     long committed(TopicPartition tp)
>>     void commit(TopicPartitionOffset...);
>>
>> So I may be unclear on committed() (AKA lastCommittedOffset). Is it returning the in-memory value from the last commit by this consumer, or is it doing a remote fetch, or both? I think you are saying both, i.e. if you have committed on a partition it returns you that value, but if you haven't it does a remote lookup?
>>
>> The other argument for making committed batched is that commit() is batched, so there is symmetry.
>>
>> position() and seek() are always in-memory changes (I assume), so there is no need to batch them.
>>
>> So taking all that into account, what if we revise it to
>>
>>     long position(TopicPartition tp)
>>     void seek(TopicPartitionOffset p)
>>     Map<TopicPartition, Long> committed(TopicPartition tp);
>>     void commit(TopicPartitionOffset...);
>>
>> This is not symmetric between position/seek and commit/committed, but it is convenient. Another option for naming would be position/reposition instead of position/seek.
>>
>> With respect to the name TopicPartitionOffset, what I was trying to say is that I recommend we change that to something shorter. I think TopicPosition or ConsumerPosition might be better. Position does not refer to the variables in the object, it refers to the meaning of the object: it represents a position within a topic. The offset field in that object is still called the offset. TopicOffset, PartitionOffset, or ConsumerOffset would all be workable too.
>> Basically I am just objecting to concatenating three nouns together. :-)
>>
>> -Jay
>>
>> On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>
>> 2. It returns a list of results. But how can you use the list? The only way to use the list is to make a map of tp=>offset and then look up results in this map (or do a for loop over the list for the partition you want). I recommend that if this is an in-memory check we just do one at a time. E.g.
>>
>>     long committedPosition(TopicPosition).
>>
>> This was discussed in the previous emails. There is a choic
>>
>> --
>> Robert Withers
>> robert.with...@dish.com
>> c: 303.919.5856
>>
>> --
>> -- Guozhang