1. "The new consumer API is non blocking and instead of returning a blocking iterator, the consumer provides a poll() API that returns a list of records. "
So this means the consumer polls, and if there are new messages it pulls them down and then disconnects? Not really. The point I was trying to make is that the consumer now just returns a list of records instead of an iterator. If there are no more messages available, it returns an empty list of records. Under the covers, it keeps a connection open to every broker. 2. " The consumer also allows long poll to reduce the end-to-end message latency for low throughput data." How is this different than blocking? Is it even based meaning it keeps a long poll conneciton open, and if/when a new message arrives it triggers an event on the consumer side? It means that you can invoke poll with a timeout. If a message is available before the timeout is hit, it returns earlier. 3. " The consumer batches data and multiplexes I/O over TCP connections to each of the brokers it communicates with, for high throughput. " If it is single threaded, does each tcp brocker connection block? Not sure I understand how this works if it is single threaded. Take a look at this tutorial that explains non blocking socket I/O - *http://rox-xmlrpc.sourceforge.net/niotut/ <http://rox-xmlrpc.sourceforge.net/niotut/>* Thanks, Neha On Fri, Feb 28, 2014 at 12:44 PM, S Ahmed <sahmed1...@gmail.com> wrote: > Few clarifications: > > 1. "The new > consumer API is non blocking and instead of returning a blocking iterator, > the consumer provides a poll() API that returns a list of records. " > > So this means the consumer polls, and if there are new messages it pulls > them down and then disconnects? > > 2. > " The consumer also allows long poll > to reduce the end-to-end message latency for low throughput data." > > How is this different than blocking? Is it even based meaning it keeps a > long poll conneciton open, and if/when a new message arrives it triggers an > event on the consumer side? > > > 3. > " The consumer batches > data and multiplexes I/O over TCP connections to each of the brokers it > communicates with, for high throughput. " > > If it is single threaded, does each tcp brocker connection block? Not sure > I understand how this works if it is single threaded. > > > > On Thu, Feb 27, 2014 at 11:38 PM, Robert Withers < > robert.w.with...@gmail.com > > wrote: > > > Thank you, Neha, that makes it clear. Really, the aspect of all this > that > > we could really use is a way to do exactly once processing. We are > looking > > at more critical data. What are the latest thoughts on how to achieve > > exactly once and how might that affect a consumer API? > > > > Thanks, > > Rob > > > > On Feb 27, 2014, at 10:29 AM, Neha Narkhede <neha.narkh...@gmail.com> > > wrote: > > > > > Is this< > > > http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html#seek%28kafka.common.TopicPartitionOffset...%29 > > >what > > > you are looking for? Basically, I think from the overall feedback, it > > > looks like code snippets don't seem to work for overall understanding > of > > > the APIs. I plan to update the javadoc with more complete examples that > > > have been discussed so far on this thread and generally on the mailing > > list. 
> > > > > > Thanks, > > > Neha > > > > > > > > > > > > > > > On Thu, Feb 27, 2014 at 4:17 AM, Robert Withers > > > <robert.w.with...@gmail.com>wrote: > > > > > >> Neha, > > >> > > >> I see how one might wish to implement onPartitionsAssigned and > > >> onPartitionsRevoked, but I don't have a sense for how I might supply > > these > > >> implementations to a running consumer. What would the setup code look > > like > > >> to start a high-level consumer with these provided implementations? > > >> > > >> thanks, > > >> Rob > > >> > > >> > > >> On Feb 27, 2014, at 3:48 AM, Neha Narkhede <neha.narkh...@gmail.com> > > >> wrote: > > >> > > >>> Rob, > > >>> > > >>> The use of the callbacks is explained in the javadoc here - > > >>> > > >> > > > http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html > > >>> > > >>> Let me know if it makes sense. The hope is to improve the javadoc so > > that > > >>> it is self explanatory. > > >>> > > >>> Thanks, > > >>> Neha > > >>> > > >>> > > >>> On Wed, Feb 26, 2014 at 9:16 AM, Robert Withers > > >>> <robert.w.with...@gmail.com>wrote: > > >>> > > >>>> Neha, what does the use of the RebalanceBeginCallback and > > >>>> RebalanceEndCallback look like? > > >>>> > > >>>> thanks, > > >>>> Rob > > >>>> > > >>>> On Feb 25, 2014, at 3:51 PM, Neha Narkhede <neha.narkh...@gmail.com > > > > >>>> wrote: > > >>>> > > >>>>> How do you know n? The whole point is that you need to be able to > > fetch > > >>>> the > > >>>>> end offset. You can't a priori decide you will load 1m messages > > without > > >>>>> knowing what is there. > > >>>>> > > >>>>> Hmm. I think what you are pointing out is that in the new consumer > > API, > > >>>> we > > >>>>> don't have a way to issue the equivalent of the existing > > >>>> getOffsetsBefore() > > >>>>> API. Agree that is a flaw that we should fix. > > >>>>> > > >>>>> Will update the docs/wiki with a few use cases that I've collected > so > > >> far > > >>>>> and see if the API covers those. > > >>>>> > > >>>>> I would prefer PartitionsAssigned and PartitionsRevoked as that > seems > > >>>>> clearer to me > > >>>>> > > >>>>> Well the RebalanceBeginCallback interface will have > > >>>> onPartitionsAssigned() > > >>>>> as the callback. Similarly, the RebalanceEndCallback interface will > > >> have > > >>>>> onPartitionsRevoked() as the callback. Makes sense? > > >>>>> > > >>>>> Thanks, > > >>>>> Neha > > >>>>> > > >>>>> > > >>>>> On Tue, Feb 25, 2014 at 2:38 PM, Jay Kreps <jay.kr...@gmail.com> > > >> wrote: > > >>>>> > > >>>>>> 1. I would prefer PartitionsAssigned and PartitionsRevoked as that > > >> seems > > >>>>>> clearer to me. > > >>>>>> > > >>>>>> -Jay > > >>>>>> > > >>>>>> > > >>>>>> On Tue, Feb 25, 2014 at 10:19 AM, Neha Narkhede < > > >>>> neha.narkh...@gmail.com > > >>>>>>> wrote: > > >>>>>> > > >>>>>>> Thanks for the reviews so far! There are a few outstanding > > questions > > >> - > > >>>>>>> > > >>>>>>> 1. It will be good to make the rebalance callbacks forward > > >> compatible > > >>>>>> with > > >>>>>>> Java 8 capabilities. We can change it to > PartitionsAssignedCallback > > >>>>>>> and PartitionsRevokedCallback or RebalanceBeginCallback and > > >>>>>>> RebalanceEndCallback? > > >>>>>>> > > >>>>>>> If there are no objections, I will change it to > > >> RebalanceBeginCallback > > >>>>>> and > > >>>>>>> RebalanceEndCallback. > > >>>>>>> > > >>>>>>> 2. The return type for committed() is > List<TopicPartitionOffset>. 
> > >>>> There > > >>>>>>> was a suggestion to change it to either be > Map<TopicPartition,Long> > > >> or > > >>>>>>> Map<TopicPartition, TopicPartitionOffset> > > >>>>>>> > > >>>>>>> Do people have feedback on this suggestion? > > >>>>>>> > > >>>>>>> > > >>>>>>> On Tue, Feb 25, 2014 at 9:56 AM, Neha Narkhede < > > >>>> neha.narkh...@gmail.com > > >>>>>>>> wrote: > > >>>>>>> > > >>>>>>>> Robert, > > >>>>>>>> > > >>>>>>>> Are you saying it is possible to get events from the high-level > > >>>>>>> consumerregarding various state machine changes? For instance, > can > > >> we > > >>>>>> get a > > >>>>>>>> notification when a rebalance starts and ends, when a partition > is > > >>>>>>>> assigned/unassigned, when an offset is committed on a partition, > > >> when > > >>>> a > > >>>>>>>> leader changes and so on? I call this OOB traffic, since they > are > > >> not > > >>>>>>> the > > >>>>>>>> core messages streaming, but side-band events, yet they are > still > > >>>>>>>> potentially useful to consumers. > > >>>>>>>> > > >>>>>>>> In the current proposal, you get notified when the state machine > > >>>>>> changes > > >>>>>>>> i.e. before and after a rebalance is triggered. Look at > > >>>>>>>> ConsumerRebalanceCallback< > > >>>>>>> > > >>>>>> > > >>>> > > >> > > > http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html > > >>>>>>>> > > >>>>>>>> .Leader changes do not count as state machine changes for > consumer > > >>>>>>>> rebalance purposes. > > >>>>>>>> > > >>>>>>>> Thanks, > > >>>>>>>> Neha > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> On Tue, Feb 25, 2014 at 9:54 AM, Neha Narkhede < > > >>>>>> neha.narkh...@gmail.com > > >>>>>>>> wrote: > > >>>>>>>> > > >>>>>>>>> Jay/Robert - > > >>>>>>>>> > > >>>>>>>>> > > >>>>>>>>> I think what Robert is saying is that we need to think through > > the > > >>>>>>> offset > > >>>>>>>>> API to enable "batch processing" of topic data. Think of a > > process > > >>>>>> that > > >>>>>>>>> periodically kicks off to compute a data summary or do a data > > load > > >> or > > >>>>>>>>> something like that. I think what we need to support this is an > > api > > >>>> to > > >>>>>>>>> fetch the last offset from the server for a partition. > Something > > >> like > > >>>>>>>>> long lastOffset(TopicPartition tp) > > >>>>>>>>> and for symmetry > > >>>>>>>>> long firstOffset(TopicPartition tp) > > >>>>>>>>> > > >>>>>>>>> Likely this would have to be batched. > > >>>>>>>>> > > >>>>>>>>> A fixed range of data load can be done using the existing APIs > as > > >>>>>>>>> follows. This assumes you know the endOffset which can be > > >>>>>> currentOffset > > >>>>>>> + n > > >>>>>>>>> (number of messages in the load) > > >>>>>>>>> > > >>>>>>>>> long startOffset = consumer.position(partition); > > >>>>>>>>> long endOffset = startOffset + n; > > >>>>>>>>> while(consumer.position(partition) <= endOffset) { > > >>>>>>>>> List<ConsumerRecord> messages = consumer.poll(timeout, > > >>>>>>>>> TimeUnit.MILLISECONDS); > > >>>>>>>>> process(messages, endOffset); // processes messages > > >>>>>> until > > >>>>>>>>> endOffset > > >>>>>>>>> } > > >>>>>>>>> > > >>>>>>>>> Does that make sense? > > >>>>>>>>> > > >>>>>>>>> > > >>>>>>>>> On Tue, Feb 25, 2014 at 9:49 AM, Neha Narkhede < > > >>>>>> neha.narkh...@gmail.com > > >>>>>>>> wrote: > > >>>>>>>>> > > >>>>>>>>>> Thanks for the review, Jun. Here are some comments - > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> 1. 
The using of ellipsis: This may make passing a list of > items > > >> from > > >>>>>> a > > >>>>>>>>>> collection to the api a bit harder. Suppose that you have a > list > > >> of > > >>>>>>>>>> topics > > >>>>>>>>>> stored in > > >>>>>>>>>> > > >>>>>>>>>> ArrayList<String> topics; > > >>>>>>>>>> > > >>>>>>>>>> If you want subscribe to all topics in one call, you will have > > to > > >>>> do: > > >>>>>>>>>> > > >>>>>>>>>> String[] topicArray = new String[topics.size()]; > > >>>>>>>>>> consumer.subscribe(topics. > > >>>>>>>>>> toArray(topicArray)); > > >>>>>>>>>> > > >>>>>>>>>> A similar argument can be made for arguably the more common > use > > >> case > > >>>>>> of > > >>>>>>>>>> subscribing to a single topic as well. In these cases, user is > > >>>>>> required > > >>>>>>>>>> to write more > > >>>>>>>>>> code to create a single item collection and pass it in. Since > > >>>>>>>>>> subscription is extremely lightweight > > >>>>>>>>>> invoking it multiple times also seems like a workable > solution, > > >> no? > > >>>>>>>>>> > > >>>>>>>>>> 2. It would be good to document that the following apis are > > >> mutually > > >>>>>>>>>> exclusive. Also, if the partition level subscription is > > specified, > > >>>>>>> there > > >>>>>>>>>> is > > >>>>>>>>>> no group management. Finally, unsubscribe() can only be used > to > > >>>>>> cancel > > >>>>>>>>>> subscriptions with the same pattern. For example, you can't > > >>>>>> unsubscribe > > >>>>>>>>>> at > > >>>>>>>>>> the partition level if the subscription is done at the topic > > >> level. > > >>>>>>>>>> > > >>>>>>>>>> *subscribe*(java.lang.String... topics) > > >>>>>>>>>> *subscribe*(java.lang.String topic, int... partitions) > > >>>>>>>>>> > > >>>>>>>>>> Makes sense. Made the suggested improvements to the docs< > > >>>>>>> > > >>>>>> > > >>>> > > >> > > > http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/Consumer.html#subscribe%28java.lang.String...%29 > > >>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> 3.commit(): The following comment in the doc should probably > say > > >>>>>>> "commit > > >>>>>>>>>> offsets for partitions assigned to this consumer". > > >>>>>>>>>> > > >>>>>>>>>> If no partitions are specified, commits offsets for the > > subscribed > > >>>>>>> list > > >>>>>>>>>> of > > >>>>>>>>>> topics and partitions to Kafka. > > >>>>>>>>>> > > >>>>>>>>>> Could you give more context on this suggestion? Here is the > > entire > > >>>>>> doc > > >>>>>>> - > > >>>>>>>>>> > > >>>>>>>>>> Synchronously commits the specified offsets for the specified > > list > > >>>> of > > >>>>>>>>>> topics and partitions to *Kafka*. If no partitions are > > specified, > > >>>>>>>>>> commits offsets for the subscribed list of topics and > > partitions. > > >>>>>>>>>> > > >>>>>>>>>> The hope is to convey that if no partitions are specified, > > offsets > > >>>>>> will > > >>>>>>>>>> be committed for the subscribed list of partitions. One > > >> improvement > > >>>>>>> could > > >>>>>>>>>> be to > > >>>>>>>>>> explicitly state that the offsets returned on the last poll > will > > >> be > > >>>>>>>>>> committed. I updated this to - > > >>>>>>>>>> > > >>>>>>>>>> Synchronously commits the specified offsets for the specified > > list > > >>>> of > > >>>>>>>>>> topics and partitions to *Kafka*. If no offsets are specified, > > >>>>>> commits > > >>>>>>>>>> offsets returned on the last {@link #poll(long, TimeUnit) > > poll()} > > >>>> for > > >>>>>>>>>> the subscribed list of topics and partitions. 
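To make the subscribe/poll/commit flow discussed above concrete, here is a minimal sketch against the draft consumer API in this thread. Only subscribe(String...), poll(long, TimeUnit) returning List<ConsumerRecord>, and the varargs commit() are taken from the thread; the Properties-based constructor, the config key, and the topic name are illustrative assumptions.

```java
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import kafka.clients.consumer.ConsumerRecord;
import kafka.clients.consumer.KafkaConsumer;

public class PollAndCommitSketch {
    public static void main(String[] args) {
        // Assumed: the consumer is configured via Properties, as the current
        // high-level consumer is; the exact config keys are not settled here.
        Properties props = new Properties();
        props.put("group.id", "report-generator");
        KafkaConsumer consumer = new KafkaConsumer(props);

        // Topic-level, group-managed subscription. Subscription is described
        // as lightweight, so single-topic calls can simply be repeated.
        consumer.subscribe("page-views");

        while (true) {
            // poll() returns whatever records arrived within the timeout;
            // an empty list just means nothing was available in that window.
            List<ConsumerRecord> records = consumer.poll(100, TimeUnit.MILLISECONDS);
            process(records);

            // With no arguments, commit() commits the offsets returned on the
            // last poll() for the subscribed topics and partitions.
            consumer.commit();
        }
    }

    private static void process(List<ConsumerRecord> records) {
        // Application-specific handling of the fetched records.
    }
}
```

The partition-level overload subscribe(topic, partitions...) would take the place of the topic-level subscribe() here, with no group management, and per the point above the two subscription styles cannot be mixed.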
> > >>>>>>>>>> > > >>>>>>>>>> 4. There is inconsistency in specifying partitions. Sometimes > we > > >> use > > >>>>>>>>>> TopicPartition and some other times we use String and int (see > > >>>>>>>>>> examples below). > > >>>>>>>>>> > > >>>>>>>>>> void onPartitionsAssigned(Consumer consumer, > > >>>>>>>>>> TopicPartition...partitions) > > >>>>>>>>>> > > >>>>>>>>>> public void *subscribe*(java.lang.String topic, int... > > partitions) > > >>>>>>>>>> > > >>>>>>>>>> Yes, this was discussed previously. I think generally the > > >> consensus > > >>>>>>>>>> seems to be to use the higher level > > >>>>>>>>>> classes everywhere. Made those changes. > > >>>>>>>>>> > > >>>>>>>>>> What's the use case of position()? Isn't that just the > > >> nextOffset() > > >>>>>> on > > >>>>>>>>>> the > > >>>>>>>>>> last message returned from poll()? > > >>>>>>>>>> > > >>>>>>>>>> Yes, except in the case where a rebalance is triggered and > > poll() > > >> is > > >>>>>>> not > > >>>>>>>>>> yet invoked. Here, you would use position() to get the new > fetch > > >>>>>>> position > > >>>>>>>>>> for the specific partition. Even if this is not a common use > > case, > > >>>>>> IMO > > >>>>>>> it > > >>>>>>>>>> is much easier to use position() to get the fetch offset than > > >>>>>> invoking > > >>>>>>>>>> nextOffset() on the last message. This also keeps the APIs > > >>>> symmetric, > > >>>>>>> which > > >>>>>>>>>> is nice. > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> On Mon, Feb 24, 2014 at 7:06 PM, Withers, Robert < > > >>>>>>>>>> robert.with...@dish.com> wrote: > > >>>>>>>>>> > > >>>>>>>>>>> That's wonderful. Thanks for kafka. > > >>>>>>>>>>> > > >>>>>>>>>>> Rob > > >>>>>>>>>>> > > >>>>>>>>>>> On Feb 24, 2014, at 9:58 AM, Guozhang Wang < > wangg...@gmail.com > > >>>>>>> <mailto: > > >>>>>>>>>>> wangg...@gmail.com>> wrote: > > >>>>>>>>>>> > > >>>>>>>>>>> Hi Robert, > > >>>>>>>>>>> > > >>>>>>>>>>> Yes, you can check out the callback functions in the new API > > >>>>>>>>>>> > > >>>>>>>>>>> onPartitionDesigned > > >>>>>>>>>>> onPartitionAssigned > > >>>>>>>>>>> > > >>>>>>>>>>> and see if they meet your needs. > > >>>>>>>>>>> > > >>>>>>>>>>> Guozhang > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> On Mon, Feb 24, 2014 at 8:18 AM, Withers, Robert < > > >>>>>>>>>>> robert.with...@dish.com<mailto:robert.with...@dish.com > >>wrote: > > >>>>>>>>>>> > > >>>>>>>>>>> Jun, > > >>>>>>>>>>> > > >>>>>>>>>>> Are you saying it is possible to get events from the > high-level > > >>>>>>> consumer > > >>>>>>>>>>> regarding various state machine changes? For instance, can > we > > >> get > > >>>> a > > >>>>>>>>>>> notification when a rebalance starts and ends, when a > partition > > >> is > > >>>>>>>>>>> assigned/unassigned, when an offset is committed on a > > partition, > > >>>>>> when > > >>>>>>> a > > >>>>>>>>>>> leader changes and so on? I call this OOB traffic, since > they > > >> are > > >>>>>> not > > >>>>>>>>>>> the > > >>>>>>>>>>> core messages streaming, but side-band events, yet they are > > still > > >>>>>>>>>>> potentially useful to consumers. 
> > >>>>>>>>>>> > > >>>>>>>>>>> Thank you, > > >>>>>>>>>>> Robert > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> Robert Withers > > >>>>>>>>>>> Staff Analyst/Developer > > >>>>>>>>>>> o: (720) 514-8963 > > >>>>>>>>>>> c: (571) 262-1873 > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> -----Original Message----- > > >>>>>>>>>>> From: Jun Rao [mailto:jun...@gmail.com] > > >>>>>>>>>>> Sent: Sunday, February 23, 2014 4:19 PM > > >>>>>>>>>>> To: users@kafka.apache.org<mailto:users@kafka.apache.org> > > >>>>>>>>>>> Subject: Re: New Consumer API discussion > > >>>>>>>>>>> > > >>>>>>>>>>> Robert, > > >>>>>>>>>>> > > >>>>>>>>>>> For the push orient api, you can potentially implement your > own > > >>>>>>>>>>> MessageHandler with those methods. In the main loop of our > new > > >>>>>>> consumer > > >>>>>>>>>>> api, you can just call those methods based on the events you > > get. > > >>>>>>>>>>> > > >>>>>>>>>>> Also, we already have an api to get the first and the last > > offset > > >>>>>> of a > > >>>>>>>>>>> partition (getOffsetBefore). > > >>>>>>>>>>> > > >>>>>>>>>>> Thanks, > > >>>>>>>>>>> > > >>>>>>>>>>> Jun > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert > > >>>>>>>>>>> <robert.with...@dish.com<mailto:robert.with...@dish.com > > >>wrote: > > >>>>>>>>>>> > > >>>>>>>>>>> This is a good idea, too. I would modify it to include > stream > > >>>>>>>>>>> marking, then you can have: > > >>>>>>>>>>> > > >>>>>>>>>>> long end = consumer.lastOffset(tp); > > >>>>>>>>>>> consumer.setMark(end); > > >>>>>>>>>>> while(consumer.beforeMark()) { > > >>>>>>>>>>> process(consumer.pollToMark()); > > >>>>>>>>>>> } > > >>>>>>>>>>> > > >>>>>>>>>>> or > > >>>>>>>>>>> > > >>>>>>>>>>> long end = consumer.lastOffset(tp); > > >>>>>>>>>>> consumer.setMark(end); > > >>>>>>>>>>> for(Object msg : consumer.iteratorToMark()) { > > >>>>>>>>>>> process(msg); > > >>>>>>>>>>> } > > >>>>>>>>>>> > > >>>>>>>>>>> I actually have 4 suggestions, then: > > >>>>>>>>>>> > > >>>>>>>>>>> * pull: stream marking > > >>>>>>>>>>> * pull: finite streams, bound by time range (up-to-now, > > >>>> yesterday) > > >>>>>>> or > > >>>>>>>>>>> offset > > >>>>>>>>>>> * pull: async api > > >>>>>>>>>>> * push: KafkaMessageSource, for a push model, with msg and > > OOB > > >>>>>>> events. > > >>>>>>>>>>> Build one in either individual or chunk mode and have a > > listener > > >>>> for > > >>>>>>>>>>> each msg or a listener for a chunk of msgs. Make it > composable > > >> and > > >>>>>>>>>>> policy driven (chunked, range, commitOffsets policy, retry > > >> policy, > > >>>>>>>>>>> transactional) > > >>>>>>>>>>> > > >>>>>>>>>>> Thank you, > > >>>>>>>>>>> Robert > > >>>>>>>>>>> > > >>>>>>>>>>> On Feb 22, 2014, at 11:21 AM, Jay Kreps <jay.kr...@gmail.com > > >>>>>> <mailto: > > >>>>>>>>>>> jay.kr...@gmail.com><mailto: > > >>>>>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com>>> wrote: > > >>>>>>>>>>> > > >>>>>>>>>>> I think what Robert is saying is that we need to think > through > > >> the > > >>>>>>>>>>> offset API to enable "batch processing" of topic data. Think > > of a > > >>>>>>>>>>> process that periodically kicks off to compute a data summary > > or > > >> do > > >>>>>> a > > >>>>>>>>>>> data load or something like that. I think what we need to > > support > > >>>>>> this > > >>>>>>>>>>> is an api to fetch the last offset from the server for a > > >> partition. 
> > >>>>>>>>>>> Something like > > >>>>>>>>>>> long lastOffset(TopicPartition tp) > > >>>>>>>>>>> and for symmetry > > >>>>>>>>>>> long firstOffset(TopicPartition tp) > > >>>>>>>>>>> > > >>>>>>>>>>> Likely this would have to be batched. Essentially we should > add > > >>>> this > > >>>>>>>>>>> use case to our set of code examples to write and think > > through. > > >>>>>>>>>>> > > >>>>>>>>>>> The usage would be something like > > >>>>>>>>>>> > > >>>>>>>>>>> long end = consumer.lastOffset(tp); > > >>>>>>>>>>> while(consumer.position < end) > > >>>>>>>>>>> process(consumer.poll()); > > >>>>>>>>>>> > > >>>>>>>>>>> -Jay > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> On Sat, Feb 22, 2014 at 1:52 AM, Withers, Robert > > >>>>>>>>>>> <robert.with...@dish.com<mailto:robert.with...@dish.com> > > >>>>>>>>>>> <mailto:robert.with...@dish.com>>wrote: > > >>>>>>>>>>> > > >>>>>>>>>>> Jun, > > >>>>>>>>>>> > > >>>>>>>>>>> I was originally thinking a non-blocking read from a > > distributed > > >>>>>>>>>>> stream should distinguish between "no local messages, but a > > fetch > > >>>> is > > >>>>>>>>>>> occurring" > > >>>>>>>>>>> versus "you have drained the stream". The reason this may be > > >>>>>> valuable > > >>>>>>>>>>> to me is so I can write consumers that read all known traffic > > >> then > > >>>>>>>>>>> terminate. > > >>>>>>>>>>> You caused me to reconsider and I think I am conflating 2 > > things. > > >>>>>> One > > >>>>>>>>>>> is a sync/async api while the other is whether to have an > > >> infinite > > >>>>>> or > > >>>>>>>>>>> finite stream. Is it possible to build a finite KafkaStream > > on a > > >>>>>>>>>>> range of messages? > > >>>>>>>>>>> > > >>>>>>>>>>> Perhaps a Simple Consumer would do just fine and then I could > > >> start > > >>>>>>>>>>> off getting the writeOffset from zookeeper and tell it to > read > > a > > >>>>>>>>>>> specified range per partition. I've done this and forked a > > >> simple > > >>>>>>>>>>> consumer runnable for each partition, for one of our > analyzers. > > >>>> The > > >>>>>>>>>>> great thing about the high-level consumer is that rebalance, > > so I > > >>>>>> can > > >>>>>>>>>>> fork however many stream readers I want and you just figure > it > > >> out > > >>>>>> for > > >>>>>>>>>>> me. In that way you offer us the control over the resource > > >>>>>>>>>>> consumption within a pull model. This is best to regulate > > >> message > > >>>>>>>>>>> pressure, they say. > > >>>>>>>>>>> > > >>>>>>>>>>> Combining that high-level rebalance ability with a ranged > > >> partition > > >>>>>>>>>>> drain could be really nice...build the stream with an ending > > >>>>>> position > > >>>>>>>>>>> and it is a finite stream, but retain the high-level > rebalance. > > >>>>>> With > > >>>>>>>>>>> a finite stream, you would know the difference of the 2 async > > >>>>>>>>>>> scenarios: fetch-in-progress versus end-of-stream. With an > > >>>> infinite > > >>>>>>>>>>> stream, you never get end-of-stream. > > >>>>>>>>>>> > > >>>>>>>>>>> Aside from a high-level consumer over a finite range within > > each > > >>>>>>>>>>> partition, the other feature I can think of is more > > complicated. > > >> A > > >>>>>>>>>>> high-level consumer has state machine changes that the client > > >>>> cannot > > >>>>>>>>>>> access, to my knowledge. Our use of kafka has us invoke a > > >> message > > >>>>>>>>>>> handler with each message we consumer from the KafkaStream, > so > > we > > >>>>>>>>>>> convert a pull-model to a push-model. 
Including the idea of > > >>>>>> receiving > > >>>>>>>>>>> notifications from state machine changes, what would be > really > > >> nice > > >>>>>> is > > >>>>>>>>>>> to have a KafkaMessageSource, that is an eventful push model. > > If > > >>>> it > > >>>>>>>>>>> were thread-safe, then we could register listeners for > various > > >>>>>> events: > > >>>>>>>>>>> > > >>>>>>>>>>> * opening-stream > > >>>>>>>>>>> * closing-stream > > >>>>>>>>>>> * message-arrived > > >>>>>>>>>>> * end-of-stream/no-more-messages-in-partition (for finite > > >>>> streams) > > >>>>>>>>>>> * rebalance started > > >>>>>>>>>>> * partition assigned > > >>>>>>>>>>> * partition unassigned > > >>>>>>>>>>> * rebalance finished > > >>>>>>>>>>> * partition-offset-committed > > >>>>>>>>>>> > > >>>>>>>>>>> Perhaps that is just our use, but instead of a pull-oriented > > >>>>>>>>>>> KafkaStream, is there any sense in your providing a > > push-oriented > > >>>>>>>>>>> KafkaMessageSource publishing OOB messages? > > >>>>>>>>>>> > > >>>>>>>>>>> thank you, > > >>>>>>>>>>> Robert > > >>>>>>>>>>> > > >>>>>>>>>>> On Feb 21, 2014, at 5:59 PM, Jun Rao <jun...@gmail.com > <mailto: > > >>>>>>>>>>> jun...@gmail.com><mailto: > > >>>>>>>>>>> jun...@gmail.com<mailto:jun...@gmail.com>><mailto: > > >>>>>>>>>>> jun...@gmail.com<mailto:jun...@gmail.com><mailto: > > >> jun...@gmail.com > > >>>>>>>>> > > >>>>>>>>>>> wrote: > > >>>>>>>>>>> > > >>>>>>>>>>> Robert, > > >>>>>>>>>>> > > >>>>>>>>>>> Could you explain why you want to distinguish btw > > >>>>>>>>>>> FetchingInProgressException and NoMessagePendingException? > The > > >>>>>>>>>>> nextMsgs() method that you want is exactly what poll() does. > > >>>>>>>>>>> > > >>>>>>>>>>> Thanks, > > >>>>>>>>>>> > > >>>>>>>>>>> Jun > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> On Wed, Feb 19, 2014 at 8:45 AM, Withers, Robert > > >>>>>>>>>>> <robert.with...@dish.com<mailto:robert.with...@dish.com> > > >> <mailto: > > >>>>>>>>>>> robert.with...@dish.com> > > >>>>>>>>>>> <mailto:robert.with...@dish.com>>wrote: > > >>>>>>>>>>> > > >>>>>>>>>>> I am not clear on why the consumer stream should be > > positionable, > > >>>>>>>>>>> especially if it is limited to the in-memory fetched > messages. > > >>>>>> Could > > >>>>>>>>>>> someone explain to me, please? I really like the idea of > > >>>> committing > > >>>>>>>>>>> the offset specifically on those partitions with changed read > > >>>>>> offsets, > > >>>>>>>>>>> only. > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> 2 items I would like to see added to the KafkaStream are: > > >>>>>>>>>>> > > >>>>>>>>>>> * a non-blocking next(), throws several exceptions > > >>>>>>>>>>> (FetchingInProgressException and a NoMessagePendingException > or > > >>>>>>>>>>> something) to differentiate between fetching or no messages > > left. > > >>>>>>>>>>> > > >>>>>>>>>>> * A nextMsgs() method which returns all locally > > available > > >>>>>>>>>>> messages > > >>>>>>>>>>> and kicks off a fetch for the next chunk. > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> If you are trying to add transactional features, then > formally > > >>>>>> define > > >>>>>>>>>>> a DTP capability and pull in other server frameworks to share > > the > > >>>>>>>>>>> implementation. Should it be XA/Open? How about a new > > peer2peer > > >>>>>> DTP > > >>>>>>>>>>> protocol? 
> > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> Thank you, > > >>>>>>>>>>> > > >>>>>>>>>>> Robert > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> Robert Withers > > >>>>>>>>>>> > > >>>>>>>>>>> Staff Analyst/Developer > > >>>>>>>>>>> > > >>>>>>>>>>> o: (720) 514-8963 > > >>>>>>>>>>> > > >>>>>>>>>>> c: (571) 262-1873 > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> -----Original Message----- > > >>>>>>>>>>> From: Jay Kreps [mailto:jay.kr...@gmail.com] > > >>>>>>>>>>> Sent: Sunday, February 16, 2014 10:13 AM > > >>>>>>>>>>> To: users@kafka.apache.org<mailto:users@kafka.apache.org > > >>> <mailto: > > >>>>>>>>>>> users@kafka.apache.org><mailto: > > >>>>>>>>>>> users@kafka.apache.org<mailto:users@kafka.apache.org>> > > >>>>>>>>>>> Subject: Re: New Consumer API discussion > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> +1 I think those are good. It is a little weird that changing > > the > > >>>>>>>>>>> +fetch > > >>>>>>>>>>> > > >>>>>>>>>>> point is not batched but changing the commit point is, but I > > >>>> suppose > > >>>>>>>>>>> there is no helping that. > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> -Jay > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede > > >>>>>>>>>>> <neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com> > > >> <mailto: > > >>>>>>>>>>> neha.narkh...@gmail.com> > > >>>>>>>>>>> <mailto:neha.narkh...@gmail.com> > > >>>>>>>>>>> <mailto:neha.narkh...@gmail.com>>wrote: > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> Jay, > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> That makes sense. position/seek deal with changing the > > consumers > > >>>>>>>>>>> > > >>>>>>>>>>> in-memory data, so there is no remote rpc there. For some > > >> reason, I > > >>>>>>>>>>> > > >>>>>>>>>>> got committed and seek mixed up in my head at that time :) > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> So we still end up with > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> long position(TopicPartition tp) > > >>>>>>>>>>> > > >>>>>>>>>>> void seek(TopicPartitionOffset p) > > >>>>>>>>>>> > > >>>>>>>>>>> Map<TopicPartition, Long> committed(TopicPartition tp); > > >>>>>>>>>>> > > >>>>>>>>>>> void commit(TopicPartitionOffset...); > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> Thanks, > > >>>>>>>>>>> > > >>>>>>>>>>> Neha > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> On Friday, February 14, 2014, Jay Kreps <jay.kr...@gmail.com > > >>>>>> <mailto: > > >>>>>>>>>>> jay.kr...@gmail.com><mailto: > > >>>>>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com>><mailto: > > >>>>>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com><mailto: > > >>>>>>>>>>> jay.kr...@gmail.com>><mailto: > > >>>>>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com><mailto: > > >>>>>>>>>>> jay.kr...@gmail.com><mailto:jay.kreps@gmail > > >>>>>>>>>>> .com>>> > > >>>>>>>>>>> wrote: > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> Oh, interesting. So I am assuming the following > implementation: > > >>>>>>>>>>> > > >>>>>>>>>>> 1. We have an in-memory fetch position which controls the > next > > >>>> fetch > > >>>>>>>>>>> > > >>>>>>>>>>> offset. > > >>>>>>>>>>> > > >>>>>>>>>>> 2. Changing this has no effect until you poll again at which > > >> point > > >>>>>>>>>>> > > >>>>>>>>>>> your fetch request will be from the newly specified offset 3. 
> > We > > >>>>>>>>>>> > > >>>>>>>>>>> then have an in-memory but also remotely stored committed > > offset. > > >>>>>>>>>>> > > >>>>>>>>>>> 4. Calling commit has the effect of saving the fetch position > > as > > >>>>>>>>>>> > > >>>>>>>>>>> both the in memory committed position and in the remote store > > 5. > > >>>>>>>>>>> > > >>>>>>>>>>> Auto-commit is the same as periodically calling commit on all > > >>>>>>>>>>> > > >>>>>>>>>>> positions. > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> So batching on commit as well as getting the committed > position > > >>>>>>>>>>> > > >>>>>>>>>>> makes sense, but batching the fetch position wouldn't, > right? I > > >>>>>>>>>>> > > >>>>>>>>>>> think you are actually thinking of a different approach. > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> -Jay > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede > > >>>>>>>>>>> > > >>>>>>>>>>> <neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com > > ><mailto: > > >>>>>>>>>>> neha.narkh...@gmail.com><mailto: > > >>>>>>>>>>> neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com>> > > >>>>>>>>>>> > > >>>>>>>>>>> <javascript:;> > > >>>>>>>>>>> > > >>>>>>>>>>> wrote: > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> I think you are saying both, i.e. if you have committed on a > > >>>>>>>>>>> > > >>>>>>>>>>> partition it returns you that value but if you > > >>>>>>>>>>> > > >>>>>>>>>>> haven't > > >>>>>>>>>>> > > >>>>>>>>>>> it does a remote lookup? > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> Correct. > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> The other argument for making committed batched is that > > commit() > > >>>>>>>>>>> > > >>>>>>>>>>> is batched, so there is symmetry. > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> position() and seek() are always in memory changes (I assume) > > so > > >>>>>>>>>>> > > >>>>>>>>>>> there > > >>>>>>>>>>> > > >>>>>>>>>>> is > > >>>>>>>>>>> > > >>>>>>>>>>> no need to batch them. > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> I'm not as sure as you are about that assumption being true. > > >>>>>>>>>>> > > >>>>>>>>>>> Basically > > >>>>>>>>>>> > > >>>>>>>>>>> in > > >>>>>>>>>>> > > >>>>>>>>>>> my example above, the batching argument for committed() also > > >>>>>>>>>>> > > >>>>>>>>>>> applies to > > >>>>>>>>>>> > > >>>>>>>>>>> position() since one purpose of fetching a partition's offset > > is > > >>>>>>>>>>> > > >>>>>>>>>>> to use > > >>>>>>>>>>> > > >>>>>>>>>>> it > > >>>>>>>>>>> > > >>>>>>>>>>> to set the position of the consumer to that offset. Since > that > > >>>>>>>>>>> > > >>>>>>>>>>> might > > >>>>>>>>>>> > > >>>>>>>>>>> lead > > >>>>>>>>>>> > > >>>>>>>>>>> to a remote OffsetRequest call, I think we probably would be > > >>>>>>>>>>> > > >>>>>>>>>>> better off batching it. > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> Another option for naming would be position/reposition > instead > > of > > >>>>>>>>>>> > > >>>>>>>>>>> position/seek. > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> I think position/seek is better since it aligns with Java > file > > >>>> APIs. > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> I also think your suggestion about ConsumerPosition makes > > sense. 
> > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> Thanks, > > >>>>>>>>>>> > > >>>>>>>>>>> Neha > > >>>>>>>>>>> > > >>>>>>>>>>> On Feb 13, 2014 9:22 PM, "Jay Kreps" <jay.kr...@gmail.com > > >> <mailto: > > >>>>>>>>>>> jay.kr...@gmail.com><mailto: > > >>>>>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com>><mailto: > > >>>>>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com><mailto: > > >>>>>>>>>>> jay.kr...@gmail.com>><mailto: > > >>>>>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com><mailto: > > >>>>>>>>>>> jay.kr...@gmail.com><mailto:jay.kreps@gmail > > >>>>>>>>>>> .com>>> > > >>>>>>>>>>> wrote: > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> Hey Neha, > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> I actually wasn't proposing the name TopicOffsetPosition, > that > > >>>>>>>>>>> > > >>>>>>>>>>> was > > >>>>>>>>>>> > > >>>>>>>>>>> just a > > >>>>>>>>>>> > > >>>>>>>>>>> typo. I meant TopicPartitionOffset, and I was just > referencing > > >>>>>>>>>>> > > >>>>>>>>>>> what > > >>>>>>>>>>> > > >>>>>>>>>>> was > > >>>>>>>>>>> > > >>>>>>>>>>> in > > >>>>>>>>>>> > > >>>>>>>>>>> the javadoc. So to restate my proposal without the typo, > using > > >>>>>>>>>>> > > >>>>>>>>>>> just > > >>>>>>>>>>> > > >>>>>>>>>>> the > > >>>>>>>>>>> > > >>>>>>>>>>> existing classes (that naming is a separate question): > > >>>>>>>>>>> > > >>>>>>>>>>> long position(TopicPartition tp) > > >>>>>>>>>>> > > >>>>>>>>>>> void seek(TopicPartitionOffset p) > > >>>>>>>>>>> > > >>>>>>>>>>> long committed(TopicPartition tp) > > >>>>>>>>>>> > > >>>>>>>>>>> void commit(TopicPartitionOffset...); > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> So I may be unclear on committed() (AKA lastCommittedOffset). > > Is > > >>>>>>>>>>> > > >>>>>>>>>>> it returning the in-memory value from the last commit by this > > >>>>>>>>>>> > > >>>>>>>>>>> consumer, > > >>>>>>>>>>> > > >>>>>>>>>>> or > > >>>>>>>>>>> > > >>>>>>>>>>> is > > >>>>>>>>>>> > > >>>>>>>>>>> it doing a remote fetch, or both? I think you are saying > both, > > >> i.e. > > >>>>>>>>>>> > > >>>>>>>>>>> if > > >>>>>>>>>>> > > >>>>>>>>>>> you > > >>>>>>>>>>> > > >>>>>>>>>>> have committed on a partition it returns you that value but > if > > >>>>>>>>>>> > > >>>>>>>>>>> you > > >>>>>>>>>>> > > >>>>>>>>>>> haven't > > >>>>>>>>>>> > > >>>>>>>>>>> it does a remote lookup? > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> The other argument for making committed batched is that > > commit() > > >>>>>>>>>>> > > >>>>>>>>>>> is batched, so there is symmetry. > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> position() and seek() are always in memory changes (I assume) > > so > > >>>>>>>>>>> > > >>>>>>>>>>> there > > >>>>>>>>>>> > > >>>>>>>>>>> is > > >>>>>>>>>>> > > >>>>>>>>>>> no need to batch them. > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> So taking all that into account what if we revise it to > > >>>>>>>>>>> > > >>>>>>>>>>> long position(TopicPartition tp) > > >>>>>>>>>>> > > >>>>>>>>>>> void seek(TopicPartitionOffset p) > > >>>>>>>>>>> > > >>>>>>>>>>> Map<TopicPartition, Long> committed(TopicPartition tp); > > >>>>>>>>>>> > > >>>>>>>>>>> void commit(TopicPartitionOffset...); > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> This is not symmetric between position/seek and > > commit/committed > > >>>>>>>>>>> > > >>>>>>>>>>> but > > >>>>>>>>>>> > > >>>>>>>>>>> it > > >>>>>>>>>>> > > >>>>>>>>>>> is > > >>>>>>>>>>> > > >>>>>>>>>>> convenient. 
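As a rough illustration of how the revised surface above might hang together, the following sketch rewinds one partition to its last committed offset and then checkpoints the new position. The position/seek/committed/commit signatures are the ones listed above; the TopicPartition and TopicPartitionOffset constructors, their package locations, and the process() helper are assumptions for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import kafka.clients.consumer.ConsumerRecord;
import kafka.clients.consumer.KafkaConsumer;
import kafka.common.TopicPartition;
import kafka.common.TopicPartitionOffset;

public class SeekToCommittedSketch {

    // Rewind one partition to its last committed offset and resume from there.
    static void resumeFromCommitted(KafkaConsumer consumer, TopicPartition tp) {
        // committed() may answer from the consumer's in-memory state if this
        // consumer has committed before, or fall back to a remote lookup.
        Map<TopicPartition, Long> committed = consumer.committed(tp);
        long lastCommitted = committed.get(tp);

        // seek() only changes the in-memory fetch position; the next poll()
        // fetches from this offset.
        consumer.seek(new TopicPartitionOffset(tp, lastCommitted));

        List<ConsumerRecord> records = consumer.poll(1000, TimeUnit.MILLISECONDS);
        process(records);

        // position() reflects the fetch offset after the records just
        // returned; commit it explicitly as the new checkpoint.
        consumer.commit(new TopicPartitionOffset(tp, consumer.position(tp)));
    }

    private static void process(List<ConsumerRecord> records) {
        // Application-specific handling of the fetched records.
    }
}
```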
Another option for naming would be > > >>>>>>>>>>> > > >>>>>>>>>>> position/reposition > > >>>>>>>>>>> > > >>>>>>>>>>> instead > > >>>>>>>>>>> > > >>>>>>>>>>> of position/seek. > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> With respect to the name TopicPartitionOffset, what I was > > trying > > >>>>>>>>>>> > > >>>>>>>>>>> to > > >>>>>>>>>>> > > >>>>>>>>>>> say > > >>>>>>>>>>> > > >>>>>>>>>>> is > > >>>>>>>>>>> > > >>>>>>>>>>> that I recommend we change that to something shorter. I think > > >>>>>>>>>>> > > >>>>>>>>>>> TopicPosition > > >>>>>>>>>>> > > >>>>>>>>>>> or ConsumerPosition might be better. Position does not refer > to > > >>>>>>>>>>> > > >>>>>>>>>>> the variables in the object, it refers to the meaning of the > > >>>>>>>>>>> > > >>>>>>>>>>> object--it represents a position within a topic. The offset > > >>>>>>>>>>> > > >>>>>>>>>>> field in that object > > >>>>>>>>>>> > > >>>>>>>>>>> is > > >>>>>>>>>>> > > >>>>>>>>>>> still called the offset. TopicOffset, PartitionOffset, or > > >>>>>>>>>>> > > >>>>>>>>>>> ConsumerOffset > > >>>>>>>>>>> > > >>>>>>>>>>> would all be workable too. Basically I am just objecting to > > >>>>>>>>>>> > > >>>>>>>>>>> concatenating > > >>>>>>>>>>> > > >>>>>>>>>>> three nouns together. :-) > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> -Jay > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede < > > >>>>>>>>>>> > > >>>>>>>>>>> neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com > > ><mailto: > > >>>>>>>>>>> neha.narkh...@gmail.com><mailto: > > >>>>>>>>>>> neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com > > >><mailto: > > >>>>>>>>>>> neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com > > ><mailto: > > >>>>>>>>>>> neha.narkh...@gmail.com>> > > >>>>>>>>>>> > > >>>>>>>>>>> wrote: > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> 2. It returns a list of results. But how can you use the > list? > > >>>>>>>>>>> > > >>>>>>>>>>> The > > >>>>>>>>>>> > > >>>>>>>>>>> only > > >>>>>>>>>>> > > >>>>>>>>>>> way > > >>>>>>>>>>> > > >>>>>>>>>>> to use the list is to make a map of tp=>offset and then look > > >>>>>>>>>>> > > >>>>>>>>>>> up > > >>>>>>>>>>> > > >>>>>>>>>>> results > > >>>>>>>>>>> > > >>>>>>>>>>> in > > >>>>>>>>>>> > > >>>>>>>>>>> this map (or do a for loop over the list for the partition > you > > >>>>>>>>>>> > > >>>>>>>>>>> want). I > > >>>>>>>>>>> > > >>>>>>>>>>> recommend that if this is an in-memory check we just do one > at > > >>>>>>>>>>> > > >>>>>>>>>>> a > > >>>>>>>>>>> > > >>>>>>>>>>> time. > > >>>>>>>>>>> > > >>>>>>>>>>> E.g. > > >>>>>>>>>>> > > >>>>>>>>>>> long committedPosition( > > >>>>>>>>>>> > > >>>>>>>>>>> TopicPosition). > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> This was discussed in the previous emails. 
There is a choic > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> -- > > >>>>>>>>>>> Robert Withers > > >>>>>>>>>>> robert.with...@dish.com<mailto:robert.with...@dish.com > > ><mailto: > > >>>>>>>>>>> robert.with...@dish.com><mailto: > > >>>>>>>>>>> robert.with...@dish.com<mailto:robert.with...@dish.com>> > > >>>>>>>>>>> c: 303.919.5856 > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> -- > > >>>>>>>>>>> Robert Withers > > >>>>>>>>>>> robert.with...@dish.com<mailto:robert.with...@dish.com > > ><mailto: > > >>>>>>>>>>> robert.with...@dish.com> > > >>>>>>>>>>> c: 303.919.5856 > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> -- > > >>>>>>>>>>> -- Guozhang > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> -- > > >>>>>>>>>>> Robert Withers > > >>>>>>>>>>> robert.with...@dish.com<mailto:robert.with...@dish.com> > > >>>>>>>>>>> c: 303.919.5856 > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>> > > >>>>>>>> > > >>>>>>> > > >>>>>> > > >>>> > > >>>> > > >> > > >> > > >> > > > > > > >