Rob, The use of the callbacks is explained in the javadoc here - http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html
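For illustration only, a minimal sketch of what implementing that callback might look like. The interface and callback names follow the draft javadoc and the discussion below; the Consumer and TopicPartition stubs are placeholders so the sketch compiles on its own, and the final signatures may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-ins for the draft kafka.clients.consumer types, so the sketch is
// self-contained; these are placeholders, not the real classes.
class Consumer { }

class TopicPartition {
    final String topic;
    final int partition;
    TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
    @Override public String toString() { return topic + "-" + partition; }
}

// Callback shape as discussed in this thread: invoked around a rebalance.
interface ConsumerRebalanceCallback {
    void onPartitionsAssigned(Consumer consumer, TopicPartition... partitions);
    void onPartitionsRevoked(Consumer consumer, TopicPartition... partitions);
}

public class RebalanceCallbackSketch implements ConsumerRebalanceCallback {
    final List<String> log = new ArrayList<>();

    // After a rebalance completes: (re)initialize per-partition state.
    public void onPartitionsAssigned(Consumer consumer, TopicPartition... partitions) {
        for (TopicPartition tp : partitions) log.add("assigned " + tp);
    }

    // Before partitions are taken away: flush state and commit offsets so the
    // next owner starts from a clean position.
    public void onPartitionsRevoked(Consumer consumer, TopicPartition... partitions) {
        for (TopicPartition tp : partitions) log.add("revoked " + tp);
    }

    public static void main(String[] args) {
        RebalanceCallbackSketch cb = new RebalanceCallbackSketch();
        Consumer consumer = new Consumer();
        TopicPartition tp = new TopicPartition("events", 0);
        cb.onPartitionsRevoked(consumer, tp);  // fired at rebalance start
        cb.onPartitionsAssigned(consumer, tp); // fired at rebalance end
        System.out.println(cb.log); // [revoked events-0, assigned events-0]
    }
}
```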
Let me know if it makes sense. The hope is to improve the javadoc so that it is self-explanatory.

Thanks,
Neha

On Wed, Feb 26, 2014 at 9:16 AM, Robert Withers <robert.w.with...@gmail.com> wrote:

> Neha, what does the use of the RebalanceBeginCallback and RebalanceEndCallback look like?
>
> thanks,
> Rob
>
> On Feb 25, 2014, at 3:51 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>
> > How do you know n? The whole point is that you need to be able to fetch the end offset. You can't a priori decide you will load 1m messages without knowing what is there.
> >
> > Hmm. I think what you are pointing out is that in the new consumer API, we don't have a way to issue the equivalent of the existing getOffsetsBefore() API. Agree that is a flaw that we should fix.
> >
> > Will update the docs/wiki with a few use cases that I've collected so far and see if the API covers those.
> >
> > I would prefer PartitionsAssigned and PartitionsRevoked as that seems clearer to me
> >
> > Well the RebalanceBeginCallback interface will have onPartitionsAssigned() as the callback. Similarly, the RebalanceEndCallback interface will have onPartitionsRevoked() as the callback. Makes sense?
> >
> > Thanks,
> > Neha
> >
> > On Tue, Feb 25, 2014 at 2:38 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >
> >> 1. I would prefer PartitionsAssigned and PartitionsRevoked as that seems clearer to me.
> >>
> >> -Jay
> >>
> >> On Tue, Feb 25, 2014 at 10:19 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> >>
> >>> Thanks for the reviews so far! There are a few outstanding questions -
> >>>
> >>> 1. It will be good to make the rebalance callbacks forward compatible with Java 8 capabilities. We can change it to PartitionsAssignedCallback and PartitionsRevokedCallback or RebalanceBeginCallback and RebalanceEndCallback?
> >>>
> >>> If there are no objections, I will change it to RebalanceBeginCallback and RebalanceEndCallback.
> >>>
> >>> 2. The return type for committed() is List<TopicPartitionOffset>. There was a suggestion to change it to either be Map<TopicPartition,Long> or Map<TopicPartition, TopicPartitionOffset>
> >>>
> >>> Do people have feedback on this suggestion?
> >>>
> >>> On Tue, Feb 25, 2014 at 9:56 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> >>>
> >>>> Robert,
> >>>>
> >>>> Are you saying it is possible to get events from the high-level consumer regarding various state machine changes? For instance, can we get a notification when a rebalance starts and ends, when a partition is assigned/unassigned, when an offset is committed on a partition, when a leader changes and so on? I call this OOB traffic, since they are not the core messages streaming, but side-band events, yet they are still potentially useful to consumers.
> >>>>
> >>>> In the current proposal, you get notified when the state machine changes, i.e. before and after a rebalance is triggered. Look at ConsumerRebalanceCallback<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html>. Leader changes do not count as state machine changes for consumer rebalance purposes.
> >>>>
> >>>> Thanks,
> >>>> Neha
> >>>>
> >>>> On Tue, Feb 25, 2014 at 9:54 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> >>>>
> >>>>> Jay/Robert -
> >>>>>
> >>>>> I think what Robert is saying is that we need to think through the offset API to enable "batch processing" of topic data. Think of a process that periodically kicks off to compute a data summary or do a data load or something like that.
I think what we need to support this is an api to fetch the last offset from the server for a partition. Something like
> >>>>>
> >>>>> long lastOffset(TopicPartition tp)
> >>>>>
> >>>>> and for symmetry
> >>>>>
> >>>>> long firstOffset(TopicPartition tp)
> >>>>>
> >>>>> Likely this would have to be batched.
> >>>>>
> >>>>> A fixed range of data load can be done using the existing APIs as follows. This assumes you know the endOffset, which can be currentOffset + n (the number of messages in the load):
> >>>>>
> >>>>> long startOffset = consumer.position(partition);
> >>>>> long endOffset = startOffset + n;
> >>>>> while(consumer.position(partition) <= endOffset) {
> >>>>>     List<ConsumerRecord> messages = consumer.poll(timeout, TimeUnit.MILLISECONDS);
> >>>>>     process(messages, endOffset); // processes messages until endOffset
> >>>>> }
> >>>>>
> >>>>> Does that make sense?
> >>>>>
> >>>>> On Tue, Feb 25, 2014 at 9:49 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> >>>>>
> >>>>>> Thanks for the review, Jun. Here are some comments -
> >>>>>>
> >>>>>> 1. The using of ellipsis: This may make passing a list of items from a collection to the api a bit harder. Suppose that you have a list of topics stored in
> >>>>>>
> >>>>>> ArrayList<String> topics;
> >>>>>>
> >>>>>> If you want to subscribe to all topics in one call, you will have to do:
> >>>>>>
> >>>>>> String[] topicArray = new String[topics.size()];
> >>>>>> consumer.subscribe(topics.toArray(topicArray));
> >>>>>>
> >>>>>> A similar argument can be made for arguably the more common use case of subscribing to a single topic as well. In these cases, the user is required to write more code to create a single-item collection and pass it in. Since subscription is extremely lightweight, invoking it multiple times also seems like a workable solution, no?
> >>>>>>
> >>>>>> 2. It would be good to document that the following apis are mutually exclusive. Also, if the partition-level subscription is specified, there is no group management. Finally, unsubscribe() can only be used to cancel subscriptions with the same pattern. For example, you can't unsubscribe at the partition level if the subscription is done at the topic level.
> >>>>>>
> >>>>>> *subscribe*(java.lang.String... topics)
> >>>>>> *subscribe*(java.lang.String topic, int... partitions)
> >>>>>>
> >>>>>> Makes sense. Made the suggested improvements to the docs<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/Consumer.html#subscribe%28java.lang.String...%29>
> >>>>>>
> >>>>>> 3. commit(): The following comment in the doc should probably say "commit offsets for partitions assigned to this consumer".
> >>>>>>
> >>>>>> If no partitions are specified, commits offsets for the subscribed list of topics and partitions to Kafka.
> >>>>>>
> >>>>>> Could you give more context on this suggestion? Here is the entire doc -
> >>>>>>
> >>>>>> Synchronously commits the specified offsets for the specified list of topics and partitions to *Kafka*. If no partitions are specified, commits offsets for the subscribed list of topics and partitions.
> >>>>>>
> >>>>>> The hope is to convey that if no partitions are specified, offsets will be committed for the subscribed list of partitions. One improvement could be to explicitly state that the offsets returned on the last poll will be committed. I updated this to -
> >>>>>>
> >>>>>> Synchronously commits the specified offsets for the specified list of topics and partitions to *Kafka*.
If no offsets are specified, commits offsets returned on the last {@link #poll(long, TimeUnit) poll()} for the subscribed list of topics and partitions.
> >>>>>>
> >>>>>> 4. There is inconsistency in specifying partitions. Sometimes we use TopicPartition and some other times we use String and int (see examples below).
> >>>>>>
> >>>>>> void onPartitionsAssigned(Consumer consumer, TopicPartition... partitions)
> >>>>>> public void *subscribe*(java.lang.String topic, int... partitions)
> >>>>>>
> >>>>>> Yes, this was discussed previously. I think generally the consensus seems to be to use the higher-level classes everywhere. Made those changes.
> >>>>>>
> >>>>>> What's the use case of position()? Isn't that just the nextOffset() on the last message returned from poll()?
> >>>>>>
> >>>>>> Yes, except in the case where a rebalance is triggered and poll() is not yet invoked. Here, you would use position() to get the new fetch position for the specific partition. Even if this is not a common use case, IMO it is much easier to use position() to get the fetch offset than invoking nextOffset() on the last message. This also keeps the APIs symmetric, which is nice.
> >>>>>>
> >>>>>> On Mon, Feb 24, 2014 at 7:06 PM, Withers, Robert <robert.with...@dish.com> wrote:
> >>>>>>
> >>>>>>> That's wonderful. Thanks for kafka.
> >>>>>>>
> >>>>>>> Rob
> >>>>>>>
> >>>>>>> On Feb 24, 2014, at 9:58 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>>
> >>>>>>> Hi Robert,
> >>>>>>>
> >>>>>>> Yes, you can check out the callback functions in the new API
> >>>>>>>
> >>>>>>> onPartitionsRevoked
> >>>>>>> onPartitionsAssigned
> >>>>>>>
> >>>>>>> and see if they meet your needs.
> >>>>>>>
> >>>>>>> Guozhang
> >>>>>>>
> >>>>>>> On Mon, Feb 24, 2014 at 8:18 AM, Withers, Robert <robert.with...@dish.com> wrote:
> >>>>>>>
> >>>>>>> Jun,
> >>>>>>>
> >>>>>>> Are you saying it is possible to get events from the high-level consumer regarding various state machine changes? For instance, can we get a notification when a rebalance starts and ends, when a partition is assigned/unassigned, when an offset is committed on a partition, when a leader changes and so on? I call this OOB traffic, since they are not the core messages streaming, but side-band events, yet they are still potentially useful to consumers.
> >>>>>>>
> >>>>>>> Thank you,
> >>>>>>> Robert
> >>>>>>>
> >>>>>>> Robert Withers
> >>>>>>> Staff Analyst/Developer
> >>>>>>> o: (720) 514-8963
> >>>>>>> c: (571) 262-1873
> >>>>>>>
> >>>>>>> -----Original Message-----
> >>>>>>> From: Jun Rao [mailto:jun...@gmail.com]
> >>>>>>> Sent: Sunday, February 23, 2014 4:19 PM
> >>>>>>> To: users@kafka.apache.org
> >>>>>>> Subject: Re: New Consumer API discussion
> >>>>>>>
> >>>>>>> Robert,
> >>>>>>>
> >>>>>>> For the push-oriented api, you can potentially implement your own MessageHandler with those methods. In the main loop of our new consumer api, you can just call those methods based on the events you get.
> >>>>>>>
> >>>>>>> Also, we already have an api to get the first and the last offset of a partition (getOffsetBefore).
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>>
> >>>>>>> Jun
> >>>>>>>
> >>>>>>> On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert <robert.with...@dish.com> wrote:
> >>>>>>>
> >>>>>>> This is a good idea, too.
I would modify it to include stream marking, then you can have:
> >>>>>>>
> >>>>>>> long end = consumer.lastOffset(tp);
> >>>>>>> consumer.setMark(end);
> >>>>>>> while(consumer.beforeMark()) {
> >>>>>>>     process(consumer.pollToMark());
> >>>>>>> }
> >>>>>>>
> >>>>>>> or
> >>>>>>>
> >>>>>>> long end = consumer.lastOffset(tp);
> >>>>>>> consumer.setMark(end);
> >>>>>>> for(Object msg : consumer.iteratorToMark()) {
> >>>>>>>     process(msg);
> >>>>>>> }
> >>>>>>>
> >>>>>>> I actually have 4 suggestions, then:
> >>>>>>>
> >>>>>>> * pull: stream marking
> >>>>>>> * pull: finite streams, bound by time range (up-to-now, yesterday) or offset
> >>>>>>> * pull: async api
> >>>>>>> * push: KafkaMessageSource, for a push model, with msg and OOB events. Build one in either individual or chunk mode and have a listener for each msg or a listener for a chunk of msgs. Make it composable and policy driven (chunked, range, commitOffsets policy, retry policy, transactional)
> >>>>>>>
> >>>>>>> Thank you,
> >>>>>>> Robert
> >>>>>>>
> >>>>>>> On Feb 22, 2014, at 11:21 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >>>>>>>
> >>>>>>> I think what Robert is saying is that we need to think through the offset API to enable "batch processing" of topic data. Think of a process that periodically kicks off to compute a data summary or do a data load or something like that. I think what we need to support this is an api to fetch the last offset from the server for a partition. Something like
> >>>>>>>
> >>>>>>> long lastOffset(TopicPartition tp)
> >>>>>>>
> >>>>>>> and for symmetry
> >>>>>>>
> >>>>>>> long firstOffset(TopicPartition tp)
> >>>>>>>
> >>>>>>> Likely this would have to be batched.
Essentially we should add this use case to our set of code examples to write and think through.
> >>>>>>>
> >>>>>>> The usage would be something like
> >>>>>>>
> >>>>>>> long end = consumer.lastOffset(tp);
> >>>>>>> while(consumer.position < end)
> >>>>>>>     process(consumer.poll());
> >>>>>>>
> >>>>>>> -Jay
> >>>>>>>
> >>>>>>> On Sat, Feb 22, 2014 at 1:52 AM, Withers, Robert <robert.with...@dish.com> wrote:
> >>>>>>>
> >>>>>>> Jun,
> >>>>>>>
> >>>>>>> I was originally thinking a non-blocking read from a distributed stream should distinguish between "no local messages, but a fetch is occurring" versus "you have drained the stream". The reason this may be valuable to me is so I can write consumers that read all known traffic then terminate. You caused me to reconsider and I think I am conflating 2 things. One is a sync/async api while the other is whether to have an infinite or finite stream. Is it possible to build a finite KafkaStream on a range of messages?
> >>>>>>>
> >>>>>>> Perhaps a Simple Consumer would do just fine and then I could start off getting the writeOffset from zookeeper and tell it to read a specified range per partition. I've done this and forked a simple consumer runnable for each partition, for one of our analyzers. The great thing about the high-level consumer is the rebalance, so I can fork however many stream readers I want and you just figure it out for me. In that way you offer us control over the resource consumption within a pull model. This is best to regulate message pressure, they say.
> >>>>>>>
> >>>>>>> Combining that high-level rebalance ability with a ranged partition drain could be really nice...build the stream with an ending position and it is a finite stream, but retain the high-level rebalance. With a finite stream, you would know the difference of the 2 async scenarios: fetch-in-progress versus end-of-stream. With an infinite stream, you never get end-of-stream.
> >>>>>>>
> >>>>>>> Aside from a high-level consumer over a finite range within each partition, the other feature I can think of is more complicated. A high-level consumer has state machine changes that the client cannot access, to my knowledge. Our use of kafka has us invoke a message handler with each message we consume from the KafkaStream, so we convert a pull-model to a push-model. Including the idea of receiving notifications from state machine changes, what would be really nice is to have a KafkaMessageSource, that is an eventful push model. If it were thread-safe, then we could register listeners for various events:
> >>>>>>>
> >>>>>>> * opening-stream
> >>>>>>> * closing-stream
> >>>>>>> * message-arrived
> >>>>>>> * end-of-stream/no-more-messages-in-partition (for finite streams)
> >>>>>>> * rebalance started
> >>>>>>> * partition assigned
> >>>>>>> * partition unassigned
> >>>>>>> * rebalance finished
> >>>>>>> * partition-offset-committed
> >>>>>>>
> >>>>>>> Perhaps that is just our use, but instead of a pull-oriented KafkaStream, is there any sense in your providing a push-oriented KafkaMessageSource publishing OOB messages?
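A rough sketch of the push-oriented wrapper being asked for here. Every name in it (MessageSourceSketch, onMessage, onEvent) is illustrative only, not an existing or proposed Kafka API; it just shows a pull loop dispatching messages plus OOB lifecycle events to registered listeners.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Drains a finite message source and pushes each message, plus simple OOB
// lifecycle events, to registered listeners. The Iterator stands in for a
// poll() loop; all names here are hypothetical.
public class MessageSourceSketch {
    private final List<Consumer<String>> messageListeners = new ArrayList<>();
    private final List<Consumer<String>> eventListeners = new ArrayList<>();

    public void onMessage(Consumer<String> l) { messageListeners.add(l); }
    public void onEvent(Consumer<String> l)   { eventListeners.add(l); }

    // Pull from the source and push to listeners, emitting
    // opening-stream / end-of-stream OOB events around the drain.
    public void run(Iterator<String> messages) {
        fire(eventListeners, "opening-stream");
        while (messages.hasNext()) fire(messageListeners, messages.next());
        fire(eventListeners, "end-of-stream");
    }

    private static void fire(List<Consumer<String>> ls, String payload) {
        for (Consumer<String> l : ls) l.accept(payload);
    }

    public static void main(String[] args) {
        MessageSourceSketch src = new MessageSourceSketch();
        List<String> seen = new ArrayList<>();
        src.onMessage(seen::add);
        src.onEvent(e -> seen.add("[" + e + "]"));
        src.run(List.of("m1", "m2").iterator());
        System.out.println(seen); // [[opening-stream], m1, m2, [end-of-stream]]
    }
}
```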
> >>>>>>>
> >>>>>>> thank you,
> >>>>>>> Robert
> >>>>>>>
> >>>>>>> On Feb 21, 2014, at 5:59 PM, Jun Rao <jun...@gmail.com> wrote:
> >>>>>>>
> >>>>>>> Robert,
> >>>>>>>
> >>>>>>> Could you explain why you want to distinguish btw FetchingInProgressException and NoMessagePendingException? The nextMsgs() method that you want is exactly what poll() does.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>>
> >>>>>>> Jun
> >>>>>>>
> >>>>>>> On Wed, Feb 19, 2014 at 8:45 AM, Withers, Robert <robert.with...@dish.com> wrote:
> >>>>>>>
> >>>>>>> I am not clear on why the consumer stream should be positionable, especially if it is limited to the in-memory fetched messages. Could someone explain to me, please? I really like the idea of committing the offset specifically on those partitions with changed read offsets, only.
> >>>>>>>
> >>>>>>> 2 items I would like to see added to the KafkaStream are:
> >>>>>>>
> >>>>>>> * a non-blocking next(), throws several exceptions (FetchingInProgressException and a NoMessagePendingException or something) to differentiate between fetching or no messages left.
> >>>>>>> * A nextMsgs() method which returns all locally available messages and kicks off a fetch for the next chunk.
> >>>>>>>
> >>>>>>> If you are trying to add transactional features, then formally define a DTP capability and pull in other server frameworks to share the implementation. Should it be XA/Open? How about a new peer2peer DTP protocol?
> >>>>>>>
> >>>>>>> Thank you,
> >>>>>>>
> >>>>>>> Robert
> >>>>>>>
> >>>>>>> Robert Withers
> >>>>>>> Staff Analyst/Developer
> >>>>>>> o: (720) 514-8963
> >>>>>>> c: (571) 262-1873
> >>>>>>>
> >>>>>>> -----Original Message-----
> >>>>>>> From: Jay Kreps [mailto:jay.kr...@gmail.com]
> >>>>>>> Sent: Sunday, February 16, 2014 10:13 AM
> >>>>>>> To: users@kafka.apache.org
> >>>>>>> Subject: Re: New Consumer API discussion
> >>>>>>>
> >>>>>>> +1 I think those are good. It is a little weird that changing the fetch point is not batched but changing the commit point is, but I suppose there is no helping that.
> >>>>>>>
> >>>>>>> -Jay
> >>>>>>>
> >>>>>>> On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> >>>>>>>
> >>>>>>> Jay,
> >>>>>>>
> >>>>>>> That makes sense. position/seek deal with changing the consumer's in-memory data, so there is no remote rpc there.
For some reason, I got committed and seek mixed up in my head at that time :)
> >>>>>>>
> >>>>>>> So we still end up with
> >>>>>>>
> >>>>>>> long position(TopicPartition tp)
> >>>>>>> void seek(TopicPartitionOffset p)
> >>>>>>> Map<TopicPartition, Long> committed(TopicPartition tp);
> >>>>>>> void commit(TopicPartitionOffset...);
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Neha
> >>>>>>>
> >>>>>>> On Friday, February 14, 2014, Jay Kreps <jay.kr...@gmail.com> wrote:
> >>>>>>>
> >>>>>>> Oh, interesting. So I am assuming the following implementation:
> >>>>>>>
> >>>>>>> 1. We have an in-memory fetch position which controls the next fetch offset.
> >>>>>>> 2. Changing this has no effect until you poll again, at which point your fetch request will be from the newly specified offset.
> >>>>>>> 3. We then have an in-memory but also remotely stored committed offset.
> >>>>>>> 4. Calling commit has the effect of saving the fetch position as both the in-memory committed position and in the remote store.
> >>>>>>> 5. Auto-commit is the same as periodically calling commit on all positions.
> >>>>>>>
> >>>>>>> So batching on commit as well as getting the committed position makes sense, but batching the fetch position wouldn't, right?
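For concreteness, the five assumptions above can be pinned down with a toy in-memory model. This is a sketch of the semantics being discussed, not the real client: plain strings stand in for TopicPartition and a HashMap for the remote offset store.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the proposed semantics: seek() changes only the local fetch
// position; commit() saves the current position into a map standing in for
// the remote offset store; committed() reads the stored value back.
public class OffsetModelSketch {
    private final Map<String, Long> fetchPosition = new HashMap<>();
    private final Map<String, Long> remoteCommitted = new HashMap<>();

    public long position(String tp) { return fetchPosition.getOrDefault(tp, 0L); }

    // In-memory only; takes effect on the next poll in the real client.
    public void seek(String tp, long offset) { fetchPosition.put(tp, offset); }

    // Saves the fetch position as the committed offset (locally and "remotely").
    public void commit(String... tps) {
        for (String tp : tps) remoteCommitted.put(tp, position(tp));
    }

    public Map<String, Long> committed(String... tps) {
        Map<String, Long> result = new HashMap<>();
        for (String tp : tps) result.put(tp, remoteCommitted.getOrDefault(tp, -1L));
        return result;
    }

    public static void main(String[] args) {
        OffsetModelSketch c = new OffsetModelSketch();
        c.seek("t-0", 42L);                     // local change only
        System.out.println(c.committed("t-0")); // {t-0=-1}: nothing committed yet
        c.commit("t-0");                        // position becomes the committed offset
        System.out.println(c.committed("t-0")); // {t-0=42}
    }
}
```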
I think you are actually thinking of a different approach.
> >>>>>>>
> >>>>>>> -Jay
> >>>>>>>
> >>>>>>> On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> >>>>>>>
> >>>>>>> I think you are saying both, i.e. if you have committed on a partition it returns you that value but if you haven't it does a remote lookup?
> >>>>>>>
> >>>>>>> Correct.
> >>>>>>>
> >>>>>>> The other argument for making committed batched is that commit() is batched, so there is symmetry.
> >>>>>>>
> >>>>>>> position() and seek() are always in-memory changes (I assume) so there is no need to batch them.
> >>>>>>>
> >>>>>>> I'm not as sure as you are about that assumption being true. Basically in my example above, the batching argument for committed() also applies to position(), since one purpose of fetching a partition's offset is to use it to set the position of the consumer to that offset. Since that might lead to a remote OffsetRequest call, I think we probably would be better off batching it.
> >>>>>>>
> >>>>>>> Another option for naming would be position/reposition instead of position/seek.
> >>>>>>>
> >>>>>>> I think position/seek is better since it aligns with Java file APIs.
> >>>>>>>
> >>>>>>> I also think your suggestion about ConsumerPosition makes sense.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Neha
> >>>>>>>
> >>>>>>> On Feb 13, 2014 9:22 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
> >>>>>>>
> >>>>>>> Hey Neha,
> >>>>>>>
> >>>>>>> I actually wasn't proposing the name TopicOffsetPosition, that was just a typo. I meant TopicPartitionOffset, and I was just referencing what was in the javadoc. So to restate my proposal without the typo, using just the existing classes (that naming is a separate question):
> >>>>>>>
> >>>>>>> long position(TopicPartition tp)
> >>>>>>> void seek(TopicPartitionOffset p)
> >>>>>>> long committed(TopicPartition tp)
> >>>>>>> void commit(TopicPartitionOffset...);
> >>>>>>>
> >>>>>>> So I may be unclear on committed() (AKA lastCommittedOffset). Is it returning the in-memory value from the last commit by this consumer, or is it doing a remote fetch, or both? I think you are saying both, i.e.
if you have committed on a partition it returns you that value but if you haven't it does a remote lookup?
> >>>>>>>
> >>>>>>> The other argument for making committed batched is that commit() is batched, so there is symmetry.
> >>>>>>>
> >>>>>>> position() and seek() are always in-memory changes (I assume) so there is no need to batch them.
> >>>>>>>
> >>>>>>> So taking all that into account, what if we revise it to
> >>>>>>>
> >>>>>>> long position(TopicPartition tp)
> >>>>>>> void seek(TopicPartitionOffset p)
> >>>>>>> Map<TopicPartition, Long> committed(TopicPartition tp);
> >>>>>>> void commit(TopicPartitionOffset...);
> >>>>>>>
> >>>>>>> This is not symmetric between position/seek and commit/committed but it is convenient. Another option for naming would be position/reposition instead of position/seek.
> >>>>>>>
> >>>>>>> With respect to the name TopicPartitionOffset, what I was trying to say is that I recommend we change that to something shorter. I think TopicPosition or ConsumerPosition might be better. Position does not refer to the variables in the object, it refers to the meaning of the object--it represents a position within a topic. The offset field in that object is still called the offset.
TopicOffset, PartitionOffset, or ConsumerOffset would all be workable too. Basically I am just objecting to concatenating three nouns together. :-)
> >>>>>>>
> >>>>>>> -Jay
> >>>>>>>
> >>>>>>> On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> >>>>>>>
> >>>>>>> 2. It returns a list of results. But how can you use the list? The only way to use the list is to make a map of tp=>offset and then look up results in this map (or do a for loop over the list for the partition you want). I recommend that if this is an in-memory check we just do one at a time. E.g.
> >>>>>>>
> >>>>>>> long committedPosition(TopicPosition).
> >>>>>>>
> >>>>>>> This was discussed in the previous emails.
There is a choice
> >>>>>>>
> >>>>>>> --
> >>>>>>> Robert Withers
> >>>>>>> robert.with...@dish.com
> >>>>>>> c: 303.919.5856
> >>>>>>>
> >>>>>>> --
> >>>>>>> Robert Withers
> >>>>>>> robert.with...@dish.com
> >>>>>>> c: 303.919.5856
> >>>>>>>
> >>>>>>> --
> >>>>>>> -- Guozhang
> >>>>>>>
> >>>>>>> --
> >>>>>>> Robert Withers
> >>>>>>> robert.with...@dish.com
> >>>>>>> c: 303.919.5856