Rob,

The use of the callbacks is explained in the javadoc here -
http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html

Let me know if it makes sense. The hope is to improve the javadoc so that
it is self-explanatory.
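Roughly, an implementation would look like the sketch below. Note this is illustrative only: the interface stub approximates what the linked javadoc describes, and the real signatures may differ.

```java
import java.util.Arrays;
import java.util.Collection;

// Stub approximating the ConsumerRebalanceCallback interface from the
// linked javadoc; the real signatures may differ.
interface ConsumerRebalanceCallback {
    void onPartitionsAssigned(Collection<String> partitions);
    void onPartitionsRevoked(Collection<String> partitions);
}

public class RebalanceCallbackSketch implements ConsumerRebalanceCallback {
    public void onPartitionsAssigned(Collection<String> partitions) {
        // e.g. seek() to stored offsets for the newly owned partitions
        System.out.println("assigned: " + partitions);
    }

    public void onPartitionsRevoked(Collection<String> partitions) {
        // e.g. commit offsets before giving up ownership
        System.out.println("revoked: " + partitions);
    }

    public static void main(String[] args) {
        ConsumerRebalanceCallback cb = new RebalanceCallbackSketch();
        cb.onPartitionsAssigned(Arrays.asList("topicA-0", "topicA-1"));
        cb.onPartitionsRevoked(Arrays.asList("topicA-1"));
    }
}
```

The idea is that state restoration hooks into assignment and offset persistence hooks into revocation.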

Thanks,
Neha


On Wed, Feb 26, 2014 at 9:16 AM, Robert Withers
<robert.w.with...@gmail.com> wrote:

> Neha, what does the use of the RebalanceBeginCallback and
> RebalanceEndCallback look like?
>
> thanks,
> Rob
>
> On Feb 25, 2014, at 3:51 PM, Neha Narkhede <neha.narkh...@gmail.com>
> wrote:
>
> > How do you know n? The whole point is that you need to be able to fetch
> > the end offset. You can't a priori decide you will load 1m messages
> > without knowing what is there.
> >
> > Hmm. I think what you are pointing out is that in the new consumer API,
> > we don't have a way to issue the equivalent of the existing
> > getOffsetsBefore() API. Agree that is a flaw that we should fix.
> >
> > Will update the docs/wiki with a few use cases that I've collected so far
> > and see if the API covers those.
> >
> > I would prefer PartitionsAssigned and PartitionsRevoked as that seems
> > clearer to me
> >
> > Well, the RebalanceBeginCallback interface will have
> > onPartitionsAssigned() as the callback. Similarly, the
> > RebalanceEndCallback interface will have onPartitionsRevoked() as the
> > callback. Does that make sense?
> >
> > Thanks,
> > Neha
> >
> >
> > On Tue, Feb 25, 2014 at 2:38 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >
> >> 1. I would prefer PartitionsAssigned and PartitionsRevoked as that seems
> >> clearer to me.
> >>
> >> -Jay
> >>
> >>
> >> On Tue, Feb 25, 2014 at 10:19 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> >>
> >>> Thanks for the reviews so far! There are a few outstanding questions -
> >>>
> >>> 1. It will be good to make the rebalance callbacks forward compatible
> >>> with Java 8 capabilities. We can change it to PartitionsAssignedCallback
> >>> and PartitionsRevokedCallback or RebalanceBeginCallback and
> >>> RebalanceEndCallback?
> >>>
> >>> If there are no objections, I will change it to RebalanceBeginCallback
> >>> and RebalanceEndCallback.
> >>>
> >>> 2. The return type for committed() is List<TopicPartitionOffset>. There
> >>> was a suggestion to change it to either be Map<TopicPartition, Long> or
> >>> Map<TopicPartition, TopicPartitionOffset>.
> >>>
> >>> Do people have feedback on this suggestion?
> >>>
> >>>
> >>> On Tue, Feb 25, 2014 at 9:56 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> >>>
> >>>> Robert,
> >>>>
> >>>> Are you saying it is possible to get events from the high-level
> >>>> consumer regarding various state machine changes?  For instance, can we
> >>>> get a
> >>>> notification when a rebalance starts and ends, when a partition is
> >>>> assigned/unassigned, when an offset is committed on a partition, when
> >>>> a leader changes and so on?  I call this OOB traffic, since they are
> >>>> not the core messages streaming, but side-band events, yet they are
> >>>> still potentially useful to consumers.
> >>>>
> >>>> In the current proposal, you get notified when the state machine
> >>>> changes, i.e. before and after a rebalance is triggered. Look at
> >>>> ConsumerRebalanceCallback:
> >>>> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html
> >>>> Leader changes do not count as state machine changes for consumer
> >>>> rebalance purposes.
> >>>>
> >>>> Thanks,
> >>>> Neha
> >>>>
> >>>>
> >>>> On Tue, Feb 25, 2014 at 9:54 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> >>>>
> >>>>> Jay/Robert -
> >>>>>
> >>>>>
> >>>>> I think what Robert is saying is that we need to think through the
> >>>>> offset API to enable "batch processing" of topic data. Think of a
> >>>>> process that periodically kicks off to compute a data summary or do a
> >>>>> data load or something like that. I think what we need to support this
> >>>>> is an api to fetch the last offset from the server for a partition.
> >>>>> Something like
> >>>>>   long lastOffset(TopicPartition tp)
> >>>>> and for symmetry
> >>>>>   long firstOffset(TopicPartition tp)
> >>>>>
> >>>>> Likely this would have to be batched.
> >>>>>
> >>>>> A fixed range of data load can be done using the existing APIs as
> >>>>> follows. This assumes you know the endOffset, which can be
> >>>>> currentOffset + n (the number of messages in the load):
> >>>>>
> >>>>> long startOffset = consumer.position(partition);
> >>>>> long endOffset = startOffset + n;
> >>>>> while(consumer.position(partition) <= endOffset) {
> >>>>>     List<ConsumerRecord> messages = consumer.poll(timeout, TimeUnit.MILLISECONDS);
> >>>>>     process(messages, endOffset);   // processes messages until endOffset
> >>>>> }
> >>>>>
> >>>>> Does that make sense?
> >>>>>
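To see the control flow of that bounded-drain loop in isolation, here is a runnable sketch against a toy stand-in consumer. Everything below (the stub class and its method names) is purely illustrative, not the proposed consumer API.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for the consumer so the bounded-drain loop can run alone.
class StubConsumer {
    private long position = 0;
    long position() { return position; }
    List<Long> poll() {                 // returns a "batch" of up to 3 offsets
        List<Long> batch = new ArrayList<>();
        for (int i = 0; i < 3; i++) batch.add(position++);
        return batch;
    }
}

public class BoundedDrain {
    public static void main(String[] args) {
        StubConsumer consumer = new StubConsumer();
        long startOffset = consumer.position();
        long n = 10;                    // number of messages in the load
        long endOffset = startOffset + n;
        long processed = 0;
        while (consumer.position() < endOffset) {
            for (long offset : consumer.poll()) {
                // poll() may overshoot endOffset; skip records past the bound
                if (offset < endOffset) processed++;
            }
        }
        System.out.println("processed " + processed);
    }
}
```

The key point is that poll() returns batches, so the processing step has to discard anything past endOffset.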
> >>>>>
> >>>>> On Tue, Feb 25, 2014 at 9:49 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> >>>>>
> >>>>>> Thanks for the review, Jun. Here are some comments -
> >>>>>>
> >>>>>>
> >>>>>> 1. The use of ellipsis: This may make passing a list of items from
> >>>>>> a collection to the api a bit harder. Suppose that you have a list of
> >>>>>> topics
> >>>>>> stored in
> >>>>>>
> >>>>>> ArrayList<String> topics;
> >>>>>>
> >>>>>> If you want to subscribe to all topics in one call, you will have to do:
> >>>>>>
> >>>>>> String[] topicArray = new String[topics.size()];
> >>>>>> consumer.subscribe(topics.toArray(topicArray));
> >>>>>>
> >>>>>> A similar argument can be made for arguably the more common use case
> >>>>>> of subscribing to a single topic as well. In these cases, the user is
> >>>>>> required to write more code to create a single item collection and
> >>>>>> pass it in. Since subscription is extremely lightweight, invoking it
> >>>>>> multiple times also seems like a workable solution, no?
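For reference, the collection-to-varargs conversion being discussed can be sketched standalone. The subscribe() method below is a hypothetical stand-in for the consumer's varargs method, not the real API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class VarargsDemo {
    // Hypothetical stand-in for a varargs subscribe(String... topics) method
    static String[] lastSubscribed;
    static void subscribe(String... topics) { lastSubscribed = topics; }

    public static void main(String[] args) {
        List<String> topics = new ArrayList<>(Arrays.asList("events", "logs"));
        // Passing a collection to a varargs parameter requires an explicit
        // array conversion, which is the extra code being debated here.
        subscribe(topics.toArray(new String[0]));
        System.out.println(Arrays.toString(lastSubscribed));
    }
}
```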
> >>>>>>
> >>>>>> 2. It would be good to document that the following apis are mutually
> >>>>>> exclusive. Also, if the partition level subscription is specified,
> >>>>>> there is no group management. Finally, unsubscribe() can only be used
> >>>>>> to cancel subscriptions with the same pattern. For example, you can't
> >>>>>> unsubscribe at the partition level if the subscription is done at the
> >>>>>> topic level.
> >>>>>>
> >>>>>> *subscribe*(java.lang.String... topics)
> >>>>>> *subscribe*(java.lang.String topic, int... partitions)
> >>>>>>
> >>>>>> Makes sense. Made the suggested improvements to the docs:
> >>>>>> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/Consumer.html#subscribe%28java.lang.String...%29
> >>>>>>
> >>>>>>
> >>>>>> 3. commit(): The following comment in the doc should probably say
> >>>>>> "commit offsets for partitions assigned to this consumer".
> >>>>>>
> >>>>>> If no partitions are specified, commits offsets for the subscribed
> >>>>>> list of topics and partitions to Kafka.
> >>>>>>
> >>>>>> Could you give more context on this suggestion? Here is the entire
> >>>>>> doc -
> >>>>>>
> >>>>>> Synchronously commits the specified offsets for the specified list
> >>>>>> of topics and partitions to *Kafka*. If no partitions are specified,
> >>>>>> commits offsets for the subscribed list of topics and partitions.
> >>>>>>
> >>>>>> The hope is to convey that if no partitions are specified, offsets
> >>>>>> will be committed for the subscribed list of partitions. One
> >>>>>> improvement could be to explicitly state that the offsets returned
> >>>>>> on the last poll will be committed. I updated this to -
> >>>>>>
> >>>>>> Synchronously commits the specified offsets for the specified list
> >>>>>> of topics and partitions to *Kafka*. If no offsets are specified,
> >>>>>> commits offsets returned on the last {@link #poll(long, TimeUnit)
> >>>>>> poll()} for the subscribed list of topics and partitions.
> >>>>>>
> >>>>>> 4. There is inconsistency in specifying partitions. Sometimes we use
> >>>>>> TopicPartition and some other times we use String and int (see
> >>>>>> examples below).
> >>>>>>
> >>>>>> void onPartitionsAssigned(Consumer consumer,
> >>>>>> TopicPartition...partitions)
> >>>>>>
> >>>>>> public void *subscribe*(java.lang.String topic, int... partitions)
> >>>>>>
> >>>>>> Yes, this was discussed previously. I think generally the consensus
> >>>>>> seems to be to use the higher level
> >>>>>> classes everywhere. Made those changes.
> >>>>>>
> >>>>>> What's the use case of position()? Isn't that just the nextOffset()
> >> on
> >>>>>> the
> >>>>>> last message returned from poll()?
> >>>>>>
> >>>>>> Yes, except in the case where a rebalance is triggered and poll() is
> >>>>>> not yet invoked. Here, you would use position() to get the new fetch
> >>>>>> position for the specific partition. Even if this is not a common use
> >>>>>> case, IMO it is much easier to use position() to get the fetch offset
> >>>>>> than invoking nextOffset() on the last message. This also keeps the
> >>>>>> APIs symmetric, which is nice.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Mon, Feb 24, 2014 at 7:06 PM, Withers, Robert <
> >>>>>> robert.with...@dish.com> wrote:
> >>>>>>
> >>>>>>> That's wonderful.  Thanks for kafka.
> >>>>>>>
> >>>>>>> Rob
> >>>>>>>
> >>>>>>> On Feb 24, 2014, at 9:58 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>>
> >>>>>>> Hi Robert,
> >>>>>>>
> >>>>>>> Yes, you can check out the callback functions in the new API
> >>>>>>>
> >>>>>>> onPartitionDesigned
> >>>>>>> onPartitionAssigned
> >>>>>>>
> >>>>>>> and see if they meet your needs.
> >>>>>>>
> >>>>>>> Guozhang
> >>>>>>>
> >>>>>>>
> >>>>>>> On Mon, Feb 24, 2014 at 8:18 AM, Withers, Robert <
> >>>>>>> robert.with...@dish.com> wrote:
> >>>>>>>
> >>>>>>> Jun,
> >>>>>>>
> >>>>>>> Are you saying it is possible to get events from the high-level
> >>>>>>> consumer regarding various state machine changes?  For instance,
> >>>>>>> can we get a notification when a rebalance starts and ends, when a
> >>>>>>> partition is assigned/unassigned, when an offset is committed on a
> >>>>>>> partition, when a leader changes and so on?  I call this OOB
> >>>>>>> traffic, since they are not the core messages streaming, but
> >>>>>>> side-band events, yet they are still potentially useful to
> >>>>>>> consumers.
> >>>>>>>
> >>>>>>> Thank you,
> >>>>>>> Robert
> >>>>>>>
> >>>>>>>
> >>>>>>> Robert Withers
> >>>>>>> Staff Analyst/Developer
> >>>>>>> o: (720) 514-8963
> >>>>>>> c:  (571) 262-1873
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> -----Original Message-----
> >>>>>>> From: Jun Rao [mailto:jun...@gmail.com]
> >>>>>>> Sent: Sunday, February 23, 2014 4:19 PM
> >>>>>>> To: users@kafka.apache.org
> >>>>>>> Subject: Re: New Consumer API discussion
> >>>>>>>
> >>>>>>> Robert,
> >>>>>>>
> >>>>>>> For the push oriented api, you can potentially implement your own
> >>>>>>> MessageHandler with those methods. In the main loop of our new
> >>>>>>> consumer api, you can just call those methods based on the events
> >>>>>>> you get.
> >>>>>>>
> >>>>>>> Also, we already have an api to get the first and the last offset
> >>>>>>> of a partition (getOffsetsBefore).
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>>
> >>>>>>> Jun
> >>>>>>>
> >>>>>>>
> >>>>>>> On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert
> >>>>>>> <robert.with...@dish.com> wrote:
> >>>>>>>
> >>>>>>> This is a good idea, too.  I would modify it to include stream
> >>>>>>> marking, then you can have:
> >>>>>>>
> >>>>>>> long end = consumer.lastOffset(tp);
> >>>>>>> consumer.setMark(end);
> >>>>>>> while(consumer.beforeMark()) {
> >>>>>>>  process(consumer.pollToMark());
> >>>>>>> }
> >>>>>>>
> >>>>>>> or
> >>>>>>>
> >>>>>>> long end = consumer.lastOffset(tp);
> >>>>>>> consumer.setMark(end);
> >>>>>>> for(Object msg : consumer.iteratorToMark()) {
> >>>>>>>  process(msg);
> >>>>>>> }
> >>>>>>>
> >>>>>>> I actually have 4 suggestions, then:
> >>>>>>>
> >>>>>>> *   pull: stream marking
> >>>>>>> *   pull: finite streams, bound by time range (up-to-now, yesterday)
> >>>>>>>     or offset
> >>>>>>> *   pull: async api
> >>>>>>> *   push: KafkaMessageSource, for a push model, with msg and OOB
> >>>>>>>     events. Build one in either individual or chunk mode and have a
> >>>>>>>     listener for each msg or a listener for a chunk of msgs.  Make
> >>>>>>>     it composable and policy driven (chunked, range, commitOffsets
> >>>>>>>     policy, retry policy, transactional)
> >>>>>>>
> >>>>>>> Thank you,
> >>>>>>> Robert
> >>>>>>>
> >>>>>>> On Feb 22, 2014, at 11:21 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >>>>>>>
> >>>>>>> I think what Robert is saying is that we need to think through the
> >>>>>>> offset API to enable "batch processing" of topic data. Think of a
> >>>>>>> process that periodically kicks off to compute a data summary or do
> >> a
> >>>>>>> data load or something like that. I think what we need to support
> >> this
> >>>>>>> is an api to fetch the last offset from the server for a partition.
> >>>>>>> Something like
> >>>>>>> long lastOffset(TopicPartition tp)
> >>>>>>> and for symmetry
> >>>>>>> long firstOffset(TopicPartition tp)
> >>>>>>>
> >>>>>>> Likely this would have to be batched. Essentially we should add
> this
> >>>>>>> use case to our set of code examples to write and think through.
> >>>>>>>
> >>>>>>> The usage would be something like
> >>>>>>>
> >>>>>>> long end = consumer.lastOffset(tp);
> >>>>>>> while(consumer.position < end)
> >>>>>>>  process(consumer.poll());
> >>>>>>>
> >>>>>>> -Jay
> >>>>>>>
> >>>>>>>
> >>>>>>> On Sat, Feb 22, 2014 at 1:52 AM, Withers, Robert
> >>>>>>> <robert.with...@dish.com> wrote:
> >>>>>>>
> >>>>>>> Jun,
> >>>>>>>
> >>>>>>> I was originally thinking a non-blocking read from a distributed
> >>>>>>> stream should distinguish between "no local messages, but a fetch
> >>>>>>> is occurring" versus "you have drained the stream".  The reason
> >>>>>>> this may be valuable to me is so I can write consumers that read
> >>>>>>> all known traffic then terminate.
> >>>>>>> You caused me to reconsider and I think I am conflating 2 things.
> >>>>>>> One is a sync/async api while the other is whether to have an
> >>>>>>> infinite or finite stream.  Is it possible to build a finite
> >>>>>>> KafkaStream on a range of messages?
> >>>>>>>
> >>>>>>> Perhaps a Simple Consumer would do just fine and then I could start
> >>>>>>> off getting the writeOffset from zookeeper and tell it to read a
> >>>>>>> specified range per partition.  I've done this and forked a simple
> >>>>>>> consumer runnable for each partition, for one of our analyzers.  The
> >>>>>>> great thing about the high-level consumer is the rebalance, so I can
> >>>>>>> fork however many stream readers I want and you just figure it out
> >>>>>>> for me.  In that way you offer us the control over the resource
> >>>>>>> consumption within a pull model.  This is best to regulate message
> >>>>>>> pressure, they say.
> >>>>>>>
> >>>>>>> Combining that high-level rebalance ability with a ranged partition
> >>>>>>> drain could be really nice...build the stream with an ending
> >>>>>>> position and it is a finite stream, but retain the high-level
> >>>>>>> rebalance.  With a finite stream, you would know the difference of
> >>>>>>> the 2 async scenarios: fetch-in-progress versus end-of-stream.
> >>>>>>> With an infinite stream, you never get end-of-stream.
> >>>>>>>
> >>>>>>> Aside from a high-level consumer over a finite range within each
> >>>>>>> partition, the other feature I can think of is more complicated.  A
> >>>>>>> high-level consumer has state machine changes that the client cannot
> >>>>>>> access, to my knowledge.  Our use of kafka has us invoke a message
> >>>>>>> handler with each message we consume from the KafkaStream, so we
> >>>>>>> convert a pull-model to a push-model.  Including the idea of
> >>>>>>> receiving notifications from state machine changes, what would be
> >>>>>>> really nice is to have a KafkaMessageSource, that is an eventful
> >>>>>>> push model.  If it were thread-safe, then we could register
> >>>>>>> listeners for various events:
> >>>>>>>
> >>>>>>> *   opening-stream
> >>>>>>> *   closing-stream
> >>>>>>> *   message-arrived
> >>>>>>> *   end-of-stream/no-more-messages-in-partition (for finite streams)
> >>>>>>> *   rebalance started
> >>>>>>> *   partition assigned
> >>>>>>> *   partition unassigned
> >>>>>>> *   rebalance finished
> >>>>>>> *   partition-offset-committed
> >>>>>>>
> >>>>>>> Perhaps that is just our use, but instead of a pull-oriented
> >>>>>>> KafkaStream, is there any sense in your providing a push-oriented
> >>>>>>> KafkaMessageSource publishing OOB messages?
> >>>>>>>
> >>>>>>> thank you,
> >>>>>>> Robert
> >>>>>>>
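The listener registration described above could be sketched as follows. This is entirely hypothetical: no such KafkaMessageSource exists in the proposal, and every name here is an illustrative stand-in.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch of a push-oriented "KafkaMessageSource": listeners registered for
// message delivery and for out-of-band (OOB) lifecycle events. Hypothetical.
public class MessageSourceSketch {
    private final List<Consumer<String>> messageListeners = new ArrayList<>();
    private final List<Runnable> rebalanceStartListeners = new ArrayList<>();

    void onMessage(Consumer<String> l) { messageListeners.add(l); }
    void onRebalanceStarted(Runnable l) { rebalanceStartListeners.add(l); }

    // A real source's internal poll loop would invoke these as events occur:
    void deliver(String msg) { messageListeners.forEach(l -> l.accept(msg)); }
    void rebalanceStarted() { rebalanceStartListeners.forEach(Runnable::run); }

    public static void main(String[] args) {
        MessageSourceSketch source = new MessageSourceSketch();
        source.onMessage(m -> System.out.println("got " + m));
        source.onRebalanceStarted(() -> System.out.println("rebalance started"));
        source.rebalanceStarted();
        source.deliver("m1");
    }
}
```

The other events in the list (partition assigned/unassigned, offset committed, end-of-stream) would follow the same registration pattern.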
> >>>>>>> On Feb 21, 2014, at 5:59 PM, Jun Rao <jun...@gmail.com> wrote:
> >>>>>>>
> >>>>>>> Robert,
> >>>>>>>
> >>>>>>> Could you explain why you want to distinguish btw
> >>>>>>> FetchingInProgressException and NoMessagePendingException? The
> >>>>>>> nextMsgs() method that you want is exactly what poll() does.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>>
> >>>>>>> Jun
> >>>>>>>
> >>>>>>>
> >>>>>>> On Wed, Feb 19, 2014 at 8:45 AM, Withers, Robert
> >>>>>>> <robert.with...@dish.com> wrote:
> >>>>>>>
> >>>>>>> I am not clear on why the consumer stream should be positionable,
> >>>>>>> especially if it is limited to the in-memory fetched messages.
> >>>>>>> Could someone explain to me, please?  I really like the idea of
> >>>>>>> committing the offset specifically on those partitions with changed
> >>>>>>> read offsets, only.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> 2 items I would like to see added to the KafkaStream are:
> >>>>>>>
> >>>>>>> *         a non-blocking next(), throws several exceptions
> >>>>>>> (FetchingInProgressException and a NoMessagePendingException or
> >>>>>>> something) to differentiate between fetching or no messages left.
> >>>>>>>
> >>>>>>> *         A nextMsgs() method which returns all locally available
> >>>>>>> messages and kicks off a fetch for the next chunk.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> If you are trying to add transactional features, then formally
> >>>>>>> define a DTP capability and pull in other server frameworks to
> >>>>>>> share the implementation.  Should it be XA/Open?  How about a new
> >>>>>>> peer2peer DTP protocol?
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> Thank you,
> >>>>>>>
> >>>>>>> Robert
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> Robert Withers
> >>>>>>>
> >>>>>>> Staff Analyst/Developer
> >>>>>>>
> >>>>>>> o: (720) 514-8963
> >>>>>>>
> >>>>>>> c:  (571) 262-1873
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> -----Original Message-----
> >>>>>>> From: Jay Kreps [mailto:jay.kr...@gmail.com]
> >>>>>>> Sent: Sunday, February 16, 2014 10:13 AM
> >>>>>>> To: users@kafka.apache.org
> >>>>>>> Subject: Re: New Consumer API discussion
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> +1 I think those are good. It is a little weird that changing the
> >>>>>>> fetch point is not batched but changing the commit point is, but I
> >>>>>>> suppose there is no helping that.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> -Jay
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede
> >>>>>>> <neha.narkh...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> Jay,
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> That makes sense. position/seek deal with changing the consumer's
> >>>>>>> in-memory data, so there is no remote rpc there. For some reason, I
> >>>>>>> got committed and seek mixed up in my head at that time :)
> >>>>>>>
> >>>>>>> So we still end up with
> >>>>>>>
> >>>>>>> long position(TopicPartition tp)
> >>>>>>> void seek(TopicPartitionOffset p)
> >>>>>>> Map<TopicPartition, Long> committed(TopicPartition tp);
> >>>>>>> void commit(TopicPartitionOffset...);
> >>>>>>>
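To illustrate how these four calls compose, here is a toy model of the rewind-to-committed pattern. All names and behavior below are assumptions for illustration only; in particular, committed() may involve a remote lookup in the real client rather than an in-memory map.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of position/seek/committed/commit, to show how they compose.
public class PositionCommitSketch {
    private long position = 0;
    private final Map<String, Long> committed = new HashMap<>();

    long position(String tp) { return position; }
    void seek(String tp, long offset) { position = offset; }      // in-memory only
    Map<String, Long> committed(String tp) { return committed; }  // remote in reality
    void commit(String tp, long offset) { committed.put(tp, offset); }

    public static void main(String[] args) {
        PositionCommitSketch c = new PositionCommitSketch();
        c.seek("t-0", 42);                      // move the fetch position
        c.commit("t-0", c.position("t-0"));     // persist it as the committed offset
        // After a restart or rebalance, rewind to the last committed offset:
        c.seek("t-0", c.committed("t-0").get("t-0"));
        System.out.println(c.position("t-0"));
    }
}
```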
> >>>>>>>
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>>
> >>>>>>> Neha
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Friday, February 14, 2014, Jay Kreps <jay.kr...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> Oh, interesting. So I am assuming the following implementation:
> >>>>>>> 1. We have an in-memory fetch position which controls the next
> >>>>>>> fetch offset.
> >>>>>>> 2. Changing this has no effect until you poll again at which point
> >>>>>>> your fetch request will be from the newly specified offset.
> >>>>>>> 3. We then have an in-memory but also remotely stored committed
> >>>>>>> offset.
> >>>>>>> 4. Calling commit has the effect of saving the fetch position as
> >>>>>>> both the in memory committed position and in the remote store.
> >>>>>>> 5. Auto-commit is the same as periodically calling commit on all
> >>>>>>> positions.
> >>>>>>>
> >>>>>>> So batching on commit as well as getting the committed position
> >>>>>>> makes sense, but batching the fetch position wouldn't, right? I
> >>>>>>> think you are actually thinking of a different approach.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> -Jay
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede
> >>>>>>> <neha.narkh...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> I think you are saying both, i.e. if you have committed on a
> >>>>>>> partition it returns you that value but if you haven't
> >>>>>>> it does a remote lookup?
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> Correct.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> The other argument for making committed batched is that commit()
> >>>>>>> is batched, so there is symmetry.
> >>>>>>>
> >>>>>>> position() and seek() are always in memory changes (I assume) so
> >>>>>>> there is no need to batch them.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> I'm not as sure as you are about that assumption being true.
> >>>>>>> Basically in my example above, the batching argument for
> >>>>>>> committed() also applies to position() since one purpose of
> >>>>>>> fetching a partition's offset is to use it to set the position of
> >>>>>>> the consumer to that offset. Since that might lead to a remote
> >>>>>>> OffsetRequest call, I think we probably would be better off
> >>>>>>> batching it.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> Another option for naming would be position/reposition instead of
> >>>>>>> position/seek.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> I think position/seek is better since it aligns with Java file APIs.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> I also think your suggestion about ConsumerPosition makes sense.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>>
> >>>>>>> Neha
> >>>>>>>
> >>>>>>> On Feb 13, 2014 9:22 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> Hey Neha,
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> I actually wasn't proposing the name TopicOffsetPosition, that was
> >>>>>>> just a typo. I meant TopicPartitionOffset, and I was just
> >>>>>>> referencing what was in the javadoc. So to restate my proposal
> >>>>>>> without the typo, using just the existing classes (that naming is a
> >>>>>>> separate question):
> >>>>>>>
> >>>>>>> long position(TopicPartition tp)
> >>>>>>> void seek(TopicPartitionOffset p)
> >>>>>>> long committed(TopicPartition tp)
> >>>>>>> void commit(TopicPartitionOffset...);
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> So I may be unclear on committed() (AKA lastCommittedOffset). Is
> >>>>>>> it returning the in-memory value from the last commit by this
> >>>>>>> consumer, or is it doing a remote fetch, or both? I think you are
> >>>>>>> saying both, i.e. if you have committed on a partition it returns
> >>>>>>> you that value but if you haven't it does a remote lookup?
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> The other argument for making committed batched is that commit()
> >>>>>>> is batched, so there is symmetry.
> >>>>>>>
> >>>>>>> position() and seek() are always in memory changes (I assume) so
> >>>>>>> there is no need to batch them.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> So taking all that into account, what if we revise it to
> >>>>>>>
> >>>>>>> long position(TopicPartition tp)
> >>>>>>> void seek(TopicPartitionOffset p)
> >>>>>>> Map<TopicPartition, Long> committed(TopicPartition tp);
> >>>>>>> void commit(TopicPartitionOffset...);
> >>>>>>>
> >>>>>>> This is not symmetric between position/seek and commit/committed
> >>>>>>> but it is convenient. Another option for naming would be
> >>>>>>> position/reposition instead of position/seek.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> With respect to the name TopicPartitionOffset, what I was trying
> >>>>>>> to say is that I recommend we change that to something shorter. I
> >>>>>>> think TopicPosition or ConsumerPosition might be better. Position
> >>>>>>> does not refer to the variables in the object, it refers to the
> >>>>>>> meaning of the object--it represents a position within a topic.
> >>>>>>> The offset field in that object is still called the offset.
> >>>>>>> TopicOffset, PartitionOffset, or ConsumerOffset would all be
> >>>>>>> workable too. Basically I am just objecting to concatenating three
> >>>>>>> nouns together. :-)
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> -Jay
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede
> >>>>>>> <neha.narkh...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> 2. It returns a list of results. But how can you use the list? The
> >>>>>>> only way to use the list is to make a map of tp=>offset and then
> >>>>>>> look up results in this map (or do a for loop over the list for
> >>>>>>> the partition you want). I recommend that if this is an in-memory
> >>>>>>> check we just do one at a time. E.g.
> >>>>>>>
> >>>>>>> long committedPosition(TopicPosition).
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> This was discussed in the previous emails. There is a choice
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> Robert Withers
> >>>>>>> robert.with...@dish.com
> >>>>>>> c: 303.919.5856
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> Robert Withers
> >>>>>>> robert.with...@dish.com
> >>>>>>> c: 303.919.5856
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> -- Guozhang
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> Robert Withers
> >>>>>>> robert.with...@dish.com
> >>>>>>> c: 303.919.5856
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
>
>