Robert,

Are you saying it is possible to get events from the high-level consumer regarding various state machine changes? For instance, can we get a notification when a rebalance starts and ends, when a partition is assigned/unassigned, when an offset is committed on a partition, when a leader changes, and so on? I call this OOB traffic, since it is not the core message stream but side-band events, yet they are still potentially useful to consumers.
In the current proposal, you get notified when the state machine changes, i.e., before and after a rebalance is triggered. Look at ConsumerRebalanceCallback<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html>. Leader changes do not count as state machine changes for consumer rebalance purposes.

Thanks,
Neha

On Tue, Feb 25, 2014 at 9:54 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> Jay/Robert -
>
> I think what Robert is saying is that we need to think through the offset API to enable "batch processing" of topic data. Think of a process that periodically kicks off to compute a data summary or do a data load or something like that. I think what we need to support this is an api to fetch the last offset from the server for a partition. Something like
>
> long lastOffset(TopicPartition tp)
>
> and for symmetry
>
> long firstOffset(TopicPartition tp)
>
> Likely this would have to be batched.
>
> A fixed range of data load can be done using the existing APIs as follows. This assumes you know the endOffset, which can be currentOffset + n (number of messages in the load):
>
> long startOffset = consumer.position(partition);
> long endOffset = startOffset + n;
> while(consumer.position(partition) <= endOffset) {
>     List<ConsumerRecord> messages = consumer.poll(timeout, TimeUnit.MILLISECONDS);
>     process(messages, endOffset); // processes messages until endOffset
> }
>
> Does that make sense?

> On Tue, Feb 25, 2014 at 9:49 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

>> Thanks for the review, Jun. Here are some comments -
>>
>> 1. The use of ellipsis: This may make passing a list of items from a collection to the api a bit harder. Suppose that you have a list of topics stored in
>>
>> ArrayList<String> topics;
>>
>> If you want to subscribe to all topics in one call, you will have to do:
>>
>> String[] topicArray = new String[topics.size()];
>> consumer.subscribe(topics.toArray(topicArray));
>>
>> A similar argument can be made for arguably the more common use case of subscribing to a single topic as well. In these cases, the user is required to write more code to create a single-item collection and pass it in. Since subscription is extremely lightweight, invoking it multiple times also seems like a workable solution, no?
>>
>> 2. It would be good to document that the following apis are mutually exclusive. Also, if the partition level subscription is specified, there is no group management. Finally, unsubscribe() can only be used to cancel subscriptions with the same pattern. For example, you can't unsubscribe at the partition level if the subscription is done at the topic level.
>>
>> subscribe(java.lang.String... topics)
>> subscribe(java.lang.String topic, int... partitions)
>>
>> Makes sense. Made the suggested improvements to the docs<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/Consumer.html#subscribe%28java.lang.String...%29>.
>>
>> 3. commit(): The following comment in the doc should probably say "commit offsets for partitions assigned to this consumer".
>>
>> If no partitions are specified, commits offsets for the subscribed list of topics and partitions to Kafka.
>>
>> Could you give more context on this suggestion? Here is the entire doc -
>>
>> Synchronously commits the specified offsets for the specified list of topics and partitions to *Kafka*.
>> If no partitions are specified, commits offsets for the subscribed list of topics and partitions.
>>
>> The hope is to convey that if no partitions are specified, offsets will be committed for the subscribed list of partitions. One improvement could be to explicitly state that the offsets returned on the last poll will be committed. I updated this to -
>>
>> Synchronously commits the specified offsets for the specified list of topics and partitions to *Kafka*. If no offsets are specified, commits offsets returned on the last {@link #poll(long, TimeUnit) poll()} for the subscribed list of topics and partitions.
>>
>> 4. There is inconsistency in specifying partitions. Sometimes we use TopicPartition and some other times we use String and int (see examples below).
>>
>> void onPartitionsAssigned(Consumer consumer, TopicPartition... partitions)
>> public void subscribe(java.lang.String topic, int... partitions)
>>
>> Yes, this was discussed previously. I think the general consensus is to use the higher-level classes everywhere. Made those changes.
>>
>> What's the use case of position()? Isn't that just the nextOffset() on the last message returned from poll()?
>>
>> Yes, except in the case where a rebalance is triggered and poll() is not yet invoked. Here, you would use position() to get the new fetch position for the specific partition. Even if this is not a common use case, IMO it is much easier to use position() to get the fetch offset than invoking nextOffset() on the last message. This also keeps the APIs symmetric, which is nice.
>>
>> On Mon, Feb 24, 2014 at 7:06 PM, Withers, Robert <robert.with...@dish.com> wrote:

>>> That's wonderful. Thanks for Kafka.
>>>
>>> Rob
>>>
>>> On Feb 24, 2014, at 9:58 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>
>>> Hi Robert,
>>>
>>> Yes, you can check out the callback functions in the new API
>>>
>>> onPartitionsRevoked
>>> onPartitionsAssigned
>>>
>>> and see if they meet your needs.
>>>
>>> Guozhang
>>>
>>> On Mon, Feb 24, 2014 at 8:18 AM, Withers, Robert <robert.with...@dish.com> wrote:
>>>
>>> Jun,
>>>
>>> Are you saying it is possible to get events from the high-level consumer regarding various state machine changes? For instance, can we get a notification when a rebalance starts and ends, when a partition is assigned/unassigned, when an offset is committed on a partition, when a leader changes, and so on? I call this OOB traffic, since it is not the core message stream but side-band events, yet they are still potentially useful to consumers.
>>>
>>> Thank you,
>>> Robert
>>>
>>> Robert Withers
>>> Staff Analyst/Developer
>>> o: (720) 514-8963
>>> c: (571) 262-1873
>>>
>>> -----Original Message-----
>>> From: Jun Rao [mailto:jun...@gmail.com]
>>> Sent: Sunday, February 23, 2014 4:19 PM
>>> To: users@kafka.apache.org
>>> Subject: Re: New Consumer API discussion
>>>
>>> Robert,
>>>
>>> For the push-oriented api, you can potentially implement your own MessageHandler with those methods. In the main loop of our new consumer api, you can just call those methods based on the events you get.
>>>
>>> Also, we already have an api to get the first and the last offset of a partition (getOffsetsBefore).
>>>
>>> Thanks,
>>>
>>> Jun
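A rough sketch of the push-style wrapper Jun describes, against the proposed consumer API; MessageHandler and the surrounding wiring are hypothetical user-side code, not part of the proposal:

// Hypothetical user-side interface -- not part of the proposed API.
interface MessageHandler {
    void onMessage(ConsumerRecord record);
    void onPartitionsAssigned(Collection<TopicPartition> partitions);
    void onPartitionsRevoked(Collection<TopicPartition> partitions);
}

// Main loop sketched against the proposed API: message events are dispatched from poll(),
// while rebalance events would come from the ConsumerRebalanceCallback registered at
// subscribe() time and can simply forward to the same handler.
while (running) {
    List<ConsumerRecord> records = consumer.poll(100, TimeUnit.MILLISECONDS);
    for (ConsumerRecord record : records)
        handler.onMessage(record);
}

The consumer itself stays pull-based; the push model is layered on top by the application.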
>>> On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert <robert.with...@dish.com> wrote:
>>>
>>> This is a good idea, too. I would modify it to include stream marking, then you can have:
>>>
>>> long end = consumer.lastOffset(tp);
>>> consumer.setMark(end);
>>> while(consumer.beforeMark()) {
>>>     process(consumer.pollToMark());
>>> }
>>>
>>> or
>>>
>>> long end = consumer.lastOffset(tp);
>>> consumer.setMark(end);
>>> for(Object msg : consumer.iteratorToMark()) {
>>>     process(msg);
>>> }
>>>
>>> I actually have 4 suggestions, then:
>>>
>>> * pull: stream marking
>>> * pull: finite streams, bound by time range (up-to-now, yesterday) or offset
>>> * pull: async api
>>> * push: KafkaMessageSource, for a push model, with msg and OOB events. Build one in either individual or chunk mode and have a listener for each msg or a listener for a chunk of msgs. Make it composable and policy driven (chunked, range, commitOffsets policy, retry policy, transactional).
>>>
>>> Thank you,
>>> Robert
>>>
>>> On Feb 22, 2014, at 11:21 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>>
>>> I think what Robert is saying is that we need to think through the offset API to enable "batch processing" of topic data. Think of a process that periodically kicks off to compute a data summary or do a data load or something like that. I think what we need to support this is an api to fetch the last offset from the server for a partition. Something like
>>>
>>> long lastOffset(TopicPartition tp)
>>>
>>> and for symmetry
>>>
>>> long firstOffset(TopicPartition tp)
>>>
>>> Likely this would have to be batched. Essentially we should add this use case to our set of code examples to write and think through.
>>>
>>> The usage would be something like
>>>
>>> long end = consumer.lastOffset(tp);
>>> while(consumer.position < end)
>>>     process(consumer.poll());
>>>
>>> -Jay
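To spell that usage out a little more, here is a sketch of the batch pattern against the API discussed in this thread; note that lastOffset() is only proposed above and does not exist yet, and process() stands in for whatever the job does:

// Drain one partition up to a fixed end offset, then stop (sketch only; lastOffset() is proposed, not released).
long end = consumer.lastOffset(tp);                    // proposed: last offset available on the broker
while (consumer.position(tp) < end) {
    List<ConsumerRecord> records = consumer.poll(1000, TimeUnit.MILLISECONDS);
    for (ConsumerRecord record : records)
        process(record);                               // application-supplied processing
}
consumer.commit();                                     // commit the final position once the range is drained

The same loop works with Neha's position(partition) + n variant above when the number of messages to load is known instead of the end offset.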
>>> On Sat, Feb 22, 2014 at 1:52 AM, Withers, Robert <robert.with...@dish.com> wrote:
>>>
>>> Jun,
>>>
>>> I was originally thinking a non-blocking read from a distributed stream should distinguish between "no local messages, but a fetch is occurring" versus "you have drained the stream". The reason this may be valuable to me is so I can write consumers that read all known traffic and then terminate. You caused me to reconsider and I think I am conflating 2 things. One is a sync/async api while the other is whether to have an infinite or finite stream. Is it possible to build a finite KafkaStream on a range of messages?
>>>
>>> Perhaps a Simple Consumer would do just fine and then I could start off getting the writeOffset from zookeeper and tell it to read a specified range per partition. I've done this and forked a simple consumer runnable for each partition, for one of our analyzers. The great thing about the high-level consumer is the rebalance, so I can fork however many stream readers I want and you just figure it out for me. In that way you offer us control over the resource consumption within a pull model. This is best for regulating message pressure, they say.
>>>
>>> Combining that high-level rebalance ability with a ranged partition drain could be really nice... build the stream with an ending position and it is a finite stream, but retain the high-level rebalance. With a finite stream, you would know the difference between the 2 async scenarios: fetch-in-progress versus end-of-stream. With an infinite stream, you never get end-of-stream.
>>>
>>> Aside from a high-level consumer over a finite range within each partition, the other feature I can think of is more complicated. A high-level consumer has state machine changes that the client cannot access, to my knowledge. Our use of Kafka has us invoke a message handler with each message we consume from the KafkaStream, so we convert a pull model to a push model. Including the idea of receiving notifications from state machine changes, what would be really nice is to have a KafkaMessageSource that is an eventful push model. If it were thread-safe, then we could register listeners for various events:
>>>
>>> * opening-stream
>>> * closing-stream
>>> * message-arrived
>>> * end-of-stream/no-more-messages-in-partition (for finite streams)
>>> * rebalance started
>>> * partition assigned
>>> * partition unassigned
>>> * rebalance finished
>>> * partition-offset-committed
>>>
>>> Perhaps that is just our use, but instead of a pull-oriented KafkaStream, is there any sense in your providing a push-oriented KafkaMessageSource publishing OOB messages?
>>>
>>> thank you,
>>> Robert
>>>
>>> On Feb 21, 2014, at 5:59 PM, Jun Rao <jun...@gmail.com> wrote:
>>>
>>> Robert,
>>>
>>> Could you explain why you want to distinguish between FetchingInProgressException and NoMessagePendingException? The nextMsgs() method that you want is exactly what poll() does.
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>> On Wed, Feb 19, 2014 at 8:45 AM, Withers, Robert <robert.with...@dish.com> wrote:
>>>
>>> I am not clear on why the consumer stream should be positionable, especially if it is limited to the in-memory fetched messages. Could someone explain to me, please? I really like the idea of committing the offset specifically on only those partitions with changed read offsets.
>>>
>>> 2 items I would like to see added to the KafkaStream are:
>>>
>>> * a non-blocking next(), which throws several exceptions (FetchingInProgressException and a NoMessagePendingException or something) to differentiate between fetching and no messages left.
>>> * a nextMsgs() method which returns all locally available messages and kicks off a fetch for the next chunk.
>>>
>>> If you are trying to add transactional features, then formally define a DTP capability and pull in other server frameworks to share the implementation. Should it be XA/Open? How about a new peer2peer DTP protocol?
>>>
>>> Thank you,
>>>
>>> Robert
>>>
>>> Robert Withers
>>> Staff Analyst/Developer
>>> o: (720) 514-8963
>>> c: (571) 262-1873
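Jun's point above, that poll() already gives the nextMsgs() behavior, could look roughly like this against the proposed API; the zero timeout and the empty-list semantics are assumptions about the proposal, not documented guarantees:

// Sketch: approximating a non-blocking nextMsgs() with the proposed poll().
// Assumption: a zero timeout returns whatever has already been fetched locally
// (possibly an empty list) without blocking on an in-flight fetch.
List<ConsumerRecord> available = consumer.poll(0, TimeUnit.MILLISECONDS);
if (available.isEmpty()) {
    // Either a fetch is still in flight or the partition is drained; telling those apart
    // would need something like the proposed lastOffset() compared against position().
} else {
    for (ConsumerRecord record : available)
        process(record);                               // application-supplied processing
}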
>>> -----Original Message-----
>>> From: Jay Kreps [mailto:jay.kr...@gmail.com]
>>> Sent: Sunday, February 16, 2014 10:13 AM
>>> To: users@kafka.apache.org
>>> Subject: Re: New Consumer API discussion
>>>
>>> +1 I think those are good. It is a little weird that changing the fetch point is not batched but changing the commit point is, but I suppose there is no helping that.
>>>
>>> -Jay
>>>
>>> On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>>
>>> Jay,
>>>
>>> That makes sense. position/seek deal with changing the consumer's in-memory data, so there is no remote rpc there. For some reason, I got committed and seek mixed up in my head at that time :)
>>>
>>> So we still end up with
>>>
>>> long position(TopicPartition tp)
>>> void seek(TopicPartitionOffset p)
>>> Map<TopicPartition, Long> committed(TopicPartition tp);
>>> void commit(TopicPartitionOffset...);
>>>
>>> Thanks,
>>> Neha
>>>
>>> On Friday, February 14, 2014, Jay Kreps <jay.kr...@gmail.com> wrote:
>>>
>>> Oh, interesting. So I am assuming the following implementation:
>>>
>>> 1. We have an in-memory fetch position which controls the next fetch offset.
>>> 2. Changing this has no effect until you poll again, at which point your fetch request will be from the newly specified offset.
>>> 3. We then have an in-memory but also remotely stored committed offset.
>>> 4. Calling commit has the effect of saving the fetch position as both the in-memory committed position and in the remote store.
>>> 5. Auto-commit is the same as periodically calling commit on all positions.
>>>
>>> So batching on commit as well as getting the committed position makes sense, but batching the fetch position wouldn't, right? I think you are actually thinking of a different approach.
>>>
>>> -Jay
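The five steps above, expressed as a short sketch against the proposed API; the constructor arguments for TopicPartition and TopicPartitionOffset are assumptions for illustration only:

// (1)(2) seek() only moves the in-memory fetch position; it takes effect on the next poll().
TopicPartition tp = new TopicPartition("events", 0);                       // illustrative topic/partition
consumer.seek(new TopicPartitionOffset("events", 0, 5000L));
List<ConsumerRecord> records = consumer.poll(1000, TimeUnit.MILLISECONDS); // fetch now starts at offset 5000
process(records);                                                          // application-supplied
// (3)(4) commit() saves the fetch position as the committed offset, in memory and remotely.
consumer.commit(new TopicPartitionOffset("events", 0, consumer.position(tp)));
// (5) Auto-commit would simply perform the commit() step periodically for all owned partitions.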
>>> On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>>
>>> I think you are saying both, i.e. if you have committed on a partition it returns you that value but if you haven't it does a remote lookup?
>>>
>>> Correct.
>>>
>>> The other argument for making committed batched is that commit() is batched, so there is symmetry.
>>>
>>> position() and seek() are always in memory changes (I assume) so there is no need to batch them.
>>>
>>> I'm not as sure as you are about that assumption being true. Basically in my example above, the batching argument for committed() also applies to position() since one purpose of fetching a partition's offset is to use it to set the position of the consumer to that offset. Since that might lead to a remote OffsetRequest call, I think we probably would be better off batching it.
>>>
>>> Another option for naming would be position/reposition instead of position/seek.
>>>
>>> I think position/seek is better since it aligns with Java file APIs.
>>>
>>> I also think your suggestion about ConsumerPosition makes sense.
>>>
>>> Thanks,
>>> Neha
>>>
>>> On Feb 13, 2014 9:22 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
>>>
>>> Hey Neha,
>>>
>>> I actually wasn't proposing the name TopicOffsetPosition, that was just a typo. I meant TopicPartitionOffset, and I was just referencing what was in the javadoc. So to restate my proposal without the typo, using just the existing classes (that naming is a separate question):
>>>
>>> long position(TopicPartition tp)
>>> void seek(TopicPartitionOffset p)
>>> long committed(TopicPartition tp)
>>> void commit(TopicPartitionOffset...);
>>>
>>> So I may be unclear on committed() (AKA lastCommittedOffset). Is it returning the in-memory value from the last commit by this consumer, or is it doing a remote fetch, or both? I think you are saying both, i.e. if you have committed on a partition it returns you that value but if you haven't it does a remote lookup?
>>>
>>> The other argument for making committed batched is that commit() is batched, so there is symmetry.
>>>
>>> position() and seek() are always in memory changes (I assume) so there is no need to batch them.
>>>
>>> So taking all that into account what if we revise it to
>>>
>>> long position(TopicPartition tp)
>>> void seek(TopicPartitionOffset p)
>>> Map<TopicPartition, Long> committed(TopicPartition tp);
>>> void commit(TopicPartitionOffset...);
>>>
>>> This is not symmetric between position/seek and commit/committed but it is convenient. Another option for naming would be position/reposition instead of position/seek.
>>>
>>> With respect to the name TopicPartitionOffset, what I was trying to say is that I recommend we change that to something shorter. I think TopicPosition or ConsumerPosition might be better. Position does not refer to the variables in the object, it refers to the meaning of the object--it represents a position within a topic. The offset field in that object is still called the offset. TopicOffset, PartitionOffset, or ConsumerOffset would all be workable too. Basically I am just objecting to concatenating three nouns together. :-)
>>>
>>> -Jay
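One way to see why the batched committed() is convenient: resuming several partitions after a restart takes a single lookup rather than one remote OffsetRequest per partition. A sketch, assuming a varargs form of committed() in line with the discussion above and the usual topic()/partition() accessors on TopicPartition:

// Resume a set of partitions from their last committed offsets in one call (sketch only).
Map<TopicPartition, Long> offsets = consumer.committed(tp0, tp1, tp2);
for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
    TopicPartition tp = entry.getKey();
    consumer.seek(new TopicPartitionOffset(tp.topic(), tp.partition(), entry.getValue()));
}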
>>> On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>>
>>> 2. It returns a list of results. But how can you use the list? The only way to use the list is to make a map of tp=>offset and then look up results in this map (or do a for loop over the list for the partition you want). I recommend that if this is an in-memory check we just do one at a time. E.g.
>>>
>>> long committedPosition(TopicPosition).
>>>
>>> This was discussed in the previous emails. There is a choic
>>>
>>> --
>>> Robert Withers
>>> robert.with...@dish.com
>>> c: 303.919.5856
>>>
>>> --
>>> -- Guozhang