1. I would prefer PartitionsAssigned and PartitionsRevoked as that seems clearer to me.

-Jay

On Tue, Feb 25, 2014 at 10:19 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

Thanks for the reviews so far! There are a few outstanding questions:

1. It would be good to make the rebalance callbacks forward compatible with Java 8 capabilities. We can change them to PartitionsAssignedCallback and PartitionsRevokedCallback, or RebalanceBeginCallback and RebalanceEndCallback. If there are no objections, I will change it to RebalanceBeginCallback and RebalanceEndCallback.

2. The return type for committed() is List<TopicPartitionOffset>. There was a suggestion to change it to either Map<TopicPartition, Long> or Map<TopicPartition, TopicPartitionOffset>. Do people have feedback on this suggestion?

On Tue, Feb 25, 2014 at 9:56 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

Robert,

> Are you saying it is possible to get events from the high-level consumer regarding various state machine changes? For instance, can we get a notification when a rebalance starts and ends, when a partition is assigned/unassigned, when an offset is committed on a partition, when a leader changes and so on? I call this OOB traffic, since they are not the core messages streaming, but side-band events, yet they are still potentially useful to consumers.

In the current proposal, you get notified when the state machine changes, i.e. before and after a rebalance is triggered. Look at ConsumerRebalanceCallback (http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html). Leader changes do not count as state machine changes for consumer rebalance purposes.

Thanks,
Neha

On Tue, Feb 25, 2014 at 9:54 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

Jay/Robert -

I think what Robert is saying is that we need to think through the offset API to enable "batch processing" of topic data. Think of a process that periodically kicks off to compute a data summary or do a data load or something like that. I think what we need to support this is an API to fetch the last offset from the server for a partition. Something like

    long lastOffset(TopicPartition tp)

and for symmetry

    long firstOffset(TopicPartition tp)

Likely this would have to be batched.

A fixed-range data load can be done using the existing APIs as follows. This assumes you know the endOffset, which can be currentOffset + n (the number of messages in the load):

    long startOffset = consumer.position(partition);
    long endOffset = startOffset + n;
    while (consumer.position(partition) < endOffset) {
        List<ConsumerRecord> messages = consumer.poll(timeout, TimeUnit.MILLISECONDS);
        process(messages, endOffset); // processes messages up to endOffset
    }

Does that make sense?
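Spelled out as a self-contained sketch of that bounded drain, with the proposed Consumer methods stubbed as interfaces so the example compiles on its own (all names follow the proposal's javadoc, not a released client; BoundedLoad and its helpers are invented here for illustration):

    import java.util.List;
    import java.util.concurrent.TimeUnit;

    // Stubs mirroring the proposed javadoc, so the example stands alone;
    // this is a sketch of the API under discussion, not a released client.
    interface TopicPartition {}
    interface ConsumerRecord { long offset(); }
    interface Consumer {
        long position(TopicPartition tp); // next fetch offset
        List<ConsumerRecord> poll(long timeout, TimeUnit unit);
    }

    class BoundedLoad {
        // Drain exactly n messages starting from the current fetch position.
        static void load(Consumer consumer, TopicPartition tp, long n) {
            long endOffset = consumer.position(tp) + n;
            while (consumer.position(tp) < endOffset) {
                for (ConsumerRecord record : consumer.poll(100, TimeUnit.MILLISECONDS)) {
                    if (record.offset() >= endOffset) break; // don't run past the bound
                    process(record);
                }
            }
        }

        static void process(ConsumerRecord record) { /* application logic */ }
    }

Note the loop condition is strictly less-than: after consuming n messages the position equals endOffset, so a less-than-or-equal test would poll one extra time.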
On Tue, Feb 25, 2014 at 9:49 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

Thanks for the review, Jun. Here are some comments.

> 1. The use of ellipsis: this may make passing a list of items from a collection to the API a bit harder. Suppose that you have a list of topics stored in
>
>     ArrayList<String> topics;
>
> If you want to subscribe to all topics in one call, you will have to do:
>
>     String[] topicArray = new String[topics.size()];
>     consumer.subscribe(topics.toArray(topicArray));

A similar argument can be made for arguably the more common use case of subscribing to a single topic as well. In these cases, the user is required to write more code to create a single-item collection and pass it in. Since subscription is extremely lightweight, invoking it multiple times also seems like a workable solution, no?

> 2. It would be good to document that the following APIs are mutually exclusive. Also, if the partition-level subscription is specified, there is no group management. Finally, unsubscribe() can only be used to cancel subscriptions with the same pattern. For example, you can't unsubscribe at the partition level if the subscription is done at the topic level.
>
>     subscribe(java.lang.String... topics)
>     subscribe(java.lang.String topic, int... partitions)

Makes sense. Made the suggested improvements to the docs (http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/Consumer.html#subscribe%28java.lang.String...%29).

> 3. commit(): the following comment in the doc should probably say "commit offsets for partitions assigned to this consumer":
>
>     If no partitions are specified, commits offsets for the subscribed list of topics and partitions to Kafka.

Could you give more context on this suggestion? Here is the entire doc:

    Synchronously commits the specified offsets for the specified list of topics and partitions to Kafka. If no partitions are specified, commits offsets for the subscribed list of topics and partitions.

The hope is to convey that if no partitions are specified, offsets will be committed for the subscribed list of partitions. One improvement could be to explicitly state that the offsets returned on the last poll will be committed. I updated this to:

    Synchronously commits the specified offsets for the specified list of topics and partitions to Kafka. If no offsets are specified, commits offsets returned on the last {@link #poll(long, TimeUnit) poll()} for the subscribed list of topics and partitions.

> 4. There is inconsistency in specifying partitions. Sometimes we use TopicPartition and at other times we use String and int (see the examples below):
>
>     void onPartitionsAssigned(Consumer consumer, TopicPartition... partitions)
>     public void subscribe(java.lang.String topic, int... partitions)

Yes, this was discussed previously. I think the general consensus is to use the higher-level classes everywhere. Made those changes.

> What's the use case of position()? Isn't that just the nextOffset() on the last message returned from poll()?

Yes, except in the case where a rebalance is triggered and poll() is not yet invoked. Here, you would use position() to get the new fetch position for the specific partition. Even if this is not a common use case, IMO it is much easier to use position() to get the fetch offset than invoking nextOffset() on the last message. This also keeps the APIs symmetric, which is nice.
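To make the mutually exclusive subscription styles and the no-arg commit() semantics above concrete, here is a usage sketch against stubbed versions of those methods (names are from the proposal's javadoc; the "events" topic, PollCommitLoop class, and stubs are invented here so the fragment compiles on its own):

    import java.util.List;
    import java.util.concurrent.TimeUnit;

    // Stubs mirroring the proposed subscription/commit methods (a sketch
    // of the API under discussion, not a released client).
    interface ConsumerRecord {}
    interface Consumer {
        void subscribe(String... topics);                // topic level: group-managed
        void subscribe(String topic, int... partitions); // partition level: no group management
        List<ConsumerRecord> poll(long timeout, TimeUnit unit);
        void commit();                                   // commits offsets from the last poll()
    }

    class PollCommitLoop {
        static void run(Consumer consumer) {
            // Pick exactly one subscription style; mixing topic-level and
            // partition-level subscriptions on the same consumer is an error,
            // and unsubscribe() must match the style chosen here.
            consumer.subscribe("events");
            // consumer.subscribe("events", 0, 1);       // the partition-level alternative

            while (!Thread.currentThread().isInterrupted()) {
                List<ConsumerRecord> records = consumer.poll(100, TimeUnit.MILLISECONDS);
                // ... process records ...
                consumer.commit(); // no-arg: synchronously commits offsets from the last poll()
            }
        }
    }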
On Mon, Feb 24, 2014 at 7:06 PM, Withers, Robert <robert.with...@dish.com> wrote:

That's wonderful. Thanks for kafka.

Rob

On Feb 24, 2014, at 9:58 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Hi Robert,

Yes, you can check out the callback functions in the new API

    onPartitionDesigned
    onPartitionAssigned

and see if they meet your needs.

Guozhang

On Mon, Feb 24, 2014 at 8:18 AM, Withers, Robert <robert.with...@dish.com> wrote:

Jun,

Are you saying it is possible to get events from the high-level consumer regarding various state machine changes? For instance, can we get a notification when a rebalance starts and ends, when a partition is assigned/unassigned, when an offset is committed on a partition, when a leader changes and so on? I call this OOB traffic, since they are not the core messages streaming, but side-band events, yet they are still potentially useful to consumers.

Thank you,
Robert

Robert Withers
Staff Analyst/Developer
o: (720) 514-8963
c: (571) 262-1873

-----Original Message-----
From: Jun Rao [mailto:jun...@gmail.com]
Sent: Sunday, February 23, 2014 4:19 PM
To: users@kafka.apache.org
Subject: Re: New Consumer API discussion

Robert,

For the push-oriented API, you can potentially implement your own MessageHandler with those methods. In the main loop of our new consumer API, you can just call those methods based on the events you get.

Also, we already have an API to get the first and the last offset of a partition (getOffsetBefore).

Thanks,

Jun

On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert <robert.with...@dish.com> wrote:

This is a good idea, too. I would modify it to include stream marking. Then you can have:

    long end = consumer.lastOffset(tp);
    consumer.setMark(end);
    while (consumer.beforeMark()) {
        process(consumer.pollToMark());
    }

or

    long end = consumer.lastOffset(tp);
    consumer.setMark(end);
    for (Object msg : consumer.iteratorToMark()) {
        process(msg);
    }

I actually have 4 suggestions, then:

* pull: stream marking
* pull: finite streams, bound by time range (up-to-now, yesterday) or offset
* pull: async API
* push: KafkaMessageSource, for a push model, with msg and OOB events. Build one in either individual or chunk mode and have a listener for each msg or a listener for a chunk of msgs. Make it composable and policy driven (chunked, range, commitOffsets policy, retry policy, transactional)

Thank you,
Robert
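One way Robert's stream-marking idea could be layered on top of the proposed pull API without new consumer methods is a small wrapper. A sketch, assuming the proposal's position()/poll() calls; MarkedStream itself and all stub types here are hypothetical:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.TimeUnit;

    // Stubs for the proposed pull API (hypothetical, for illustration only).
    interface TopicPartition {}
    interface ConsumerRecord { long offset(); }
    interface Consumer {
        long position(TopicPartition tp);
        List<ConsumerRecord> poll(long timeout, TimeUnit unit);
    }

    // A possible realization of setMark()/beforeMark()/pollToMark() as a
    // wrapper over the consumer, giving a finite view of one partition.
    class MarkedStream {
        private final Consumer consumer;
        private final TopicPartition tp;
        private long mark = Long.MAX_VALUE;

        MarkedStream(Consumer consumer, TopicPartition tp) {
            this.consumer = consumer;
            this.tp = tp;
        }

        void setMark(long offset) { mark = offset; }

        boolean beforeMark() { return consumer.position(tp) < mark; }

        // Poll, returning only records strictly before the mark.
        List<ConsumerRecord> pollToMark() {
            List<ConsumerRecord> out = new ArrayList<>();
            for (ConsumerRecord record : consumer.poll(100, TimeUnit.MILLISECONDS)) {
                if (record.offset() >= mark) break; // end of the finite stream
                out.add(record);
            }
            return out;
        }
    }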
On Feb 22, 2014, at 11:21 AM, Jay Kreps <jay.kr...@gmail.com> wrote:

I think what Robert is saying is that we need to think through the offset API to enable "batch processing" of topic data. Think of a process that periodically kicks off to compute a data summary or do a data load or something like that. I think what we need to support this is an API to fetch the last offset from the server for a partition. Something like

    long lastOffset(TopicPartition tp)

and for symmetry

    long firstOffset(TopicPartition tp)

Likely this would have to be batched. Essentially we should add this use case to our set of code examples to write and think through.

The usage would be something like

    long end = consumer.lastOffset(tp);
    while (consumer.position(tp) < end)
        process(consumer.poll());

-Jay
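If firstOffset()/lastOffset() were batched, as Jay suggests they would have to be, the signatures might look something like the following. This is purely illustrative; OffsetLookup and the plural method names are invented here, and only TopicPartition comes from the proposal:

    import java.util.Map;

    interface TopicPartition {}

    // Hypothetical batched form of the firstOffset()/lastOffset() idea.
    interface OffsetLookup {
        Map<TopicPartition, Long> firstOffsets(TopicPartition... partitions);
        Map<TopicPartition, Long> lastOffsets(TopicPartition... partitions);
    }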
On Sat, Feb 22, 2014 at 1:52 AM, Withers, Robert <robert.with...@dish.com> wrote:

Jun,

I was originally thinking a non-blocking read from a distributed stream should distinguish between "no local messages, but a fetch is occurring" versus "you have drained the stream". The reason this may be valuable to me is so I can write consumers that read all known traffic and then terminate. You caused me to reconsider, and I think I am conflating 2 things. One is a sync/async API, while the other is whether to have an infinite or finite stream. Is it possible to build a finite KafkaStream on a range of messages?

Perhaps a SimpleConsumer would do just fine, and then I could start off getting the writeOffset from ZooKeeper and tell it to read a specified range per partition. I've done this and forked a simple consumer runnable for each partition, for one of our analyzers. The great thing about the high-level consumer is the rebalance, so I can fork however many stream readers I want and you just figure it out for me. In that way you offer us control over the resource consumption within a pull model. This is best to regulate message pressure, they say.

Combining that high-level rebalance ability with a ranged partition drain could be really nice: build the stream with an ending position and it is a finite stream, but retain the high-level rebalance. With a finite stream, you would know the difference between the 2 async scenarios: fetch-in-progress versus end-of-stream. With an infinite stream, you never get end-of-stream.

Aside from a high-level consumer over a finite range within each partition, the other feature I can think of is more complicated. A high-level consumer has state machine changes that the client cannot access, to my knowledge. Our use of Kafka has us invoke a message handler with each message we consume from the KafkaStream, so we convert a pull model to a push model. Including the idea of receiving notifications from state machine changes, what would be really nice is to have a KafkaMessageSource that is an eventful push model. If it were thread-safe, then we could register listeners for various events:

* opening-stream
* closing-stream
* message-arrived
* end-of-stream/no-more-messages-in-partition (for finite streams)
* rebalance started
* partition assigned
* partition unassigned
* rebalance finished
* partition-offset-committed

Perhaps that is just our use, but instead of a pull-oriented KafkaStream, is there any sense in your providing a push-oriented KafkaMessageSource publishing OOB messages?

thank you,
Robert
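A rough sketch of what such a KafkaMessageSource could look like as an adapter over the poll loop. Everything below is hypothetical: the listener covers only a subset of Robert's event list (the rebalance events are exactly what the proposed ConsumerRebalanceCallback exposes), and the Java 8 default methods echo the forward-compatibility point raised earlier in the thread:

    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.TimeUnit;

    // Stubs for the proposed pull API (hypothetical).
    interface TopicPartition {}
    interface ConsumerRecord {}
    interface Consumer {
        List<ConsumerRecord> poll(long timeout, TimeUnit unit);
    }

    // Listener for message and OOB (side-band) events; default methods let
    // callers subscribe to only the events they care about.
    interface StreamListener {
        default void onOpen() {}
        default void onMessage(ConsumerRecord record) {}
        default void onPartitionsAssigned(List<TopicPartition> partitions) {}
        default void onPartitionsRevoked(List<TopicPartition> partitions) {}
        default void onClose() {}
    }

    // Push-oriented adapter: converts the pull model into listener callbacks.
    class KafkaMessageSource implements Runnable {
        private final Consumer consumer;
        private final List<StreamListener> listeners = new CopyOnWriteArrayList<>();
        private volatile boolean running = true;

        KafkaMessageSource(Consumer consumer) { this.consumer = consumer; }

        void addListener(StreamListener listener) { listeners.add(listener); }
        void close() { running = false; }

        @Override
        public void run() {
            listeners.forEach(StreamListener::onOpen);
            while (running) {
                for (ConsumerRecord record : consumer.poll(100, TimeUnit.MILLISECONDS))
                    for (StreamListener listener : listeners)
                        listener.onMessage(record);
            }
            listeners.forEach(StreamListener::onClose);
        }
    }

The CopyOnWriteArrayList makes listener registration safe from other threads, which is the thread-safety property Robert asks for, at the cost of copying on every add.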
On Feb 21, 2014, at 5:59 PM, Jun Rao <jun...@gmail.com> wrote:

Robert,

Could you explain why you want to distinguish between FetchingInProgressException and NoMessagePendingException? The nextMsgs() method that you want is exactly what poll() does.

Thanks,

Jun

On Wed, Feb 19, 2014 at 8:45 AM, Withers, Robert <robert.with...@dish.com> wrote:

I am not clear on why the consumer stream should be positionable, especially if it is limited to the in-memory fetched messages. Could someone explain to me, please? I really like the idea of committing the offset specifically on those partitions with changed read offsets, only.

2 items I would like to see added to the KafkaStream are:

* a non-blocking next(), which throws several exceptions (FetchingInProgressException and a NoMessagePendingException or something) to differentiate between fetching or no messages left.
* a nextMsgs() method which returns all locally available messages and kicks off a fetch for the next chunk.

If you are trying to add transactional features, then formally define a DTP capability and pull in other server frameworks to share the implementation. Should it be XA/Open? How about a new peer-to-peer DTP protocol?

Thank you,

Robert

Robert Withers
Staff Analyst/Developer
o: (720) 514-8963
c: (571) 262-1873

-----Original Message-----
From: Jay Kreps [mailto:jay.kr...@gmail.com]
Sent: Sunday, February 16, 2014 10:13 AM
To: users@kafka.apache.org
Subject: Re: New Consumer API discussion

+1 I think those are good. It is a little weird that changing the fetch point is not batched but changing the commit point is, but I suppose there is no helping that.

-Jay

On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

Jay,

That makes sense. position/seek deal with changing the consumer's in-memory data, so there is no remote RPC there. For some reason, I got committed and seek mixed up in my head at that time :)

So we still end up with

    long position(TopicPartition tp)
    void seek(TopicPartitionOffset p)
    Map<TopicPartition, Long> committed(TopicPartition tp);
    void commit(TopicPartitionOffset...);

Thanks,

Neha

On Friday, February 14, 2014, Jay Kreps <jay.kr...@gmail.com> wrote:

Oh, interesting. So I am assuming the following implementation:

1. We have an in-memory fetch position which controls the next fetch offset.
2. Changing this has no effect until you poll again, at which point your fetch request will be from the newly specified offset.
3. We then have an in-memory but also remotely stored committed offset.
4. Calling commit has the effect of saving the fetch position as both the in-memory committed position and in the remote store.
5. Auto-commit is the same as periodically calling commit on all positions.

So batching on commit as well as getting the committed position makes sense, but batching the fetch position wouldn't, right? I think you are actually thinking of a different approach.

-Jay
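Jay's five assumptions, restated as a tiny per-partition state holder to make the in-memory vs. remote distinction explicit. This is a sketch of the assumed model only, not the actual implementation; PartitionState and its members are invented here:

    // Per-partition consumer state under Jay's assumed model (a sketch).
    class PartitionState {
        private long fetchPosition;   // in-memory only; drives the next fetch request
        private long committedOffset; // mirrored to remote offset storage on commit

        // seek() is purely an in-memory change: no RPC, effective on next poll().
        void seek(long offset) { fetchPosition = offset; }

        long position() { return fetchPosition; }

        // commit() saves the fetch position locally AND writes it remotely;
        // auto-commit would call this periodically for all partitions. The
        // remote write is why commit()/committed() benefit from batching
        // while position()/seek() would not.
        void commit() {
            committedOffset = fetchPosition;
            // ... plus the remote offset-store write
        }

        long committed() { return committedOffset; }
    }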
On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> I think you are saying both, i.e. if you have committed on a partition it returns you that value but if you haven't it does a remote lookup?

Correct.

> The other argument for making committed batched is that commit() is batched, so there is symmetry.
>
> position() and seek() are always in memory changes (I assume) so there is no need to batch them.

I'm not as sure as you are about that assumption being true. Basically, in my example above, the batching argument for committed() also applies to position(), since one purpose of fetching a partition's offset is to use it to set the position of the consumer to that offset. Since that might lead to a remote OffsetRequest call, I think we probably would be better off batching it.

> Another option for naming would be position/reposition instead of position/seek.

I think position/seek is better since it aligns with Java file APIs.

I also think your suggestion about ConsumerPosition makes sense.

Thanks,
Neha

On Feb 13, 2014 9:22 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:

Hey Neha,

I actually wasn't proposing the name TopicOffsetPosition; that was just a typo. I meant TopicPartitionOffset, and I was just referencing what was in the javadoc. So to restate my proposal without the typo, using just the existing classes (the naming is a separate question):

    long position(TopicPartition tp)
    void seek(TopicPartitionOffset p)
    long committed(TopicPartition tp)
    void commit(TopicPartitionOffset...);

So I may be unclear on committed() (AKA lastCommittedOffset). Is it returning the in-memory value from the last commit by this consumer, or is it doing a remote fetch, or both? I think you are saying both, i.e. if you have committed on a partition it returns you that value but if you haven't it does a remote lookup?

The other argument for making committed batched is that commit() is batched, so there is symmetry.

position() and seek() are always in memory changes (I assume) so there is no need to batch them.

So taking all that into account, what if we revise it to

    long position(TopicPartition tp)
    void seek(TopicPartitionOffset p)
    Map<TopicPartition, Long> committed(TopicPartition tp);
    void commit(TopicPartitionOffset...);

This is not symmetric between position/seek and commit/committed, but it is convenient. Another option for naming would be position/reposition instead of position/seek.

With respect to the name TopicPartitionOffset, what I was trying to say is that I recommend we change that to something shorter. I think TopicPosition or ConsumerPosition might be better. "Position" does not refer to the variables in the object, it refers to the meaning of the object: it represents a position within a topic. The offset field in that object is still called the offset. TopicOffset, PartitionOffset, or ConsumerOffset would all be workable too. Basically I am just objecting to concatenating three nouns together. :-)

-Jay
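For reference, Jay's revised proposal collected into one interface. The signatures are exactly as quoted in the thread (note the asymmetry he flags: committed() takes a single TopicPartition yet returns a Map); the consolidated OffsetApi name and the stubs are invented here so the sketch stands alone:

    import java.util.Map;

    // Stubs for the existing javadoc classes (illustrative only).
    interface TopicPartition {}
    interface TopicPartitionOffset {}

    // The offset-related surface of the proposal, as revised in this thread.
    interface OffsetApi {
        long position(TopicPartition tp);           // in-memory fetch position
        void seek(TopicPartitionOffset p);          // in-memory; no RPC until poll()
        Map<TopicPartition, Long> committed(TopicPartition tp); // may do a remote lookup
        void commit(TopicPartitionOffset... offsets);           // batched remote write
    }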
On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> 2. It returns a list of results. But how can you use the list? The only way to use the list is to make a map of tp=>offset and then look up results in this map (or do a for loop over the list for the partition you want). I recommend that if this is an in-memory check we just do one at a time. E.g.
>
>     long committedPosition(TopicPosition)

This was discussed in the previous emails. There is a choic