Neha, I see how one might wish to implement onPartitionsAssigned and onPartitionsRevoked, but I don’t have a sense for how I might supply these implementations to a running consumer. What would the setup code look like to start a high-level consumer with these provided implementations?
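To make the question concrete, here is a toy sketch of the wiring I am imagining. The constructor taking a ConsumerRebalanceCallback is purely a guess on my part, and every type below is a stand-in defined for illustration — none of this is the real kafka.clients.consumer API:

```java
// Stand-in types sketching the proposed callback surface; these are NOT the
// real kafka.clients.consumer classes, just stubs to illustrate the wiring.
import java.util.ArrayList;
import java.util.List;

record TopicPartition(String topic, int partition) {}

interface ConsumerRebalanceCallback {
    void onPartitionsAssigned(List<TopicPartition> partitions);
    void onPartitionsRevoked(List<TopicPartition> partitions);
}

// A toy consumer that accepts the callback at construction time (an assumption:
// the real client might instead take it via config or subscribe()) and invokes
// it around a simulated rebalance.
class StubConsumer {
    private final ConsumerRebalanceCallback callback;

    StubConsumer(ConsumerRebalanceCallback callback) { this.callback = callback; }

    void simulateRebalance() {
        // Before the rebalance: partitions we currently own are revoked,
        // giving the application a chance to commit/flush.
        callback.onPartitionsRevoked(List.of(new TopicPartition("events", 0)));
        // After the rebalance: the new assignment arrives, letting the
        // application restore per-partition state.
        callback.onPartitionsAssigned(List.of(new TopicPartition("events", 1)));
    }
}

public class RebalanceSetupSketch {
    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        StubConsumer consumer = new StubConsumer(new ConsumerRebalanceCallback() {
            public void onPartitionsRevoked(List<TopicPartition> partitions) {
                log.add("revoked:" + partitions.size());
            }
            public void onPartitionsAssigned(List<TopicPartition> partitions) {
                log.add("assigned:" + partitions.size());
            }
        });
        consumer.simulateRebalance();
        log.forEach(System.out::println);
    }
}
```

That is, the application hands in one object implementing both callbacks, and the consumer invokes them around each rebalance.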
thanks,
Rob

On Feb 27, 2014, at 3:48 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> Rob,
>
> The use of the callbacks is explained in the javadoc here -
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html
>
> Let me know if it makes sense. The hope is to improve the javadoc so that
> it is self-explanatory.
>
> Thanks,
> Neha
>
> On Wed, Feb 26, 2014 at 9:16 AM, Robert Withers <robert.w.with...@gmail.com> wrote:
>
>> Neha, what does the use of the RebalanceBeginCallback and
>> RebalanceEndCallback look like?
>>
>> thanks,
>> Rob
>>
>> On Feb 25, 2014, at 3:51 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>
>>> How do you know n? The whole point is that you need to be able to fetch
>>> the end offset. You can't a priori decide you will load 1m messages
>>> without knowing what is there.
>>>
>>> Hmm. I think what you are pointing out is that in the new consumer API,
>>> we don't have a way to issue the equivalent of the existing
>>> getOffsetsBefore() API. Agreed, that is a flaw we should fix.
>>>
>>> Will update the docs/wiki with a few use cases that I've collected so
>>> far and see if the API covers those.
>>>
>>> I would prefer PartitionsAssigned and PartitionsRevoked as that seems
>>> clearer to me
>>>
>>> Well, the RebalanceBeginCallback interface will have
>>> onPartitionsAssigned() as the callback. Similarly, the
>>> RebalanceEndCallback interface will have onPartitionsRevoked() as the
>>> callback. Makes sense?
>>>
>>> Thanks,
>>> Neha
>>>
>>> On Tue, Feb 25, 2014 at 2:38 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>>
>>>> 1. I would prefer PartitionsAssigned and PartitionsRevoked as that
>>>> seems clearer to me.
>>>>
>>>> -Jay
>>>>
>>>> On Tue, Feb 25, 2014 at 10:19 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>>>
>>>>> Thanks for the reviews so far! There are a few outstanding questions -
>>>>>
>>>>> 1. It will be good to make the rebalance callbacks forward compatible
>>>>> with Java 8 capabilities. We can change them to
>>>>> PartitionsAssignedCallback and PartitionsRevokedCallback, or
>>>>> RebalanceBeginCallback and RebalanceEndCallback.
>>>>>
>>>>> If there are no objections, I will change it to
>>>>> RebalanceBeginCallback and RebalanceEndCallback.
>>>>>
>>>>> 2. The return type for committed() is List<TopicPartitionOffset>.
>>>>> There was a suggestion to change it to either Map<TopicPartition, Long>
>>>>> or Map<TopicPartition, TopicPartitionOffset>.
>>>>>
>>>>> Do people have feedback on this suggestion?
>>>>>
>>>>> On Tue, Feb 25, 2014 at 9:56 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>>>>
>>>>>> Robert,
>>>>>>
>>>>>> Are you saying it is possible to get events from the high-level
>>>>>> consumer regarding various state machine changes? For instance, can
>>>>>> we get a notification when a rebalance starts and ends, when a
>>>>>> partition is assigned/unassigned, when an offset is committed on a
>>>>>> partition, when a leader changes and so on? I call this OOB traffic,
>>>>>> since they are not the core messages streaming, but side-band
>>>>>> events, yet they are still potentially useful to consumers.
>>>>>>
>>>>>> In the current proposal, you get notified when the state machine
>>>>>> changes, i.e. before and after a rebalance is triggered. Look at
>>>>>> ConsumerRebalanceCallback
>>>>>> (http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html).
>>>>>> Leader changes do not count as state machine changes for consumer
>>>>>> rebalance purposes.
>>>>>>
>>>>>> Thanks,
>>>>>> Neha
>>>>>>
>>>>>> On Tue, Feb 25, 2014 at 9:54 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>>>>>
>>>>>>> Jay/Robert -
>>>>>>>
>>>>>>> I think what Robert is saying is that we need to think through the
>>>>>>> offset API to enable "batch processing" of topic data. Think of a
>>>>>>> process that periodically kicks off to compute a data summary or
>>>>>>> do a data load or something like that. I think what we need to
>>>>>>> support this is an api to fetch the last offset from the server
>>>>>>> for a partition. Something like
>>>>>>>
>>>>>>> long lastOffset(TopicPartition tp)
>>>>>>>
>>>>>>> and for symmetry
>>>>>>>
>>>>>>> long firstOffset(TopicPartition tp)
>>>>>>>
>>>>>>> Likely this would have to be batched.
>>>>>>>
>>>>>>> A fixed range of data load can be done using the existing APIs as
>>>>>>> follows. This assumes you know the endOffset, which can be
>>>>>>> currentOffset + n (number of messages in the load):
>>>>>>>
>>>>>>> long startOffset = consumer.position(partition);
>>>>>>> long endOffset = startOffset + n;
>>>>>>> while (consumer.position(partition) <= endOffset) {
>>>>>>>     List<ConsumerRecord> messages = consumer.poll(timeout, TimeUnit.MILLISECONDS);
>>>>>>>     process(messages, endOffset); // processes messages until endOffset
>>>>>>> }
>>>>>>>
>>>>>>> Does that make sense?
>>>>>>>
>>>>>>> On Tue, Feb 25, 2014 at 9:49 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks for the review, Jun. Here are some comments -
>>>>>>>>
>>>>>>>> 1. The use of ellipsis: this may make passing a list of items
>>>>>>>> from a collection to the api a bit harder. Suppose that you have
>>>>>>>> a list of topics stored in
>>>>>>>>
>>>>>>>> ArrayList<String> topics;
>>>>>>>>
>>>>>>>> If you want to subscribe to all topics in one call, you will
>>>>>>>> have to do:
>>>>>>>>
>>>>>>>> String[] topicArray = new String[topics.size()];
>>>>>>>> consumer.subscribe(topics.toArray(topicArray));
>>>>>>>>
>>>>>>>> A similar argument can be made for arguably the more common use
>>>>>>>> case of subscribing to a single topic as well. In these cases,
>>>>>>>> the user is required to write more code to create a single-item
>>>>>>>> collection and pass it in. Since subscription is extremely
>>>>>>>> lightweight, invoking it multiple times also seems like a
>>>>>>>> workable solution, no?
>>>>>>>>
>>>>>>>> 2. It would be good to document that the following apis are
>>>>>>>> mutually exclusive. Also, if the partition level subscription is
>>>>>>>> specified, there is no group management. Finally, unsubscribe()
>>>>>>>> can only be used to cancel subscriptions with the same pattern.
>>>>>>>> For example, you can't unsubscribe at the partition level if the
>>>>>>>> subscription is done at the topic level.
>>>>>>>>
>>>>>>>> *subscribe*(java.lang.String... topics)
>>>>>>>> *subscribe*(java.lang.String topic, int... partitions)
>>>>>>>>
>>>>>>>> Makes sense. Made the suggested improvements to the docs
>>>>>>>> (http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/Consumer.html#subscribe%28java.lang.String...%29).
>>>>>>>>
>>>>>>>> 3. commit(): The following comment in the doc should probably
>>>>>>>> say "commit offsets for partitions assigned to this consumer".
>>>>>>>>
>>>>>>>> If no partitions are specified, commits offsets for the
>>>>>>>> subscribed list of topics and partitions to Kafka.
>>>>>>>>
>>>>>>>> Could you give more context on this suggestion? Here is the
>>>>>>>> entire doc -
>>>>>>>>
>>>>>>>> Synchronously commits the specified offsets for the specified
>>>>>>>> list of topics and partitions to *Kafka*. If no partitions are
>>>>>>>> specified, commits offsets for the subscribed list of topics and
>>>>>>>> partitions.
>>>>>>>>
>>>>>>>> The hope is to convey that if no partitions are specified,
>>>>>>>> offsets will be committed for the subscribed list of partitions.
>>>>>>>> One improvement could be to explicitly state that the offsets
>>>>>>>> returned on the last poll will be committed. I updated this to -
>>>>>>>>
>>>>>>>> Synchronously commits the specified offsets for the specified
>>>>>>>> list of topics and partitions to *Kafka*. If no offsets are
>>>>>>>> specified, commits offsets returned on the last {@link
>>>>>>>> #poll(long, TimeUnit) poll()} for the subscribed list of topics
>>>>>>>> and partitions.
>>>>>>>>
>>>>>>>> 4. There is inconsistency in specifying partitions. Sometimes we
>>>>>>>> use TopicPartition and some other times we use String and int
>>>>>>>> (see examples below).
>>>>>>>>
>>>>>>>> void onPartitionsAssigned(Consumer consumer, TopicPartition... partitions)
>>>>>>>> public void *subscribe*(java.lang.String topic, int... partitions)
>>>>>>>>
>>>>>>>> Yes, this was discussed previously. I think generally the
>>>>>>>> consensus seems to be to use the higher-level classes
>>>>>>>> everywhere. Made those changes.
>>>>>>>>
>>>>>>>> What's the use case of position()? Isn't that just the
>>>>>>>> nextOffset() on the last message returned from poll()?
>>>>>>>>
>>>>>>>> Yes, except in the case where a rebalance is triggered and
>>>>>>>> poll() is not yet invoked. Here, you would use position() to get
>>>>>>>> the new fetch position for the specific partition.
>>>>>>>> Even if this is not a common use case, IMO it is much easier to
>>>>>>>> use position() to get the fetch offset than invoking nextOffset()
>>>>>>>> on the last message. This also keeps the APIs symmetric, which is
>>>>>>>> nice.
>>>>>>>>
>>>>>>>> On Mon, Feb 24, 2014 at 7:06 PM, Withers, Robert <robert.with...@dish.com> wrote:
>>>>>>>>
>>>>>>>>> That's wonderful. Thanks for kafka.
>>>>>>>>>
>>>>>>>>> Rob
>>>>>>>>>
>>>>>>>>> On Feb 24, 2014, at 9:58 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi Robert,
>>>>>>>>>
>>>>>>>>> Yes, you can check out the callback functions in the new API
>>>>>>>>>
>>>>>>>>> onPartitionsRevoked
>>>>>>>>> onPartitionsAssigned
>>>>>>>>>
>>>>>>>>> and see if they meet your needs.
>>>>>>>>>
>>>>>>>>> Guozhang
>>>>>>>>>
>>>>>>>>> On Mon, Feb 24, 2014 at 8:18 AM, Withers, Robert <robert.with...@dish.com> wrote:
>>>>>>>>>
>>>>>>>>> Jun,
>>>>>>>>>
>>>>>>>>> Are you saying it is possible to get events from the high-level
>>>>>>>>> consumer regarding various state machine changes? For instance,
>>>>>>>>> can we get a notification when a rebalance starts and ends, when
>>>>>>>>> a partition is assigned/unassigned, when an offset is committed
>>>>>>>>> on a partition, when a leader changes and so on? I call this OOB
>>>>>>>>> traffic, since they are not the core messages streaming, but
>>>>>>>>> side-band events, yet they are still potentially useful to
>>>>>>>>> consumers.
>>>>>>>>>
>>>>>>>>> Thank you,
>>>>>>>>> Robert
>>>>>>>>>
>>>>>>>>> Robert Withers
>>>>>>>>> Staff Analyst/Developer
>>>>>>>>> o: (720) 514-8963
>>>>>>>>> c: (571) 262-1873
>>>>>>>>>
>>>>>>>>> -----Original Message-----
>>>>>>>>> From: Jun Rao [mailto:jun...@gmail.com]
>>>>>>>>> Sent: Sunday, February 23, 2014 4:19 PM
>>>>>>>>> To: users@kafka.apache.org
>>>>>>>>> Subject: Re: New Consumer API discussion
>>>>>>>>>
>>>>>>>>> Robert,
>>>>>>>>>
>>>>>>>>> For the push-oriented api, you can potentially implement your own
>>>>>>>>> MessageHandler with those methods. In the main loop of our new
>>>>>>>>> consumer api, you can just call those methods based on the events
>>>>>>>>> you get.
>>>>>>>>>
>>>>>>>>> Also, we already have an api to get the first and the last offset
>>>>>>>>> of a partition (getOffsetsBefore).
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Jun
>>>>>>>>>
>>>>>>>>> On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert <robert.with...@dish.com> wrote:
>>>>>>>>>
>>>>>>>>> This is a good idea, too. I would modify it to include stream
>>>>>>>>> marking, then you can have:
>>>>>>>>>
>>>>>>>>> long end = consumer.lastOffset(tp);
>>>>>>>>> consumer.setMark(end);
>>>>>>>>> while (consumer.beforeMark()) {
>>>>>>>>>     process(consumer.pollToMark());
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> or
>>>>>>>>>
>>>>>>>>> long end = consumer.lastOffset(tp);
>>>>>>>>> consumer.setMark(end);
>>>>>>>>> for (Object msg : consumer.iteratorToMark()) {
>>>>>>>>>     process(msg);
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> I actually have 4 suggestions, then:
>>>>>>>>>
>>>>>>>>> * pull: stream marking
>>>>>>>>> * pull: finite streams, bound by time range (up-to-now, yesterday) or offset
>>>>>>>>> * pull: async api
>>>>>>>>> * push: KafkaMessageSource, for a push model, with msg and OOB
>>>>>>>>>   events. Build one in either individual or chunk mode and have a
>>>>>>>>>   listener for each msg or a listener for a chunk of msgs. Make
>>>>>>>>>   it composable and policy driven (chunked, range, commitOffsets
>>>>>>>>>   policy, retry policy, transactional).
>>>>>>>>>
>>>>>>>>> Thank you,
>>>>>>>>> Robert
>>>>>>>>>
>>>>>>>>> On Feb 22, 2014, at 11:21 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> I think what Robert is saying is that we need to think through
>>>>>>>>> the offset API to enable "batch processing" of topic data. Think
>>>>>>>>> of a process that periodically kicks off to compute a data
>>>>>>>>> summary or do a data load or something like that. I think what we
>>>>>>>>> need to support this is an api to fetch the last offset from the
>>>>>>>>> server for a partition. Something like
>>>>>>>>>
>>>>>>>>> long lastOffset(TopicPartition tp)
>>>>>>>>>
>>>>>>>>> and for symmetry
>>>>>>>>>
>>>>>>>>> long firstOffset(TopicPartition tp)
>>>>>>>>>
>>>>>>>>> Likely this would have to be batched. Essentially we should add
>>>>>>>>> this use case to our set of code examples to write and think
>>>>>>>>> through.
>>>>>>>>>
>>>>>>>>> The usage would be something like
>>>>>>>>>
>>>>>>>>> long end = consumer.lastOffset(tp);
>>>>>>>>> while (consumer.position < end)
>>>>>>>>>     process(consumer.poll());
>>>>>>>>>
>>>>>>>>> -Jay
>>>>>>>>>
>>>>>>>>> On Sat, Feb 22, 2014 at 1:52 AM, Withers, Robert <robert.with...@dish.com> wrote:
>>>>>>>>>
>>>>>>>>> Jun,
>>>>>>>>>
>>>>>>>>> I was originally thinking a non-blocking read from a distributed
>>>>>>>>> stream should distinguish between "no local messages, but a fetch
>>>>>>>>> is occurring" versus "you have drained the stream". The reason
>>>>>>>>> this may be valuable to me is so I can write consumers that read
>>>>>>>>> all known traffic and then terminate. You caused me to
>>>>>>>>> reconsider, and I think I am conflating 2 things. One is a
>>>>>>>>> sync/async api, while the other is whether to have an infinite or
>>>>>>>>> finite stream. Is it possible to build a finite KafkaStream on a
>>>>>>>>> range of messages?
>>>>>>>>>
>>>>>>>>> Perhaps a SimpleConsumer would do just fine, and then I could
>>>>>>>>> start off getting the writeOffset from zookeeper and tell it to
>>>>>>>>> read a specified range per partition. I've done this and forked a
>>>>>>>>> simple consumer runnable for each partition, for one of our
>>>>>>>>> analyzers. The great thing about the high-level consumer is the
>>>>>>>>> rebalance, so I can fork however many stream readers I want and
>>>>>>>>> you just figure it out for me. In that way you offer us control
>>>>>>>>> over the resource consumption within a pull model. This is best
>>>>>>>>> for regulating message pressure, they say.
>>>>>>>>>
>>>>>>>>> Combining that high-level rebalance ability with a ranged
>>>>>>>>> partition drain could be really nice...build the stream with an
>>>>>>>>> ending position and it is a finite stream, but retain the
>>>>>>>>> high-level rebalance. With a finite stream, you would know the
>>>>>>>>> difference between the 2 async scenarios: fetch-in-progress
>>>>>>>>> versus end-of-stream. With an infinite stream, you never get
>>>>>>>>> end-of-stream.
>>>>>>>>>
>>>>>>>>> Aside from a high-level consumer over a finite range within each
>>>>>>>>> partition, the other feature I can think of is more complicated.
>>>>>>>>> A high-level consumer has state machine changes that the client
>>>>>>>>> cannot access, to my knowledge. Our use of kafka has us invoke a
>>>>>>>>> message handler with each message we consume from the
>>>>>>>>> KafkaStream, so we convert a pull model to a push model.
>>>>>>>>> Including the idea of receiving notifications from state machine
>>>>>>>>> changes, what would be really nice is to have a
>>>>>>>>> KafkaMessageSource that is an eventful push model. If it were
>>>>>>>>> thread-safe, then we could register listeners for various events:
>>>>>>>>>
>>>>>>>>> * opening-stream
>>>>>>>>> * closing-stream
>>>>>>>>> * message-arrived
>>>>>>>>> * end-of-stream/no-more-messages-in-partition (for finite streams)
>>>>>>>>> * rebalance started
>>>>>>>>> * partition assigned
>>>>>>>>> * partition unassigned
>>>>>>>>> * rebalance finished
>>>>>>>>> * partition-offset-committed
>>>>>>>>>
>>>>>>>>> Perhaps that is just our use, but instead of a pull-oriented
>>>>>>>>> KafkaStream, is there any sense in your providing a push-oriented
>>>>>>>>> KafkaMessageSource publishing OOB messages?
>>>>>>>>>
>>>>>>>>> thank you,
>>>>>>>>> Robert
>>>>>>>>>
>>>>>>>>> On Feb 21, 2014, at 5:59 PM, Jun Rao <jun...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Robert,
>>>>>>>>>
>>>>>>>>> Could you explain why you want to distinguish btw
>>>>>>>>> FetchingInProgressException and NoMessagePendingException? The
>>>>>>>>> nextMsgs() method that you want is exactly what poll() does.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Jun
>>>>>>>>>
>>>>>>>>> On Wed, Feb 19, 2014 at 8:45 AM, Withers, Robert <robert.with...@dish.com> wrote:
>>>>>>>>>
>>>>>>>>> I am not clear on why the consumer stream should be positionable,
>>>>>>>>> especially if it is limited to the in-memory fetched messages.
>>>>>>>>> Could someone explain to me, please? I really like the idea of
>>>>>>>>> committing the offset specifically on those partitions with
>>>>>>>>> changed read offsets, only.
>>>>>>>>>
>>>>>>>>> 2 items I would like to see added to the KafkaStream are:
>>>>>>>>>
>>>>>>>>> * a non-blocking next(), which throws several exceptions
>>>>>>>>>   (FetchingInProgressException and a NoMessagePendingException or
>>>>>>>>>   something) to differentiate between fetching or no messages left.
>>>>>>>>> * a nextMsgs() method which returns all locally available
>>>>>>>>>   messages and kicks off a fetch for the next chunk.
>>>>>>>>>
>>>>>>>>> If you are trying to add transactional features, then formally
>>>>>>>>> define a DTP capability and pull in other server frameworks to
>>>>>>>>> share the implementation. Should it be XA/Open? How about a new
>>>>>>>>> peer2peer DTP protocol?
>>>>>>>>>
>>>>>>>>> Thank you,
>>>>>>>>>
>>>>>>>>> Robert
>>>>>>>>>
>>>>>>>>> Robert Withers
>>>>>>>>> Staff Analyst/Developer
>>>>>>>>> o: (720) 514-8963
>>>>>>>>> c: (571) 262-1873
>>>>>>>>>
>>>>>>>>> -----Original Message-----
>>>>>>>>> From: Jay Kreps [mailto:jay.kr...@gmail.com]
>>>>>>>>> Sent: Sunday, February 16, 2014 10:13 AM
>>>>>>>>> To: users@kafka.apache.org
>>>>>>>>> Subject: Re: New Consumer API discussion
>>>>>>>>>
>>>>>>>>> +1 I think those are good. It is a little weird that changing the
>>>>>>>>> fetch point is not batched but changing the commit point is, but
>>>>>>>>> I suppose there is no helping that.
>>>>>>>>>
>>>>>>>>> -Jay
>>>>>>>>>
>>>>>>>>> On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Jay,
>>>>>>>>>
>>>>>>>>> That makes sense. position/seek deal with changing the consumer's
>>>>>>>>> in-memory data, so there is no remote rpc there. For some reason,
>>>>>>>>> I got committed and seek mixed up in my head at that time :)
>>>>>>>>>
>>>>>>>>> So we still end up with
>>>>>>>>>
>>>>>>>>> long position(TopicPartition tp)
>>>>>>>>> void seek(TopicPartitionOffset p)
>>>>>>>>> Map<TopicPartition, Long> committed(TopicPartition tp);
>>>>>>>>> void commit(TopicPartitionOffset...);
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Neha
>>>>>>>>>
>>>>>>>>> On Friday, February 14, 2014, Jay Kreps <jay.kr...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Oh, interesting. So I am assuming the following implementation:
>>>>>>>>>
>>>>>>>>> 1. We have an in-memory fetch position which controls the next
>>>>>>>>>    fetch offset.
>>>>>>>>> 2. Changing this has no effect until you poll again, at which
>>>>>>>>>    point your fetch request will be from the newly specified
>>>>>>>>>    offset.
>>>>>>>>> 3. We then have an in-memory but also remotely stored committed
>>>>>>>>>    offset.
>>>>>>>>> 4. Calling commit has the effect of saving the fetch position as
>>>>>>>>>    both the in-memory committed position and in the remote store.
>>>>>>>>> 5. Auto-commit is the same as periodically calling commit on all
>>>>>>>>>    positions.
>>>>>>>>>
>>>>>>>>> So batching on commit as well as getting the committed position
>>>>>>>>> makes sense, but batching the fetch position wouldn't, right? I
>>>>>>>>> think you are actually thinking of a different approach.
>>>>>>>>>
>>>>>>>>> -Jay
>>>>>>>>>
>>>>>>>>> On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> I think you are saying both, i.e. if you have committed on a
>>>>>>>>> partition it returns you that value but if you haven't it does a
>>>>>>>>> remote lookup?
>>>>>>>>>
>>>>>>>>> Correct.
>>>>>>>>>
>>>>>>>>> The other argument for making committed batched is that commit()
>>>>>>>>> is batched, so there is symmetry.
>>>>>>>>>
>>>>>>>>> position() and seek() are always in memory changes (I assume) so
>>>>>>>>> there is no need to batch them.
>>>>>>>>>
>>>>>>>>> I'm not as sure as you are about that assumption being true.
>>>>>>>>> Basically, in my example above, the batching argument for
>>>>>>>>> committed() also applies to position() since one purpose of
>>>>>>>>> fetching a partition's offset is to use it to set the position of
>>>>>>>>> the consumer to that offset. Since that might lead to a remote
>>>>>>>>> OffsetRequest call, I think we probably would be better off
>>>>>>>>> batching it.
>>>>>>>>>
>>>>>>>>> Another option for naming would be position/reposition instead of
>>>>>>>>> position/seek.
>>>>>>>>>
>>>>>>>>> I think position/seek is better since it aligns with Java file
>>>>>>>>> APIs.
>>>>>>>>>
>>>>>>>>> I also think your suggestion about ConsumerPosition makes sense.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Neha
>>>>>>>>>
>>>>>>>>> On Feb 13, 2014 9:22 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hey Neha,
>>>>>>>>>
>>>>>>>>> I actually wasn't proposing the name TopicOffsetPosition, that
>>>>>>>>> was just a typo. I meant TopicPartitionOffset, and I was just
>>>>>>>>> referencing what was in the javadoc. So to restate my proposal
>>>>>>>>> without the typo, using just the existing classes (that naming is
>>>>>>>>> a separate question):
>>>>>>>>>
>>>>>>>>> long position(TopicPartition tp)
>>>>>>>>> void seek(TopicPartitionOffset p)
>>>>>>>>> long committed(TopicPartition tp)
>>>>>>>>> void commit(TopicPartitionOffset...);
>>>>>>>>>
>>>>>>>>> So I may be unclear on committed() (AKA lastCommittedOffset). Is
>>>>>>>>> it returning the in-memory value from the last commit by this
>>>>>>>>> consumer, or is it doing a remote fetch, or both? I think you are
>>>>>>>>> saying both, i.e. if you have committed on a partition it returns
>>>>>>>>> you that value but if you haven't it does a remote lookup?
>>>>>>>>>
>>>>>>>>> The other argument for making committed batched is that commit()
>>>>>>>>> is batched, so there is symmetry.
>>>>>>>>>
>>>>>>>>> position() and seek() are always in memory changes (I assume) so
>>>>>>>>> there is no need to batch them.
>>>>>>>>>
>>>>>>>>> So taking all that into account, what if we revise it to
>>>>>>>>>
>>>>>>>>> long position(TopicPartition tp)
>>>>>>>>> void seek(TopicPartitionOffset p)
>>>>>>>>> Map<TopicPartition, Long> committed(TopicPartition tp);
>>>>>>>>> void commit(TopicPartitionOffset...);
>>>>>>>>>
>>>>>>>>> This is not symmetric between position/seek and commit/committed,
>>>>>>>>> but it is convenient. Another option for naming would be
>>>>>>>>> position/reposition instead of position/seek.
>>>>>>>>>
>>>>>>>>> With respect to the name TopicPartitionOffset, what I was trying
>>>>>>>>> to say is that I recommend we change that to something shorter. I
>>>>>>>>> think TopicPosition or ConsumerPosition might be better. Position
>>>>>>>>> does not refer to the variables in the object, it refers to the
>>>>>>>>> meaning of the object--it represents a position within a topic.
>>>>>>>>> The offset field in that object is still called the offset.
>>>>>>>>> TopicOffset, PartitionOffset, or ConsumerOffset would all be
>>>>>>>>> workable too. Basically I am just objecting to concatenating
>>>>>>>>> three nouns together. :-)
>>>>>>>>>
>>>>>>>>> -Jay
>>>>>>>>>
>>>>>>>>> On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> 2. It returns a list of results. But how can you use the list?
>>>>>>>>> The only way to use the list is to make a map of tp=>offset and
>>>>>>>>> then look up results in this map (or do a for loop over the list
>>>>>>>>> for the partition you want). I recommend that if this is an
>>>>>>>>> in-memory check we just do one at a time. E.g.
>>>>>>>>>
>>>>>>>>> long committedPosition(TopicPosition).
>>>>>>>>>
>>>>>>>>> This was discussed in the previous emails. There is a choic
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Robert Withers
>>>>>>>>> robert.with...@dish.com
>>>>>>>>> c: 303.919.5856
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> -- Guozhang
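To ground the offset-management proposal discussed above, here is a self-contained Java sketch modelling the semantics the thread converges on: position()/seek() as purely in-memory operations, a batched commit(), a map-returning committed(), and Jay's proposed lastOffset() driving the finite "read all known traffic, then terminate" loop. Every class below is an illustrative stand-in, not the actual Kafka client API, and the varargs committed(TopicPartition...) signature is one reading of the batched form being debated:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

record TopicPartition(String topic, int partition) {}
record TopicPartitionOffset(TopicPartition partition, long offset) {}

// Toy in-memory consumer modelling the semantics discussed in the thread.
class SketchConsumer {
    private final Map<TopicPartition, List<String>> log;          // fake broker data
    private final Map<TopicPartition, Long> positions = new HashMap<>();
    private final Map<TopicPartition, Long> committed = new HashMap<>();

    SketchConsumer(Map<TopicPartition, List<String>> log) { this.log = log; }

    // position()/seek() touch only local state -- no remote rpc.
    long position(TopicPartition tp) { return positions.getOrDefault(tp, 0L); }
    void seek(TopicPartitionOffset p) { positions.put(p.partition(), p.offset()); }

    // Proposed addition: fetch the end offset so batch jobs can bound a load.
    long lastOffset(TopicPartition tp) { return log.get(tp).size(); }

    // commit() is batched: saves each fetch position as the committed offset.
    void commit(TopicPartitionOffset... offsets) {
        for (TopicPartitionOffset o : offsets) committed.put(o.partition(), o.offset());
    }

    // Batched committed(): answers from the last commit by this consumer; a
    // miss would trigger a remote lookup in the real client.
    Map<TopicPartition, Long> committed(TopicPartition... tps) {
        Map<TopicPartition, Long> out = new HashMap<>();
        for (TopicPartition tp : tps) {
            Long off = committed.get(tp);
            if (off != null) out.put(tp, off);
        }
        return out;
    }

    // poll() returns the next message for a partition and advances the position.
    String poll(TopicPartition tp) {
        long pos = position(tp);
        String msg = log.get(tp).get((int) pos);
        positions.put(tp, pos + 1);
        return msg;
    }
}

public class BatchLoadSketch {
    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("events", 0);
        SketchConsumer consumer =
            new SketchConsumer(Map.of(tp, List.of("m0", "m1", "m2")));

        // Jay's finite-drain usage: read everything known now, then stop.
        List<String> processed = new ArrayList<>();
        long end = consumer.lastOffset(tp);
        while (consumer.position(tp) < end)
            processed.add(consumer.poll(tp));

        consumer.commit(new TopicPartitionOffset(tp, consumer.position(tp)));
        System.out.println(processed + " committed=" + consumer.committed(tp));
    }
}
```

The loop terminates because `end` is captured once before draining, which is exactly the end-of-stream guarantee the thread notes an infinite stream cannot give.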