Few clarifications:

1. "The new
consumer API is non blocking and instead of returning a blocking iterator,
the consumer provides a poll() API that returns a list of records. "

So this means the consumer polls, and if there are new messages it pulls
them down and then disconnects?

2.
" The consumer also allows long poll
to reduce the end-to-end message latency for low throughput data."

How is this different than blocking?  Is it even based meaning it keeps a
long poll conneciton open, and if/when a new message arrives it triggers an
event on the consumer side?


3.
" The consumer batches
data and multiplexes I/O over TCP connections to each of the brokers it
communicates with, for high throughput. "

If it is single threaded, does each tcp brocker connection block?  Not sure
I understand how this works if it is single threaded.



On Thu, Feb 27, 2014 at 11:38 PM, Robert Withers <robert.w.with...@gmail.com
> wrote:

> Thank you, Neha, that makes it clear.  Really, the aspect of all this that
> we could really use is a way to do exactly once processing.  We are looking
> at more critical data.  What are the latest thoughts on how to achieve
> exactly once and how might that affect a consumer API?
>
> Thanks,
> Rob
>
> On Feb 27, 2014, at 10:29 AM, Neha Narkhede <neha.narkh...@gmail.com>
> wrote:
>
> > Is this<
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html#seek%28kafka.common.TopicPartitionOffset...%29
> >what
> > you are looking for? Basically, I think from the overall feedback, it
> > looks like code snippets don't seem to work for overall understanding of
> > the APIs. I plan to update the javadoc with more complete examples that
> > have been discussed so far on this thread and generally on the mailing
> list.
> >
> > Thanks,
> > Neha
> >
> >
> >
> >
> > On Thu, Feb 27, 2014 at 4:17 AM, Robert Withers
> > <robert.w.with...@gmail.com>wrote:
> >
> >> Neha,
> >>
> >> I see how one might wish to implement onPartitionsAssigned and
> >> onPartitionsRevoked, but I don't have a sense for how I might supply
> these
> >> implementations to a running consumer.  What would the setup code look
> like
> >> to start a high-level consumer with these provided implementations?
> >>
> >> thanks,
> >> Rob
> >>
> >>
> >> On Feb 27, 2014, at 3:48 AM, Neha Narkhede <neha.narkh...@gmail.com>
> >> wrote:
> >>
> >>> Rob,
> >>>
> >>> The use of the callbacks is explained in the javadoc here -
> >>>
> >>
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html
> >>>
> >>> Let me know if it makes sense. The hope is to improve the javadoc so
> that
> >>> it is self explanatory.
> >>>
> >>> Thanks,
> >>> Neha
> >>>
> >>>
> >>> On Wed, Feb 26, 2014 at 9:16 AM, Robert Withers
> >>> <robert.w.with...@gmail.com>wrote:
> >>>
> >>>> Neha, what does the use of the RebalanceBeginCallback and
> >>>> RebalanceEndCallback look like?
> >>>>
> >>>> thanks,
> >>>> Rob
> >>>>
> >>>> On Feb 25, 2014, at 3:51 PM, Neha Narkhede <neha.narkh...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> How do you know n? The whole point is that you need to be able to
> fetch
> >>>> the
> >>>>> end offset. You can't a priori decide you will load 1m messages
> without
> >>>>> knowing what is there.
> >>>>>
> >>>>> Hmm. I think what you are pointing out is that in the new consumer
> API,
> >>>> we
> >>>>> don't have a way to issue the equivalent of the existing
> >>>> getOffsetsBefore()
> >>>>> API. Agree that is a flaw that we should fix.
> >>>>>
> >>>>> Will update the docs/wiki with a few use cases that I've collected so
> >> far
> >>>>> and see if the API covers those.
> >>>>>
> >>>>> I would prefer PartitionsAssigned and PartitionsRevoked as that seems
> >>>>> clearer to me
> >>>>>
> >>>>> Well the RebalanceBeginCallback interface will have
> >>>> onPartitionsAssigned()
> >>>>> as the callback. Similarly, the RebalanceEndCallback interface will
> >> have
> >>>>> onPartitionsRevoked() as the callback. Makes sense?
> >>>>>
> >>>>> Thanks,
> >>>>> Neha
> >>>>>
> >>>>>
> >>>>> On Tue, Feb 25, 2014 at 2:38 PM, Jay Kreps <jay.kr...@gmail.com>
> >> wrote:
> >>>>>
> >>>>>> 1. I would prefer PartitionsAssigned and PartitionsRevoked as that
> >> seems
> >>>>>> clearer to me.
> >>>>>>
> >>>>>> -Jay
> >>>>>>
> >>>>>>
> >>>>>> On Tue, Feb 25, 2014 at 10:19 AM, Neha Narkhede <
> >>>> neha.narkh...@gmail.com
> >>>>>>> wrote:
> >>>>>>
> >>>>>>> Thanks for the reviews so far! There are a few outstanding
> questions
> >> -
> >>>>>>>
> >>>>>>> 1.  It will be good to make the rebalance callbacks forward
> >> compatible
> >>>>>> with
> >>>>>>> Java 8 capabilities. We can change it to PartitionsAssignedCallback
> >>>>>>> and PartitionsRevokedCallback or RebalanceBeginCallback and
> >>>>>>> RebalanceEndCallback?
> >>>>>>>
> >>>>>>> If there are no objections, I will change it to
> >> RebalanceBeginCallback
> >>>>>> and
> >>>>>>> RebalanceEndCallback.
> >>>>>>>
> >>>>>>> 2.  The return type for committed() is List<TopicPartitionOffset>.
> >>>> There
> >>>>>>> was a suggestion to change it to either be Map<TopicPartition,Long>
> >> or
> >>>>>>> Map<TopicPartition, TopicPartitionOffset>
> >>>>>>>
> >>>>>>> Do people have feedback on this suggestion?
> >>>>>>>
> >>>>>>>
> >>>>>>> On Tue, Feb 25, 2014 at 9:56 AM, Neha Narkhede <
> >>>> neha.narkh...@gmail.com
> >>>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Robert,
> >>>>>>>>
> >>>>>>>> Are you saying it is possible to get events from the high-level
> >>>>>>> consumerregarding various state machine changes?  For instance, can
> >> we
> >>>>>> get a
> >>>>>>>> notification when a rebalance starts and ends, when a partition is
> >>>>>>>> assigned/unassigned, when an offset is committed on a partition,
> >> when
> >>>> a
> >>>>>>>> leader changes and so on?  I call this OOB traffic, since they are
> >> not
> >>>>>>> the
> >>>>>>>> core messages streaming, but side-band events, yet they are still
> >>>>>>>> potentially useful to consumers.
> >>>>>>>>
> >>>>>>>> In the current proposal, you get notified when the state machine
> >>>>>> changes
> >>>>>>>> i.e. before and after a rebalance is triggered. Look at
> >>>>>>>> ConsumerRebalanceCallback<
> >>>>>>>
> >>>>>>
> >>>>
> >>
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html
> >>>>>>>>
> >>>>>>>> .Leader changes do not count as state machine changes for consumer
> >>>>>>>> rebalance purposes.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Neha
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Tue, Feb 25, 2014 at 9:54 AM, Neha Narkhede <
> >>>>>> neha.narkh...@gmail.com
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Jay/Robert -
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> I think what Robert is saying is that we need to think through
> the
> >>>>>>> offset
> >>>>>>>>> API to enable "batch processing" of topic data. Think of a
> process
> >>>>>> that
> >>>>>>>>> periodically kicks off to compute a data summary or do a data
> load
> >> or
> >>>>>>>>> something like that. I think what we need to support this is an
> api
> >>>> to
> >>>>>>>>> fetch the last offset from the server for a partition. Something
> >> like
> >>>>>>>>> long lastOffset(TopicPartition tp)
> >>>>>>>>> and for symmetry
> >>>>>>>>> long firstOffset(TopicPartition tp)
> >>>>>>>>>
> >>>>>>>>> Likely this would have to be batched.
> >>>>>>>>>
> >>>>>>>>> A fixed range of data load can be done using the existing APIs as
> >>>>>>>>> follows. This assumes you know the endOffset which can be
> >>>>>> currentOffset
> >>>>>>> + n
> >>>>>>>>> (number of messages in the load)
> >>>>>>>>>
> >>>>>>>>> long startOffset = consumer.position(partition);
> >>>>>>>>> long endOffset = startOffset + n;
> >>>>>>>>> while(consumer.position(partition) <= endOffset) {
> >>>>>>>>>   List<ConsumerRecord> messages = consumer.poll(timeout,
> >>>>>>>>> TimeUnit.MILLISECONDS);
> >>>>>>>>>   process(messages, endOffset);          // processes messages
> >>>>>> until
> >>>>>>>>> endOffset
> >>>>>>>>> }
> >>>>>>>>>
> >>>>>>>>> Does that make sense?
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Tue, Feb 25, 2014 at 9:49 AM, Neha Narkhede <
> >>>>>> neha.narkh...@gmail.com
> >>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Thanks for the review, Jun. Here are some comments -
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> 1. The using of ellipsis: This may make passing a list of items
> >> from
> >>>>>> a
> >>>>>>>>>> collection to the api a bit harder. Suppose that you have a list
> >> of
> >>>>>>>>>> topics
> >>>>>>>>>> stored in
> >>>>>>>>>>
> >>>>>>>>>> ArrayList<String> topics;
> >>>>>>>>>>
> >>>>>>>>>> If you want subscribe to all topics in one call, you will have
> to
> >>>> do:
> >>>>>>>>>>
> >>>>>>>>>> String[] topicArray = new String[topics.size()];
> >>>>>>>>>> consumer.subscribe(topics.
> >>>>>>>>>> toArray(topicArray));
> >>>>>>>>>>
> >>>>>>>>>> A similar argument can be made for arguably the more common use
> >> case
> >>>>>> of
> >>>>>>>>>> subscribing to a single topic as well. In these cases, user is
> >>>>>> required
> >>>>>>>>>> to write more
> >>>>>>>>>> code to create a single item collection and pass it in. Since
> >>>>>>>>>> subscription is extremely lightweight
> >>>>>>>>>> invoking it multiple times also seems like a workable solution,
> >> no?
> >>>>>>>>>>
> >>>>>>>>>> 2. It would be good to document that the following apis are
> >> mutually
> >>>>>>>>>> exclusive. Also, if the partition level subscription is
> specified,
> >>>>>>> there
> >>>>>>>>>> is
> >>>>>>>>>> no group management. Finally, unsubscribe() can only be used to
> >>>>>> cancel
> >>>>>>>>>> subscriptions with the same pattern. For example, you can't
> >>>>>> unsubscribe
> >>>>>>>>>> at
> >>>>>>>>>> the partition level if the subscription is done at the topic
> >> level.
> >>>>>>>>>>
> >>>>>>>>>> *subscribe*(java.lang.String... topics)
> >>>>>>>>>> *subscribe*(java.lang.String topic, int... partitions)
> >>>>>>>>>>
> >>>>>>>>>> Makes sense. Made the suggested improvements to the docs<
> >>>>>>>
> >>>>>>
> >>>>
> >>
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/Consumer.html#subscribe%28java.lang.String...%29
> >>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> 3.commit(): The following comment in the doc should probably say
> >>>>>>> "commit
> >>>>>>>>>> offsets for partitions assigned to this consumer".
> >>>>>>>>>>
> >>>>>>>>>> If no partitions are specified, commits offsets for the
> subscribed
> >>>>>>> list
> >>>>>>>>>> of
> >>>>>>>>>> topics and partitions to Kafka.
> >>>>>>>>>>
> >>>>>>>>>> Could you give more context on this suggestion? Here is the
> entire
> >>>>>> doc
> >>>>>>> -
> >>>>>>>>>>
> >>>>>>>>>> Synchronously commits the specified offsets for the specified
> list
> >>>> of
> >>>>>>>>>> topics and partitions to *Kafka*. If no partitions are
> specified,
> >>>>>>>>>> commits offsets for the subscribed list of topics and
> partitions.
> >>>>>>>>>>
> >>>>>>>>>> The hope is to convey that if no partitions are specified,
> offsets
> >>>>>> will
> >>>>>>>>>> be committed for the subscribed list of partitions. One
> >> improvement
> >>>>>>> could
> >>>>>>>>>> be to
> >>>>>>>>>> explicitly state that the offsets returned on the last poll will
> >> be
> >>>>>>>>>> committed. I updated this to -
> >>>>>>>>>>
> >>>>>>>>>> Synchronously commits the specified offsets for the specified
> list
> >>>> of
> >>>>>>>>>> topics and partitions to *Kafka*. If no offsets are specified,
> >>>>>> commits
> >>>>>>>>>> offsets returned on the last {@link #poll(long, TimeUnit)
> poll()}
> >>>> for
> >>>>>>>>>> the subscribed list of topics and partitions.
> >>>>>>>>>>
> >>>>>>>>>> 4. There is inconsistency in specifying partitions. Sometimes we
> >> use
> >>>>>>>>>> TopicPartition and some other times we use String and int (see
> >>>>>>>>>> examples below).
> >>>>>>>>>>
> >>>>>>>>>> void onPartitionsAssigned(Consumer consumer,
> >>>>>>>>>> TopicPartition...partitions)
> >>>>>>>>>>
> >>>>>>>>>> public void *subscribe*(java.lang.String topic, int...
> partitions)
> >>>>>>>>>>
> >>>>>>>>>> Yes, this was discussed previously. I think generally the
> >> consensus
> >>>>>>>>>> seems to be to use the higher level
> >>>>>>>>>> classes everywhere. Made those changes.
> >>>>>>>>>>
> >>>>>>>>>> What's the use case of position()? Isn't that just the
> >> nextOffset()
> >>>>>> on
> >>>>>>>>>> the
> >>>>>>>>>> last message returned from poll()?
> >>>>>>>>>>
> >>>>>>>>>> Yes, except in the case where a rebalance is triggered and
> poll()
> >> is
> >>>>>>> not
> >>>>>>>>>> yet invoked. Here, you would use position() to get the new fetch
> >>>>>>> position
> >>>>>>>>>> for the specific partition. Even if this is not a common use
> case,
> >>>>>> IMO
> >>>>>>> it
> >>>>>>>>>> is much easier to use position() to get the fetch offset than
> >>>>>> invoking
> >>>>>>>>>> nextOffset() on the last message. This also keeps the APIs
> >>>> symmetric,
> >>>>>>> which
> >>>>>>>>>> is nice.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Mon, Feb 24, 2014 at 7:06 PM, Withers, Robert <
> >>>>>>>>>> robert.with...@dish.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> That's wonderful.  Thanks for kafka.
> >>>>>>>>>>>
> >>>>>>>>>>> Rob
> >>>>>>>>>>>
> >>>>>>>>>>> On Feb 24, 2014, at 9:58 AM, Guozhang Wang <wangg...@gmail.com
> >>>>>>> <mailto:
> >>>>>>>>>>> wangg...@gmail.com>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Hi Robert,
> >>>>>>>>>>>
> >>>>>>>>>>> Yes, you can check out the callback functions in the new API
> >>>>>>>>>>>
> >>>>>>>>>>> onPartitionDesigned
> >>>>>>>>>>> onPartitionAssigned
> >>>>>>>>>>>
> >>>>>>>>>>> and see if they meet your needs.
> >>>>>>>>>>>
> >>>>>>>>>>> Guozhang
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Mon, Feb 24, 2014 at 8:18 AM, Withers, Robert <
> >>>>>>>>>>> robert.with...@dish.com<mailto:robert.with...@dish.com>>wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Jun,
> >>>>>>>>>>>
> >>>>>>>>>>> Are you saying it is possible to get events from the high-level
> >>>>>>> consumer
> >>>>>>>>>>> regarding various state machine changes?  For instance, can we
> >> get
> >>>> a
> >>>>>>>>>>> notification when a rebalance starts and ends, when a partition
> >> is
> >>>>>>>>>>> assigned/unassigned, when an offset is committed on a
> partition,
> >>>>>> when
> >>>>>>> a
> >>>>>>>>>>> leader changes and so on?  I call this OOB traffic, since they
> >> are
> >>>>>> not
> >>>>>>>>>>> the
> >>>>>>>>>>> core messages streaming, but side-band events, yet they are
> still
> >>>>>>>>>>> potentially useful to consumers.
> >>>>>>>>>>>
> >>>>>>>>>>> Thank you,
> >>>>>>>>>>> Robert
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Robert Withers
> >>>>>>>>>>> Staff Analyst/Developer
> >>>>>>>>>>> o: (720) 514-8963
> >>>>>>>>>>> c:  (571) 262-1873
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> -----Original Message-----
> >>>>>>>>>>> From: Jun Rao [mailto:jun...@gmail.com]
> >>>>>>>>>>> Sent: Sunday, February 23, 2014 4:19 PM
> >>>>>>>>>>> To: users@kafka.apache.org<mailto:users@kafka.apache.org>
> >>>>>>>>>>> Subject: Re: New Consumer API discussion
> >>>>>>>>>>>
> >>>>>>>>>>> Robert,
> >>>>>>>>>>>
> >>>>>>>>>>> For the push orient api, you can potentially implement your own
> >>>>>>>>>>> MessageHandler with those methods. In the main loop of our new
> >>>>>>> consumer
> >>>>>>>>>>> api, you can just call those methods based on the events you
> get.
> >>>>>>>>>>>
> >>>>>>>>>>> Also, we already have an api to get the first and the last
> offset
> >>>>>> of a
> >>>>>>>>>>> partition (getOffsetBefore).
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>>
> >>>>>>>>>>> Jun
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert
> >>>>>>>>>>> <robert.with...@dish.com<mailto:robert.with...@dish.com
> >>wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> This is a good idea, too.  I would modify it to include stream
> >>>>>>>>>>> marking, then you can have:
> >>>>>>>>>>>
> >>>>>>>>>>> long end = consumer.lastOffset(tp);
> >>>>>>>>>>> consumer.setMark(end);
> >>>>>>>>>>> while(consumer.beforeMark()) {
> >>>>>>>>>>> process(consumer.pollToMark());
> >>>>>>>>>>> }
> >>>>>>>>>>>
> >>>>>>>>>>> or
> >>>>>>>>>>>
> >>>>>>>>>>> long end = consumer.lastOffset(tp);
> >>>>>>>>>>> consumer.setMark(end);
> >>>>>>>>>>> for(Object msg : consumer.iteratorToMark()) {
> >>>>>>>>>>> process(msg);
> >>>>>>>>>>> }
> >>>>>>>>>>>
> >>>>>>>>>>> I actually have 4 suggestions, then:
> >>>>>>>>>>>
> >>>>>>>>>>> *   pull: stream marking
> >>>>>>>>>>> *   pull: finite streams, bound by time range (up-to-now,
> >>>> yesterday)
> >>>>>>> or
> >>>>>>>>>>> offset
> >>>>>>>>>>> *   pull: async api
> >>>>>>>>>>> *   push: KafkaMessageSource, for a push model, with msg and
> OOB
> >>>>>>> events.
> >>>>>>>>>>> Build one in either individual or chunk mode and have a
> listener
> >>>> for
> >>>>>>>>>>> each msg or a listener for a chunk of msgs.  Make it composable
> >> and
> >>>>>>>>>>> policy driven (chunked, range, commitOffsets policy, retry
> >> policy,
> >>>>>>>>>>> transactional)
> >>>>>>>>>>>
> >>>>>>>>>>> Thank you,
> >>>>>>>>>>> Robert
> >>>>>>>>>>>
> >>>>>>>>>>> On Feb 22, 2014, at 11:21 AM, Jay Kreps <jay.kr...@gmail.com
> >>>>>> <mailto:
> >>>>>>>>>>> jay.kr...@gmail.com><mailto:
> >>>>>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> I think what Robert is saying is that we need to think through
> >> the
> >>>>>>>>>>> offset API to enable "batch processing" of topic data. Think
> of a
> >>>>>>>>>>> process that periodically kicks off to compute a data summary
> or
> >> do
> >>>>>> a
> >>>>>>>>>>> data load or something like that. I think what we need to
> support
> >>>>>> this
> >>>>>>>>>>> is an api to fetch the last offset from the server for a
> >> partition.
> >>>>>>>>>>> Something like
> >>>>>>>>>>> long lastOffset(TopicPartition tp)
> >>>>>>>>>>> and for symmetry
> >>>>>>>>>>> long firstOffset(TopicPartition tp)
> >>>>>>>>>>>
> >>>>>>>>>>> Likely this would have to be batched. Essentially we should add
> >>>> this
> >>>>>>>>>>> use case to our set of code examples to write and think
> through.
> >>>>>>>>>>>
> >>>>>>>>>>> The usage would be something like
> >>>>>>>>>>>
> >>>>>>>>>>> long end = consumer.lastOffset(tp);
> >>>>>>>>>>> while(consumer.position < end)
> >>>>>>>>>>> process(consumer.poll());
> >>>>>>>>>>>
> >>>>>>>>>>> -Jay
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Sat, Feb 22, 2014 at 1:52 AM, Withers, Robert
> >>>>>>>>>>> <robert.with...@dish.com<mailto:robert.with...@dish.com>
> >>>>>>>>>>> <mailto:robert.with...@dish.com>>wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Jun,
> >>>>>>>>>>>
> >>>>>>>>>>> I was originally thinking a non-blocking read from a
> distributed
> >>>>>>>>>>> stream should distinguish between "no local messages, but a
> fetch
> >>>> is
> >>>>>>>>>>> occurring"
> >>>>>>>>>>> versus "you have drained the stream".  The reason this may be
> >>>>>> valuable
> >>>>>>>>>>> to me is so I can write consumers that read all known traffic
> >> then
> >>>>>>>>>>> terminate.
> >>>>>>>>>>> You caused me to reconsider and I think I am conflating 2
> things.
> >>>>>> One
> >>>>>>>>>>> is a sync/async api while the other is whether to have an
> >> infinite
> >>>>>> or
> >>>>>>>>>>> finite stream.  Is it possible to build a finite KafkaStream
> on a
> >>>>>>>>>>> range of messages?
> >>>>>>>>>>>
> >>>>>>>>>>> Perhaps a Simple Consumer would do just fine and then I could
> >> start
> >>>>>>>>>>> off getting the writeOffset from zookeeper and tell it to read
> a
> >>>>>>>>>>> specified range per partition.  I've done this and forked a
> >> simple
> >>>>>>>>>>> consumer runnable for each partition, for one of our analyzers.
> >>>> The
> >>>>>>>>>>> great thing about the high-level consumer is that rebalance,
> so I
> >>>>>> can
> >>>>>>>>>>> fork however many stream readers I want and you just figure it
> >> out
> >>>>>> for
> >>>>>>>>>>> me.  In that way you offer us the control over the resource
> >>>>>>>>>>> consumption within a pull model.  This is best to regulate
> >> message
> >>>>>>>>>>> pressure, they say.
> >>>>>>>>>>>
> >>>>>>>>>>> Combining that high-level rebalance ability with a ranged
> >> partition
> >>>>>>>>>>> drain could be really nice...build the stream with an ending
> >>>>>> position
> >>>>>>>>>>> and it is a finite stream, but retain the high-level rebalance.
> >>>>>> With
> >>>>>>>>>>> a finite stream, you would know the difference of the 2 async
> >>>>>>>>>>> scenarios: fetch-in-progress versus end-of-stream.  With an
> >>>> infinite
> >>>>>>>>>>> stream, you never get end-of-stream.
> >>>>>>>>>>>
> >>>>>>>>>>> Aside from a high-level consumer over a finite range within
> each
> >>>>>>>>>>> partition, the other feature I can think of is more
> complicated.
> >> A
> >>>>>>>>>>> high-level consumer has state machine changes that the client
> >>>> cannot
> >>>>>>>>>>> access, to my knowledge.  Our use of kafka has us invoke a
> >> message
> >>>>>>>>>>> handler with each message we consumer from the KafkaStream, so
> we
> >>>>>>>>>>> convert a pull-model to a push-model.  Including the idea of
> >>>>>> receiving
> >>>>>>>>>>> notifications from state machine changes, what would be really
> >> nice
> >>>>>> is
> >>>>>>>>>>> to have a KafkaMessageSource, that is an eventful push model.
>  If
> >>>> it
> >>>>>>>>>>> were thread-safe, then we could register listeners for various
> >>>>>> events:
> >>>>>>>>>>>
> >>>>>>>>>>> *   opening-stream
> >>>>>>>>>>> *   closing-stream
> >>>>>>>>>>> *   message-arrived
> >>>>>>>>>>> *   end-of-stream/no-more-messages-in-partition (for finite
> >>>> streams)
> >>>>>>>>>>> *   rebalance started
> >>>>>>>>>>> *   partition assigned
> >>>>>>>>>>> *   partition unassigned
> >>>>>>>>>>> *   rebalance finished
> >>>>>>>>>>> *   partition-offset-committed
> >>>>>>>>>>>
> >>>>>>>>>>> Perhaps that is just our use, but instead of a pull-oriented
> >>>>>>>>>>> KafkaStream, is there any sense in your providing a
> push-oriented
> >>>>>>>>>>> KafkaMessageSource publishing OOB messages?
> >>>>>>>>>>>
> >>>>>>>>>>> thank you,
> >>>>>>>>>>> Robert
> >>>>>>>>>>>
> >>>>>>>>>>> On Feb 21, 2014, at 5:59 PM, Jun Rao <jun...@gmail.com<mailto:
> >>>>>>>>>>> jun...@gmail.com><mailto:
> >>>>>>>>>>> jun...@gmail.com<mailto:jun...@gmail.com>><mailto:
> >>>>>>>>>>> jun...@gmail.com<mailto:jun...@gmail.com><mailto:
> >> jun...@gmail.com
> >>>>>>>>>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Robert,
> >>>>>>>>>>>
> >>>>>>>>>>> Could you explain why you want to distinguish btw
> >>>>>>>>>>> FetchingInProgressException and NoMessagePendingException? The
> >>>>>>>>>>> nextMsgs() method that you want is exactly what poll() does.
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>>
> >>>>>>>>>>> Jun
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Wed, Feb 19, 2014 at 8:45 AM, Withers, Robert
> >>>>>>>>>>> <robert.with...@dish.com<mailto:robert.with...@dish.com>
> >> <mailto:
> >>>>>>>>>>> robert.with...@dish.com>
> >>>>>>>>>>> <mailto:robert.with...@dish.com>>wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> I am not clear on why the consumer stream should be
> positionable,
> >>>>>>>>>>> especially if it is limited to the in-memory fetched messages.
> >>>>>> Could
> >>>>>>>>>>> someone explain to me, please?  I really like the idea of
> >>>> committing
> >>>>>>>>>>> the offset specifically on those partitions with changed read
> >>>>>> offsets,
> >>>>>>>>>>> only.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> 2 items I would like to see added to the KafkaStream are:
> >>>>>>>>>>>
> >>>>>>>>>>> *         a non-blocking next(), throws several exceptions
> >>>>>>>>>>> (FetchingInProgressException and a NoMessagePendingException or
> >>>>>>>>>>> something) to differentiate between fetching or no messages
> left.
> >>>>>>>>>>>
> >>>>>>>>>>> *         A nextMsgs() method which returns all locally
> available
> >>>>>>>>>>> messages
> >>>>>>>>>>> and kicks off a fetch for the next chunk.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> If you are trying to add transactional features, then formally
> >>>>>> define
> >>>>>>>>>>> a DTP capability and pull in other server frameworks to share
> the
> >>>>>>>>>>> implementation.  Should it be XA/Open?  How about a new
> peer2peer
> >>>>>> DTP
> >>>>>>>>>>> protocol?
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Thank you,
> >>>>>>>>>>>
> >>>>>>>>>>> Robert
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Robert Withers
> >>>>>>>>>>>
> >>>>>>>>>>> Staff Analyst/Developer
> >>>>>>>>>>>
> >>>>>>>>>>> o: (720) 514-8963
> >>>>>>>>>>>
> >>>>>>>>>>> c:  (571) 262-1873
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> -----Original Message-----
> >>>>>>>>>>> From: Jay Kreps [mailto:jay.kr...@gmail.com]
> >>>>>>>>>>> Sent: Sunday, February 16, 2014 10:13 AM
> >>>>>>>>>>> To: users@kafka.apache.org<mailto:users@kafka.apache.org
> >>> <mailto:
> >>>>>>>>>>> users@kafka.apache.org><mailto:
> >>>>>>>>>>> users@kafka.apache.org<mailto:users@kafka.apache.org>>
> >>>>>>>>>>> Subject: Re: New Consumer API discussion
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> +1 I think those are good. It is a little weird that changing
> the
> >>>>>>>>>>> +fetch
> >>>>>>>>>>>
> >>>>>>>>>>> point is not batched but changing the commit point is, but I
> >>>> suppose
> >>>>>>>>>>> there is no helping that.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> -Jay
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede
> >>>>>>>>>>> <neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com>
> >> <mailto:
> >>>>>>>>>>> neha.narkh...@gmail.com>
> >>>>>>>>>>> <mailto:neha.narkh...@gmail.com>
> >>>>>>>>>>> <mailto:neha.narkh...@gmail.com>>wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Jay,
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> That makes sense. position/seek deal with changing the
> consumers
> >>>>>>>>>>>
> >>>>>>>>>>> in-memory data, so there is no remote rpc there. For some
> >> reason, I
> >>>>>>>>>>>
> >>>>>>>>>>> got committed and seek mixed up in my head at that time :)
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> So we still end up with
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> long position(TopicPartition tp)
> >>>>>>>>>>>
> >>>>>>>>>>> void seek(TopicPartitionOffset p)
> >>>>>>>>>>>
> >>>>>>>>>>> Map<TopicPartition, Long> committed(TopicPartition tp);
> >>>>>>>>>>>
> >>>>>>>>>>> void commit(TopicPartitionOffset...);
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>>
> >>>>>>>>>>> Neha
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Friday, February 14, 2014, Jay Kreps <jay.kr...@gmail.com
> >>>>>> <mailto:
> >>>>>>>>>>> jay.kr...@gmail.com><mailto:
> >>>>>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com>><mailto:
> >>>>>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com><mailto:
> >>>>>>>>>>> jay.kr...@gmail.com>><mailto:
> >>>>>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com><mailto:
> >>>>>>>>>>> jay.kr...@gmail.com><mailto:jay.kreps@gmail
> >>>>>>>>>>> .com>>>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Oh, interesting. So I am assuming the following implementation:
> >>>>>>>>>>>
> >>>>>>>>>>> 1. We have an in-memory fetch position which controls the next
> >>>> fetch
> >>>>>>>>>>>
> >>>>>>>>>>> offset.
> >>>>>>>>>>>
> >>>>>>>>>>> 2. Changing this has no effect until you poll again at which
> >> point
> >>>>>>>>>>>
> >>>>>>>>>>> your fetch request will be from the newly specified offset 3.
> We
> >>>>>>>>>>>
> >>>>>>>>>>> then have an in-memory but also remotely stored committed
> offset.
> >>>>>>>>>>>
> >>>>>>>>>>> 4. Calling commit has the effect of saving the fetch position
> as
> >>>>>>>>>>>
> >>>>>>>>>>> both the in memory committed position and in the remote store
> 5.
> >>>>>>>>>>>
> >>>>>>>>>>> Auto-commit is the same as periodically calling commit on all
> >>>>>>>>>>>
> >>>>>>>>>>> positions.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> So batching on commit as well as getting the committed position
> >>>>>>>>>>>
> >>>>>>>>>>> makes sense, but batching the fetch position wouldn't, right? I
> >>>>>>>>>>>
> >>>>>>>>>>> think you are actually thinking of a different approach.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> -Jay
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede
> >>>>>>>>>>>
> >>>>>>>>>>> <neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com
> ><mailto:
> >>>>>>>>>>> neha.narkh...@gmail.com><mailto:
> >>>>>>>>>>> neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com>>
> >>>>>>>>>>>
> >>>>>>>>>>> <javascript:;>
> >>>>>>>>>>>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> I think you are saying both, i.e. if you have committed on a
> >>>>>>>>>>>
> >>>>>>>>>>> partition it returns you that value but if you
> >>>>>>>>>>>
> >>>>>>>>>>> haven't
> >>>>>>>>>>>
> >>>>>>>>>>> it does a remote lookup?
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Correct.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> The other argument for making committed batched is that
> commit()
> >>>>>>>>>>>
> >>>>>>>>>>> is batched, so there is symmetry.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> position() and seek() are always in memory changes (I assume)
> so
> >>>>>>>>>>>
> >>>>>>>>>>> there
> >>>>>>>>>>>
> >>>>>>>>>>> is
> >>>>>>>>>>>
> >>>>>>>>>>> no need to batch them.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> I'm not as sure as you are about that assumption being true.
> >>>>>>>>>>>
> >>>>>>>>>>> Basically
> >>>>>>>>>>>
> >>>>>>>>>>> in
> >>>>>>>>>>>
> >>>>>>>>>>> my example above, the batching argument for committed() also
> >>>>>>>>>>>
> >>>>>>>>>>> applies to
> >>>>>>>>>>>
> >>>>>>>>>>> position() since one purpose of fetching a partition's offset
> is
> >>>>>>>>>>>
> >>>>>>>>>>> to use
> >>>>>>>>>>>
> >>>>>>>>>>> it
> >>>>>>>>>>>
> >>>>>>>>>>> to set the position of the consumer to that offset. Since that
> >>>>>>>>>>>
> >>>>>>>>>>> might
> >>>>>>>>>>>
> >>>>>>>>>>> lead
> >>>>>>>>>>>
> >>>>>>>>>>> to a remote OffsetRequest call, I think we probably would be
> >>>>>>>>>>>
> >>>>>>>>>>> better off batching it.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Another option for naming would be position/reposition instead
> of
> >>>>>>>>>>>
> >>>>>>>>>>> position/seek.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> I think position/seek is better since it aligns with Java file
> >>>> APIs.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> I also think your suggestion about ConsumerPosition makes
> sense.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>>
> >>>>>>>>>>> Neha
> >>>>>>>>>>>
> >>>>>>>>>>> On Feb 13, 2014 9:22 PM, "Jay Kreps" <jay.kr...@gmail.com
> >> <mailto:
> >>>>>>>>>>> jay.kr...@gmail.com><mailto:
> >>>>>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com>><mailto:
> >>>>>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com><mailto:
> >>>>>>>>>>> jay.kr...@gmail.com>><mailto:
> >>>>>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com><mailto:
> >>>>>>>>>>> jay.kr...@gmail.com><mailto:jay.kreps@gmail
> >>>>>>>>>>> .com>>>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Hey Neha,
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> I actually wasn't proposing the name TopicOffsetPosition, that
> >>>>>>>>>>>
> >>>>>>>>>>> was
> >>>>>>>>>>>
> >>>>>>>>>>> just a
> >>>>>>>>>>>
> >>>>>>>>>>> typo. I meant TopicPartitionOffset, and I was just referencing
> >>>>>>>>>>>
> >>>>>>>>>>> what
> >>>>>>>>>>>
> >>>>>>>>>>> was
> >>>>>>>>>>>
> >>>>>>>>>>> in
> >>>>>>>>>>>
> >>>>>>>>>>> the javadoc. So to restate my proposal without the typo, using
> >>>>>>>>>>>
> >>>>>>>>>>> just
> >>>>>>>>>>>
> >>>>>>>>>>> the
> >>>>>>>>>>>
> >>>>>>>>>>> existing classes (that naming is a separate question):
> >>>>>>>>>>>
> >>>>>>>>>>> long position(TopicPartition tp)
> >>>>>>>>>>>
> >>>>>>>>>>> void seek(TopicPartitionOffset p)
> >>>>>>>>>>>
> >>>>>>>>>>> long committed(TopicPartition tp)
> >>>>>>>>>>>
> >>>>>>>>>>> void commit(TopicPartitionOffset...);
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> So I may be unclear on committed() (AKA lastCommittedOffset).
> Is
> >>>>>>>>>>>
> >>>>>>>>>>> it returning the in-memory value from the last commit by this
> >>>>>>>>>>>
> >>>>>>>>>>> consumer,
> >>>>>>>>>>>
> >>>>>>>>>>> or
> >>>>>>>>>>>
> >>>>>>>>>>> is
> >>>>>>>>>>>
> >>>>>>>>>>> it doing a remote fetch, or both? I think you are saying both,
> >> i.e.
> >>>>>>>>>>>
> >>>>>>>>>>> if
> >>>>>>>>>>>
> >>>>>>>>>>> you
> >>>>>>>>>>>
> >>>>>>>>>>> have committed on a partition it returns you that value but if
> >>>>>>>>>>>
> >>>>>>>>>>> you
> >>>>>>>>>>>
> >>>>>>>>>>> haven't
> >>>>>>>>>>>
> >>>>>>>>>>> it does a remote lookup?
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> The other argument for making committed batched is that
> commit()
> >>>>>>>>>>>
> >>>>>>>>>>> is batched, so there is symmetry.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> position() and seek() are always in memory changes (I assume)
> so
> >>>>>>>>>>>
> >>>>>>>>>>> there
> >>>>>>>>>>>
> >>>>>>>>>>> is
> >>>>>>>>>>>
> >>>>>>>>>>> no need to batch them.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> So taking all that into account what if we revise it to
> >>>>>>>>>>>
> >>>>>>>>>>> long position(TopicPartition tp)
> >>>>>>>>>>>
> >>>>>>>>>>> void seek(TopicPartitionOffset p)
> >>>>>>>>>>>
> >>>>>>>>>>> Map<TopicPartition, Long> committed(TopicPartition tp);
> >>>>>>>>>>>
> >>>>>>>>>>> void commit(TopicPartitionOffset...);
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> This is not symmetric between position/seek and
> commit/committed
> >>>>>>>>>>>
> >>>>>>>>>>> but
> >>>>>>>>>>>
> >>>>>>>>>>> it
> >>>>>>>>>>>
> >>>>>>>>>>> is
> >>>>>>>>>>>
> >>>>>>>>>>> convenient. Another option for naming would be
> >>>>>>>>>>>
> >>>>>>>>>>> position/reposition
> >>>>>>>>>>>
> >>>>>>>>>>> instead
> >>>>>>>>>>>
> >>>>>>>>>>> of position/seek.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> With respect to the name TopicPartitionOffset, what I was
> trying
> >>>>>>>>>>>
> >>>>>>>>>>> to
> >>>>>>>>>>>
> >>>>>>>>>>> say
> >>>>>>>>>>>
> >>>>>>>>>>> is
> >>>>>>>>>>>
> >>>>>>>>>>> that I recommend we change that to something shorter. I think
> >>>>>>>>>>>
> >>>>>>>>>>> TopicPosition
> >>>>>>>>>>>
> >>>>>>>>>>> or ConsumerPosition might be better. Position does not refer to
> >>>>>>>>>>>
> >>>>>>>>>>> the variables in the object, it refers to the meaning of the
> >>>>>>>>>>>
> >>>>>>>>>>> object--it represents a position within a topic. The offset
> >>>>>>>>>>>
> >>>>>>>>>>> field in that object
> >>>>>>>>>>>
> >>>>>>>>>>> is
> >>>>>>>>>>>
> >>>>>>>>>>> still called the offset. TopicOffset, PartitionOffset, or
> >>>>>>>>>>>
> >>>>>>>>>>> ConsumerOffset
> >>>>>>>>>>>
> >>>>>>>>>>> would all be workable too. Basically I am just objecting to
> >>>>>>>>>>>
> >>>>>>>>>>> concatenating
> >>>>>>>>>>>
> >>>>>>>>>>> three nouns together. :-)
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> -Jay
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede <
> >>>>>>>>>>>
> >>>>>>>>>>> neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com
> ><mailto:
> >>>>>>>>>>> neha.narkh...@gmail.com><mailto:
> >>>>>>>>>>> neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com
> >><mailto:
> >>>>>>>>>>> neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com
> ><mailto:
> >>>>>>>>>>> neha.narkh...@gmail.com>>
> >>>>>>>>>>>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> 2. It returns a list of results. But how can you use the list?
> >>>>>>>>>>>
> >>>>>>>>>>> The
> >>>>>>>>>>>
> >>>>>>>>>>> only
> >>>>>>>>>>>
> >>>>>>>>>>> way
> >>>>>>>>>>>
> >>>>>>>>>>> to use the list is to make a map of tp=>offset and then look
> >>>>>>>>>>>
> >>>>>>>>>>> up
> >>>>>>>>>>>
> >>>>>>>>>>> results
> >>>>>>>>>>>
> >>>>>>>>>>> in
> >>>>>>>>>>>
> >>>>>>>>>>> this map (or do a for loop over the list for the partition you
> >>>>>>>>>>>
> >>>>>>>>>>> want). I
> >>>>>>>>>>>
> >>>>>>>>>>> recommend that if this is an in-memory check we just do one at
> >>>>>>>>>>>
> >>>>>>>>>>> a
> >>>>>>>>>>>
> >>>>>>>>>>> time.
> >>>>>>>>>>>
> >>>>>>>>>>> E.g.
> >>>>>>>>>>>
> >>>>>>>>>>> long committedPosition(
> >>>>>>>>>>>
> >>>>>>>>>>> TopicPosition).
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> This was discussed in the previous emails. There is a choic
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> --
> >>>>>>>>>>> Robert Withers
> >>>>>>>>>>> robert.with...@dish.com<mailto:robert.with...@dish.com
> ><mailto:
> >>>>>>>>>>> robert.with...@dish.com><mailto:
> >>>>>>>>>>> robert.with...@dish.com<mailto:robert.with...@dish.com>>
> >>>>>>>>>>> c: 303.919.5856
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> --
> >>>>>>>>>>> Robert Withers
> >>>>>>>>>>> robert.with...@dish.com<mailto:robert.with...@dish.com
> ><mailto:
> >>>>>>>>>>> robert.with...@dish.com>
> >>>>>>>>>>> c: 303.919.5856
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> --
> >>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> --
> >>>>>>>>>>> Robert Withers
> >>>>>>>>>>> robert.with...@dish.com<mailto:robert.with...@dish.com>
> >>>>>>>>>>> c: 303.919.5856
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>
> >>>>
> >>
> >>
> >>
>
>
>

Reply via email to