Hey Neha,

How do you know n? The whole point is that you need to be able to fetch the end offset. You can't decide a priori that you will load 1M messages without knowing what is there.
-Jay On Tue, Feb 25, 2014 at 9:54 AM, Neha Narkhede <neha.narkh...@gmail.com>wrote: > Jay/Robert - > > I think what Robert is saying is that we need to think through the offset > API to enable "batch processing" of topic data. Think of a process that > periodically kicks off to compute a data summary or do a data load or > something like that. I think what we need to support this is an api to > fetch the last offset from the server for a partition. Something like > long lastOffset(TopicPartition tp) > and for symmetry > long firstOffset(TopicPartition tp) > > Likely this would have to be batched. > > A fixed range of data load can be done using the existing APIs as follows. > This assumes you know the endOffset which can be currentOffset + n (number > of messages in the load) > > long startOffset = consumer.position(partition); > long endOffset = startOffset + n; > while(consumer.position(partition) <= endOffset) { > List<ConsumerRecord> messages = consumer.poll(timeout, > TimeUnit.MILLISECONDS); > process(messages, endOffset); // processes messages until > endOffset > } > > Does that make sense? > > > On Tue, Feb 25, 2014 at 9:49 AM, Neha Narkhede <neha.narkh...@gmail.com > >wrote: > > > Thanks for the review, Jun. Here are some comments - > > > > > > 1. The using of ellipsis: This may make passing a list of items from a > > collection to the api a bit harder. Suppose that you have a list of > topics > > stored in > > > > ArrayList<String> topics; > > > > If you want subscribe to all topics in one call, you will have to do: > > > > String[] topicArray = new String[topics.size()]; > > consumer.subscribe(topics. > > toArray(topicArray)); > > > > A similar argument can be made for arguably the more common use case of > > subscribing to a single topic as well. In these cases, user is required > to > > write more > > code to create a single item collection and pass it in. 
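[Editor's sketch] Neha's fixed-range load loop above can be made concrete with an in-memory stand-in for the proposed API. `FakeConsumer` and `loadN` are illustrative names only, not part of any real Kafka client. Note two details the quoted snippet glosses over: the bound should be a strict `<` (with endOffset = start + n, the last wanted offset is endOffset - 1), and because a poll batch can straddle the bound, processing has to drop records past endOffset — which is presumably why the quoted code passes endOffset into process().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative in-memory stand-in for the proposed consumer API; not a real
// Kafka client. position()/poll() just walk a fixed list of messages.
class BoundedLoadSketch {
    static class FakeConsumer {
        private final List<String> log; // the partition's messages, by offset
        private long position;          // next offset to fetch

        FakeConsumer(List<String> log, long startOffset) {
            this.log = log;
            this.position = startOffset;
        }

        long position() { return position; }

        // Returns up to max records and advances the fetch position, like poll().
        List<String> poll(int max) {
            List<String> batch = new ArrayList<>();
            while (batch.size() < max && position < log.size()) {
                batch.add(log.get((int) position++));
            }
            return batch;
        }
    }

    // Load exactly n messages starting from the current position. Note the
    // strict '<': with endOffset = start + n, the last wanted offset is
    // endOffset - 1, and records a batch returns past the bound are dropped.
    static int loadN(FakeConsumer consumer, int n) {
        long endOffset = consumer.position() + n;
        int processed = 0;
        while (consumer.position() < endOffset) {
            long batchStart = consumer.position();
            List<String> batch = consumer.poll(2);
            if (batch.isEmpty()) break; // partition exhausted before the bound
            for (int i = 0; i < batch.size(); i++) {
                if (batchStart + i < endOffset) {
                    processed++; // process(record, endOffset) in the thread's sketch
                }
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        FakeConsumer c = new FakeConsumer(Arrays.asList("a", "b", "c", "d", "e"), 1);
        System.out.println(loadN(c, 3)); // prints 3: offsets 1, 2, 3
    }
}
```

This still assumes n is known up front, which is exactly Jay's objection — without a lastOffset() API you cannot know where "the end" is.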
Since > subscription > > is extremely lightweight > > invoking it multiple times also seems like a workable solution, no? > > > > 2. It would be good to document that the following apis are mutually > > exclusive. Also, if the partition level subscription is specified, there > is > > no group management. Finally, unsubscribe() can only be used to cancel > > subscriptions with the same pattern. For example, you can't unsubscribe > at > > the partition level if the subscription is done at the topic level. > > > > *subscribe*(java.lang.String... topics) > > *subscribe*(java.lang.String topic, int... partitions) > > > > Makes sense. Made the suggested improvements to the docs< > http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/Consumer.html#subscribe%28java.lang.String...%29 > > > > > > > > 3.commit(): The following comment in the doc should probably say "commit > > offsets for partitions assigned to this consumer". > > > > If no partitions are specified, commits offsets for the subscribed list > of > > topics and partitions to Kafka. > > > > Could you give more context on this suggestion? Here is the entire doc - > > > > Synchronously commits the specified offsets for the specified list of > > topics and partitions to *Kafka*. If no partitions are specified, commits > > offsets for the subscribed list of topics and partitions. > > > > The hope is to convey that if no partitions are specified, offsets will > be > > committed for the subscribed list of partitions. One improvement could > be to > > explicitly state that the offsets returned on the last poll will be > > committed. I updated this to - > > > > Synchronously commits the specified offsets for the specified list of > > topics and partitions to *Kafka*. If no offsets are specified, commits > > offsets returned on the last {@link #poll(long, TimeUnit) poll()} for the > > subscribed list of topics and partitions. > > > > 4. 
There is inconsistency in specifying partitions. Sometimes we use > > TopicPartition and some other times we use String and int (see > > examples below). > > > > void onPartitionsAssigned(Consumer consumer, TopicPartition...partitions) > > > > public void *subscribe*(java.lang.String topic, int... partitions) > > > > Yes, this was discussed previously. I think generally the consensus seems > > to be to use the higher level > > classes everywhere. Made those changes. > > > > What's the use case of position()? Isn't that just the nextOffset() on > the > > last message returned from poll()? > > > > Yes, except in the case where a rebalance is triggered and poll() is not > > yet invoked. Here, you would use position() to get the new fetch position > > for the specific partition. Even if this is not a common use case, IMO it > > is much easier to use position() to get the fetch offset than invoking > > nextOffset() on the last message. This also keeps the APIs symmetric, > which > > is nice. > > > > > > > > > > On Mon, Feb 24, 2014 at 7:06 PM, Withers, Robert < > robert.with...@dish.com>wrote: > > > >> That's wonderful. Thanks for kafka. > >> > >> Rob > >> > >> On Feb 24, 2014, at 9:58 AM, Guozhang Wang <wangg...@gmail.com<mailto: > >> wangg...@gmail.com>> wrote: > >> > >> Hi Robert, > >> > >> Yes, you can check out the callback functions in the new API > >> > >> onPartitionDesigned > >> onPartitionAssigned > >> > >> and see if they meet your needs. > >> > >> Guozhang > >> > >> > >> On Mon, Feb 24, 2014 at 8:18 AM, Withers, Robert < > robert.with...@dish.com > >> <mailto:robert.with...@dish.com>>wrote: > >> > >> Jun, > >> > >> Are you saying it is possible to get events from the high-level consumer > >> regarding various state machine changes? For instance, can we get a > >> notification when a rebalance starts and ends, when a partition is > >> assigned/unassigned, when an offset is committed on a partition, when a > >> leader changes and so on? 
I call this OOB traffic, since they are not > the > >> core messages streaming, but side-band events, yet they are still > >> potentially useful to consumers. > >> > >> Thank you, > >> Robert > >> > >> > >> Robert Withers > >> Staff Analyst/Developer > >> o: (720) 514-8963 > >> c: (571) 262-1873 > >> > >> > >> > >> -----Original Message----- > >> From: Jun Rao [mailto:jun...@gmail.com] > >> Sent: Sunday, February 23, 2014 4:19 PM > >> To: users@kafka.apache.org<mailto:users@kafka.apache.org> > >> Subject: Re: New Consumer API discussion > >> > >> Robert, > >> > >> For the push orient api, you can potentially implement your own > >> MessageHandler with those methods. In the main loop of our new consumer > >> api, you can just call those methods based on the events you get. > >> > >> Also, we already have an api to get the first and the last offset of a > >> partition (getOffsetBefore). > >> > >> Thanks, > >> > >> Jun > >> > >> > >> On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert > >> <robert.with...@dish.com<mailto:robert.with...@dish.com>>wrote: > >> > >> This is a good idea, too. I would modify it to include stream > >> marking, then you can have: > >> > >> long end = consumer.lastOffset(tp); > >> consumer.setMark(end); > >> while(consumer.beforeMark()) { > >> process(consumer.pollToMark()); > >> } > >> > >> or > >> > >> long end = consumer.lastOffset(tp); > >> consumer.setMark(end); > >> for(Object msg : consumer.iteratorToMark()) { > >> process(msg); > >> } > >> > >> I actually have 4 suggestions, then: > >> > >> * pull: stream marking > >> * pull: finite streams, bound by time range (up-to-now, yesterday) or > >> offset > >> * pull: async api > >> * push: KafkaMessageSource, for a push model, with msg and OOB events. > >> Build one in either individual or chunk mode and have a listener for > >> each msg or a listener for a chunk of msgs. 
Make it composable and > >> policy driven (chunked, range, commitOffsets policy, retry policy, > >> transactional) > >> > >> Thank you, > >> Robert > >> > >> On Feb 22, 2014, at 11:21 AM, Jay Kreps <jay.kr...@gmail.com<mailto: > >> jay.kr...@gmail.com><mailto: > >> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com>>> wrote: > >> > >> I think what Robert is saying is that we need to think through the > >> offset API to enable "batch processing" of topic data. Think of a > >> process that periodically kicks off to compute a data summary or do a > >> data load or something like that. I think what we need to support this > >> is an api to fetch the last offset from the server for a partition. > >> Something like > >> long lastOffset(TopicPartition tp) > >> and for symmetry > >> long firstOffset(TopicPartition tp) > >> > >> Likely this would have to be batched. Essentially we should add this > >> use case to our set of code examples to write and think through. > >> > >> The usage would be something like > >> > >> long end = consumer.lastOffset(tp); > >> while(consumer.position < end) > >> process(consumer.poll()); > >> > >> -Jay > >> > >> > >> On Sat, Feb 22, 2014 at 1:52 AM, Withers, Robert > >> <robert.with...@dish.com<mailto:robert.with...@dish.com> > >> <mailto:robert.with...@dish.com>>wrote: > >> > >> Jun, > >> > >> I was originally thinking a non-blocking read from a distributed > >> stream should distinguish between "no local messages, but a fetch is > >> occurring" > >> versus "you have drained the stream". The reason this may be valuable > >> to me is so I can write consumers that read all known traffic then > >> terminate. > >> You caused me to reconsider and I think I am conflating 2 things. One > >> is a sync/async api while the other is whether to have an infinite or > >> finite stream. Is it possible to build a finite KafkaStream on a > >> range of messages? 
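[Editor's sketch] The stream-marking loop Robert proposes above (setMark/beforeMark/pollToMark, plus lastOffset) can be mocked up as follows. All method names come from the proposal; the in-memory List stands in for a real partition, so this shows the intended semantics, not an implementation:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of the stream-marking proposal: setMark/beforeMark/pollToMark are the
// names suggested in the thread; the List is a stand-in for a partition.
class MarkedStream {
    private final List<String> log;
    private long position;
    private long mark = Long.MAX_VALUE; // no mark set: behaves as an infinite stream

    MarkedStream(List<String> log, long startOffset) {
        this.log = log;
        this.position = startOffset;
    }

    // Stand-in for the proposed lastOffset(tp): one past the last message.
    long lastOffset() { return log.size(); }

    void setMark(long offset) { this.mark = offset; }

    boolean beforeMark() { return position < mark; }

    // Drain everything between the current position and the mark.
    List<String> pollToMark() {
        List<String> batch = new ArrayList<>();
        while (position < mark && position < log.size()) {
            batch.add(log.get((int) position++));
        }
        return batch;
    }

    public static void main(String[] args) {
        MarkedStream s = new MarkedStream(Arrays.asList("a", "b", "c", "d"), 1);
        s.setMark(s.lastOffset()); // finite stream: stop at "now"
        int drained = 0;
        while (s.beforeMark()) {
            drained += s.pollToMark().size();
        }
        System.out.println(drained); // prints 3: offsets 1..3
    }
}
```

The mark is what turns an infinite stream into a finite one: beforeMark() going false is a definitive end-of-stream signal, which is exactly the fetch-in-progress vs. end-of-stream distinction Robert asks about below.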
> >> > >> Perhaps a Simple Consumer would do just fine and then I could start > >> off getting the writeOffset from zookeeper and tell it to read a > >> specified range per partition. I've done this and forked a simple > >> consumer runnable for each partition, for one of our analyzers. The > >> great thing about the high-level consumer is that rebalance, so I can > >> fork however many stream readers I want and you just figure it out for > >> me. In that way you offer us the control over the resource > >> consumption within a pull model. This is best to regulate message > >> pressure, they say. > >> > >> Combining that high-level rebalance ability with a ranged partition > >> drain could be really nice...build the stream with an ending position > >> and it is a finite stream, but retain the high-level rebalance. With > >> a finite stream, you would know the difference of the 2 async > >> scenarios: fetch-in-progress versus end-of-stream. With an infinite > >> stream, you never get end-of-stream. > >> > >> Aside from a high-level consumer over a finite range within each > >> partition, the other feature I can think of is more complicated. A > >> high-level consumer has state machine changes that the client cannot > >> access, to my knowledge. Our use of kafka has us invoke a message > >> handler with each message we consumer from the KafkaStream, so we > >> convert a pull-model to a push-model. Including the idea of receiving > >> notifications from state machine changes, what would be really nice is > >> to have a KafkaMessageSource, that is an eventful push model. 
If it > >> were thread-safe, then we could register listeners for various events: > >> > >> * opening-stream > >> * closing-stream > >> * message-arrived > >> * end-of-stream/no-more-messages-in-partition (for finite streams) > >> * rebalance started > >> * partition assigned > >> * partition unassigned > >> * rebalance finished > >> * partition-offset-committed > >> > >> Perhaps that is just our use, but instead of a pull-oriented > >> KafkaStream, is there any sense in your providing a push-oriented > >> KafkaMessageSource publishing OOB messages? > >> > >> thank you, > >> Robert > >> > >> On Feb 21, 2014, at 5:59 PM, Jun Rao <jun...@gmail.com<mailto: > >> jun...@gmail.com><mailto: > >> jun...@gmail.com<mailto:jun...@gmail.com>><mailto: > >> jun...@gmail.com<mailto:jun...@gmail.com><mailto:jun...@gmail.com>>> > >> wrote: > >> > >> Robert, > >> > >> Could you explain why you want to distinguish btw > >> FetchingInProgressException and NoMessagePendingException? The > >> nextMsgs() method that you want is exactly what poll() does. > >> > >> Thanks, > >> > >> Jun > >> > >> > >> On Wed, Feb 19, 2014 at 8:45 AM, Withers, Robert > >> <robert.with...@dish.com<mailto:robert.with...@dish.com> <mailto: > >> robert.with...@dish.com> > >> <mailto:robert.with...@dish.com>>wrote: > >> > >> I am not clear on why the consumer stream should be positionable, > >> especially if it is limited to the in-memory fetched messages. Could > >> someone explain to me, please? I really like the idea of committing > >> the offset specifically on those partitions with changed read offsets, > >> only. > >> > >> > >> > >> 2 items I would like to see added to the KafkaStream are: > >> > >> * a non-blocking next(), throws several exceptions > >> (FetchingInProgressException and a NoMessagePendingException or > >> something) to differentiate between fetching or no messages left. 
> >> > >> * A nextMsgs() method which returns all locally available > >> messages > >> and kicks off a fetch for the next chunk. > >> > >> > >> > >> If you are trying to add transactional features, then formally define > >> a DTP capability and pull in other server frameworks to share the > >> implementation. Should it be XA/Open? How about a new peer2peer DTP > >> protocol? > >> > >> > >> > >> Thank you, > >> > >> Robert > >> > >> > >> > >> Robert Withers > >> > >> Staff Analyst/Developer > >> > >> o: (720) 514-8963 > >> > >> c: (571) 262-1873 > >> > >> > >> > >> -----Original Message----- > >> From: Jay Kreps [mailto:jay.kr...@gmail.com] > >> Sent: Sunday, February 16, 2014 10:13 AM > >> To: users@kafka.apache.org<mailto:users@kafka.apache.org><mailto: > >> users@kafka.apache.org><mailto: > >> users@kafka.apache.org<mailto:users@kafka.apache.org>> > >> Subject: Re: New Consumer API discussion > >> > >> > >> > >> +1 I think those are good. It is a little weird that changing the > >> +fetch > >> > >> point is not batched but changing the commit point is, but I suppose > >> there is no helping that. > >> > >> > >> > >> -Jay > >> > >> > >> > >> > >> > >> On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede > >> <neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com> <mailto: > >> neha.narkh...@gmail.com> > >> <mailto:neha.narkh...@gmail.com> > >> <mailto:neha.narkh...@gmail.com>>wrote: > >> > >> > >> > >> Jay, > >> > >> > >> > >> That makes sense. position/seek deal with changing the consumers > >> > >> in-memory data, so there is no remote rpc there. 
For some reason, I > >> > >> got committed and seek mixed up in my head at that time :) > >> > >> > >> > >> So we still end up with > >> > >> > >> > >> long position(TopicPartition tp) > >> > >> void seek(TopicPartitionOffset p) > >> > >> Map<TopicPartition, Long> committed(TopicPartition tp); > >> > >> void commit(TopicPartitionOffset...); > >> > >> > >> > >> Thanks, > >> > >> Neha > >> > >> > >> > >> On Friday, February 14, 2014, Jay Kreps <jay.kr...@gmail.com<mailto: > >> jay.kr...@gmail.com><mailto: > >> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com>><mailto: > >> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com><mailto: > >> jay.kr...@gmail.com>><mailto: > >> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com><mailto: > >> jay.kr...@gmail.com><mailto:jay.kreps@gmail > >> .com>>> > >> wrote: > >> > >> > >> > >> Oh, interesting. So I am assuming the following implementation: > >> > >> 1. We have an in-memory fetch position which controls the next fetch > >> > >> offset. > >> > >> 2. Changing this has no effect until you poll again at which point > >> > >> your fetch request will be from the newly specified offset 3. We > >> > >> then have an in-memory but also remotely stored committed offset. > >> > >> 4. Calling commit has the effect of saving the fetch position as > >> > >> both the in memory committed position and in the remote store 5. > >> > >> Auto-commit is the same as periodically calling commit on all > >> > >> positions. > >> > >> > >> > >> So batching on commit as well as getting the committed position > >> > >> makes sense, but batching the fetch position wouldn't, right? I > >> > >> think you are actually thinking of a different approach. 
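[Editor's sketch] Jay's five numbered assumptions can be modeled in a few lines. All class names are illustrative and the static Map stands in for the remotely stored offsets; the point is that seek() is purely local while commit() writes both the in-memory committed position and the remote store:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal model of the five assumptions above. Names are illustrative and the
// static Map stands in for the remote offset store.
class OffsetModel {
    static final Map<String, Long> REMOTE = new HashMap<>(); // "remote" offset store

    private final String tp;            // partition id, e.g. "logs-0"
    private long fetchPosition;         // (1) in-memory, controls the next fetch
    private long committedInMemory = -1;

    OffsetModel(String tp, long startOffset) {
        this.tp = tp;
        this.fetchPosition = startOffset;
    }

    // (2) purely an in-memory change; takes effect on the next poll
    void seek(long offset) { fetchPosition = offset; }

    long position() { return fetchPosition; }

    // (4) commit saves the fetch position both in memory and remotely
    void commit() {
        committedInMemory = fetchPosition;
        REMOTE.put(tp, fetchPosition);
    }

    // (3) committed offset: local value if this consumer has committed,
    // otherwise a remote lookup (the "both" behavior discussed in the thread)
    long committed() {
        if (committedInMemory >= 0) return committedInMemory;
        return REMOTE.getOrDefault(tp, -1L);
    }

    public static void main(String[] args) {
        OffsetModel m = new OffsetModel("logs-0", 0);
        m.seek(42);   // nothing remote happens here
        m.commit();   // 42 is now committed locally and "remotely"
        System.out.println(m.position() + " " + m.committed() + " " + REMOTE.get("logs-0"));
        // prints: 42 42 42
    }
}
```

Under this model, batching commit() and committed() makes sense (they can hit the server) while batching seek()/position() would buy nothing, which is Jay's conclusion.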
> >> > >> > >> > >> -Jay > >> > >> > >> > >> > >> > >> On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede > >> > >> <neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com><mailto: > >> neha.narkh...@gmail.com><mailto: > >> neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com>> > >> > >> <javascript:;> > >> > >> wrote: > >> > >> > >> > >> I think you are saying both, i.e. if you have committed on a > >> > >> partition it returns you that value but if you > >> > >> haven't > >> > >> it does a remote lookup? > >> > >> > >> > >> Correct. > >> > >> > >> > >> The other argument for making committed batched is that commit() > >> > >> is batched, so there is symmetry. > >> > >> > >> > >> position() and seek() are always in memory changes (I assume) so > >> > >> there > >> > >> is > >> > >> no need to batch them. > >> > >> > >> > >> I'm not as sure as you are about that assumption being true. > >> > >> Basically > >> > >> in > >> > >> my example above, the batching argument for committed() also > >> > >> applies to > >> > >> position() since one purpose of fetching a partition's offset is > >> > >> to use > >> > >> it > >> > >> to set the position of the consumer to that offset. Since that > >> > >> might > >> > >> lead > >> > >> to a remote OffsetRequest call, I think we probably would be > >> > >> better off batching it. > >> > >> > >> > >> Another option for naming would be position/reposition instead of > >> > >> position/seek. > >> > >> > >> > >> I think position/seek is better since it aligns with Java file APIs. > >> > >> > >> > >> I also think your suggestion about ConsumerPosition makes sense. 
> >> > >> > >> > >> Thanks, > >> > >> Neha > >> > >> On Feb 13, 2014 9:22 PM, "Jay Kreps" <jay.kr...@gmail.com<mailto: > >> jay.kr...@gmail.com><mailto: > >> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com>><mailto: > >> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com><mailto: > >> jay.kr...@gmail.com>><mailto: > >> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com><mailto: > >> jay.kr...@gmail.com><mailto:jay.kreps@gmail > >> .com>>> > >> wrote: > >> > >> > >> > >> Hey Neha, > >> > >> > >> > >> I actually wasn't proposing the name TopicOffsetPosition, that > >> > >> was > >> > >> just a > >> > >> typo. I meant TopicPartitionOffset, and I was just referencing > >> > >> what > >> > >> was > >> > >> in > >> > >> the javadoc. So to restate my proposal without the typo, using > >> > >> just > >> > >> the > >> > >> existing classes (that naming is a separate question): > >> > >> long position(TopicPartition tp) > >> > >> void seek(TopicPartitionOffset p) > >> > >> long committed(TopicPartition tp) > >> > >> void commit(TopicPartitionOffset...); > >> > >> > >> > >> So I may be unclear on committed() (AKA lastCommittedOffset). Is > >> > >> it returning the in-memory value from the last commit by this > >> > >> consumer, > >> > >> or > >> > >> is > >> > >> it doing a remote fetch, or both? I think you are saying both, i.e. > >> > >> if > >> > >> you > >> > >> have committed on a partition it returns you that value but if > >> > >> you > >> > >> haven't > >> > >> it does a remote lookup? > >> > >> > >> > >> The other argument for making committed batched is that commit() > >> > >> is batched, so there is symmetry. > >> > >> > >> > >> position() and seek() are always in memory changes (I assume) so > >> > >> there > >> > >> is > >> > >> no need to batch them. 
> >> > >> > >> > >> So taking all that into account what if we revise it to > >> > >> long position(TopicPartition tp) > >> > >> void seek(TopicPartitionOffset p) > >> > >> Map<TopicPartition, Long> committed(TopicPartition tp); > >> > >> void commit(TopicPartitionOffset...); > >> > >> > >> > >> This is not symmetric between position/seek and commit/committed > >> > >> but > >> > >> it > >> > >> is > >> > >> convenient. Another option for naming would be > >> > >> position/reposition > >> > >> instead > >> > >> of position/seek. > >> > >> > >> > >> With respect to the name TopicPartitionOffset, what I was trying > >> > >> to > >> > >> say > >> > >> is > >> > >> that I recommend we change that to something shorter. I think > >> > >> TopicPosition > >> > >> or ConsumerPosition might be better. Position does not refer to > >> > >> the variables in the object, it refers to the meaning of the > >> > >> object--it represents a position within a topic. The offset > >> > >> field in that object > >> > >> is > >> > >> still called the offset. TopicOffset, PartitionOffset, or > >> > >> ConsumerOffset > >> > >> would all be workable too. Basically I am just objecting to > >> > >> concatenating > >> > >> three nouns together. :-) > >> > >> > >> > >> -Jay > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede < > >> > >> neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com><mailto: > >> neha.narkh...@gmail.com><mailto: > >> neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com>><mailto: > >> neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com><mailto: > >> neha.narkh...@gmail.com>> > >> > >> wrote: > >> > >> > >> > >> 2. It returns a list of results. But how can you use the list? 
> >> > >> The > >> > >> only > >> > >> way > >> > >> to use the list is to make a map of tp=>offset and then look > >> > >> up > >> > >> results > >> > >> in > >> > >> this map (or do a for loop over the list for the partition you > >> > >> want). I > >> > >> recommend that if this is an in-memory check we just do one at > >> > >> a > >> > >> time. > >> > >> E.g. > >> > >> long committedPosition( > >> > >> TopicPosition). > >> > >> > >> > >> This was discussed in the previous emails. There is a choic > >> > >> > >> > >> > >> -- > >> Robert Withers > >> robert.with...@dish.com<mailto:robert.with...@dish.com><mailto: > >> robert.with...@dish.com><mailto: > >> robert.with...@dish.com<mailto:robert.with...@dish.com>> > >> c: 303.919.5856 > >> > >> > >> > >> -- > >> Robert Withers > >> robert.with...@dish.com<mailto:robert.with...@dish.com><mailto: > >> robert.with...@dish.com> > >> c: 303.919.5856 > >> > >> > >> > >> > >> > >> > >> -- > >> -- Guozhang > >> > >> > >> > >> -- > >> Robert Withers > >> robert.with...@dish.com<mailto:robert.with...@dish.com> > >> c: 303.919.5856 > >> > >> > > >