1. I would prefer PartitionsAssigned and PartitionsRevoked as that seems
clearer to me.
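
To make the naming concrete, here is a rough sketch of how split callbacks could read from Java 8. The interface names, the method names and the small TopicPartition stand-in are assumptions for illustration only, not the draft javadoc; the only point is that an interface with a single abstract method can be supplied as a lambda.

```java
import java.util.ArrayList;
import java.util.List;

final class RebalanceCallbackSketch {
    // Hypothetical stand-in for the TopicPartition class in the draft javadoc.
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    // Assumed shape: one abstract method per interface, so a lambda fits.
    interface PartitionsRevokedCallback {
        void onPartitionsRevoked(List<TopicPartition> partitions);
    }

    interface PartitionsAssignedCallback {
        void onPartitionsAssigned(List<TopicPartition> partitions);
    }

    // Simulates one rebalance: the old assignment is revoked, then the new one is handed out.
    static List<String> simulateRebalance(List<TopicPartition> before, List<TopicPartition> after) {
        List<String> log = new ArrayList<>();
        PartitionsRevokedCallback revoked = parts -> log.add("revoked " + parts);
        PartitionsAssignedCallback assigned = parts -> log.add("assigned " + parts);
        revoked.onPartitionsRevoked(before);
        assigned.onPartitionsAssigned(after);
        return log;
    }
}
```

With a single two-method callback interface the caller would need an anonymous class instead of a lambda, which is the forward-compatibility concern raised in the quoted message.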

-Jay


On Tue, Feb 25, 2014 at 10:19 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> Thanks for the reviews so far! There are a few outstanding questions -
>
> 1.  It will be good to make the rebalance callbacks forward compatible with
> Java 8 capabilities. We can change it to PartitionsAssignedCallback
> and PartitionsRevokedCallback or RebalanceBeginCallback and
> RebalanceEndCallback?
>
> If there are no objections, I will change it to RebalanceBeginCallback and
> RebalanceEndCallback.
>
> 2.  The return type for committed() is List<TopicPartitionOffset>. There
> was a suggestion to change it to either be Map<TopicPartition,Long> or
> Map<TopicPartition, TopicPartitionOffset>
>
> Do people have feedback on this suggestion?
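
For comparison, this is roughly what the two return shapes mean for a caller of committed(). TopicPartition and TopicPartitionOffset below are simplified stand-ins for the draft classes, and the -1 "no committed offset" value is an assumption made up for this sketch.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;

final class CommittedReturnTypeSketch {
    // Simplified stand-in; needs equals/hashCode to work as a map key.
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TopicPartition)) return false;
            TopicPartition other = (TopicPartition) o;
            return topic.equals(other.topic) && partition == other.partition;
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    // Simplified stand-in pairing a partition with an offset.
    static final class TopicPartitionOffset {
        final TopicPartition topicPartition;
        final long offset;

        TopicPartitionOffset(TopicPartition topicPartition, long offset) {
            this.topicPartition = topicPartition;
            this.offset = offset;
        }
    }

    // List form: the caller scans the list for the partition it cares about.
    static long offsetFromList(List<TopicPartitionOffset> committed, TopicPartition wanted) {
        for (TopicPartitionOffset tpo : committed) {
            if (tpo.topicPartition.equals(wanted)) {
                return tpo.offset;
            }
        }
        return -1L; // sentinel used only in this sketch
    }

    // Map form: a direct lookup by partition.
    static long offsetFromMap(Map<TopicPartition, Long> committed, TopicPartition wanted) {
        Long offset = committed.get(wanted);
        return offset == null ? -1L : offset;
    }
}
```

The map form moves the lookup into the return type, at the cost of requiring value-based equals() and hashCode() on the key class.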
>
>
> On Tue, Feb 25, 2014 at 9:56 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>
> > Robert,
> >
> > Are you saying it is possible to get events from the high-level consumer
> > regarding various state machine changes?  For instance, can we get a
> > notification when a rebalance starts and ends, when a partition is
> > assigned/unassigned, when an offset is committed on a partition, when a
> > leader changes and so on?  I call this OOB traffic, since they are not the
> > core messages streaming, but side-band events, yet they are still
> > potentially useful to consumers.
> >
> > In the current proposal, you get notified when the state machine changes,
> > i.e. before and after a rebalance is triggered. Look at
> > ConsumerRebalanceCallback
> > <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html>.
> > Leader changes do not count as state machine changes for consumer
> > rebalance purposes.
> >
> > Thanks,
> > Neha
> >
> >
> > On Tue, Feb 25, 2014 at 9:54 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> >
> >> Jay/Robert -
> >>
> >>
> >> I think what Robert is saying is that we need to think through the offset
> >> API to enable "batch processing" of topic data. Think of a process that
> >> periodically kicks off to compute a data summary or do a data load or
> >> something like that. I think what we need to support this is an api to
> >> fetch the last offset from the server for a partition. Something like
> >>    long lastOffset(TopicPartition tp)
> >> and for symmetry
> >>    long firstOffset(TopicPartition tp)
> >>
> >> Likely this would have to be batched.
> >>
> >> A fixed range of data load can be done using the existing APIs as
> >> follows. This assumes you know the endOffset, which can be
> >> currentOffset + n (number of messages in the load).
> >>
> >> long startOffset = consumer.position(partition);
> >> long endOffset = startOffset + n;
> >> while(consumer.position(partition) <= endOffset) {
> >>      List<ConsumerRecord> messages = consumer.poll(timeout, TimeUnit.MILLISECONDS);
> >>      process(messages, endOffset);          // processes messages until endOffset
> >> }
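
A self-contained way to try the bounded load is sketched below against a made-up in-memory consumer. FakeConsumer, its fixed batch size and the load() helper are all hypothetical. The sketch assumes position() is the offset of the next message to fetch, so it treats endOffset as exclusive and loops while position < endOffset; under that assumption a <= comparison would issue one extra poll.

```java
import java.util.ArrayList;
import java.util.List;

final class BoundedLoadSketch {
    // Hypothetical in-memory stand-in for a consumer reading a single partition.
    static final class FakeConsumer {
        private final int batchSize;
        private long position; // offset of the next message to fetch (assumption)

        FakeConsumer(long startPosition, int batchSize) {
            this.position = startPosition;
            this.batchSize = batchSize;
        }

        long position() {
            return position;
        }

        // Returns the offsets of the next batch and advances the fetch position.
        List<Long> poll() {
            List<Long> batch = new ArrayList<>();
            for (int i = 0; i < batchSize; i++) {
                batch.add(position++);
            }
            return batch;
        }
    }

    // Processes exactly n messages starting from the consumer's current position.
    static List<Long> load(FakeConsumer consumer, long n) {
        long startOffset = consumer.position();
        long endOffset = startOffset + n; // exclusive upper bound
        List<Long> processed = new ArrayList<>();
        while (consumer.position() < endOffset) {
            for (long offset : consumer.poll()) {
                if (offset < endOffset) {
                    processed.add(offset); // drop anything the last batch overshot
                }
            }
        }
        return processed;
    }
}
```

Note that the last batch can overshoot the range, so the processing step still has to filter on endOffset, as process(messages, endOffset) does in the snippet above.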
> >>
> >> Does that make sense?
> >>
> >>
> >> On Tue, Feb 25, 2014 at 9:49 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> >>
> >>> Thanks for the review, Jun. Here are some comments -
> >>>
> >>>
> >>> 1. The use of ellipsis: This may make passing a list of items from a
> >>> collection to the api a bit harder. Suppose that you have a list of
> >>> topics stored in
> >>>
> >>> ArrayList<String> topics;
> >>>
> >>> If you want to subscribe to all topics in one call, you will have to do:
> >>>
> >>> String[] topicArray = new String[topics.size()];
> >>> consumer.subscribe(topics.toArray(topicArray));
> >>>
> >>> A similar argument can be made for arguably the more common use case of
> >>> subscribing to a single topic as well. In these cases, the user is
> >>> required to write more code to create a single-item collection and pass
> >>> it in. Since subscription is extremely lightweight, invoking it multiple
> >>> times also seems like a workable solution, no?
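
The trade-off can be seen in a few lines. FakeConsumer here is a hypothetical stub that only records what it was asked to subscribe to; the varargs signature mirrors subscribe(String... topics) from the draft, and the topic names are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class VarargsSubscribeSketch {
    // Hypothetical stub that just records the requested topics.
    static final class FakeConsumer {
        final List<String> subscribed = new ArrayList<>();

        void subscribe(String... topics) {
            subscribed.addAll(Arrays.asList(topics));
        }
    }

    static List<String> demo() {
        FakeConsumer consumer = new FakeConsumer();

        // Single topic: no collection or array needed.
        consumer.subscribe("clicks");

        // A few known topics: still a plain call.
        consumer.subscribe("orders", "payments");

        // Topics held in a collection: one extra conversion to an array.
        List<String> topics = Arrays.asList("audit", "metrics");
        consumer.subscribe(topics.toArray(new String[0]));

        return consumer.subscribed;
    }
}
```

So the ellipsis costs one toArray() call in the collection case and saves a wrapper collection in the single-topic case.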
> >>>
> >>> 2. It would be good to document that the following apis are mutually
> >>> exclusive. Also, if the partition level subscription is specified, there
> >>> is no group management. Finally, unsubscribe() can only be used to cancel
> >>> subscriptions with the same pattern. For example, you can't unsubscribe
> >>> at the partition level if the subscription is done at the topic level.
> >>>
> >>> *subscribe*(java.lang.String... topics)
> >>> *subscribe*(java.lang.String topic, int... partitions)
> >>>
> >>> Makes sense. Made the suggested improvements to the docs
> >>> <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/Consumer.html#subscribe%28java.lang.String...%29>
> >>>
> >>>
> >>> 3. commit(): The following comment in the doc should probably say
> >>> "commit offsets for partitions assigned to this consumer".
> >>>
> >>>  If no partitions are specified, commits offsets for the subscribed list
> >>> of topics and partitions to Kafka.
> >>>
> >>> Could you give more context on this suggestion? Here is the entire doc -
> >>>
> >>> Synchronously commits the specified offsets for the specified list of
> >>> topics and partitions to *Kafka*. If no partitions are specified,
> >>> commits offsets for the subscribed list of topics and partitions.
> >>>
> >>> The hope is to convey that if no partitions are specified, offsets will
> >>> be committed for the subscribed list of partitions. One improvement could
> >>> be to explicitly state that the offsets returned on the last poll will be
> >>> committed. I updated this to -
> >>>
> >>> Synchronously commits the specified offsets for the specified list of
> >>> topics and partitions to *Kafka*. If no offsets are specified, commits
> >>> offsets returned on the last {@link #poll(long, TimeUnit) poll()} for
> >>> the subscribed list of topics and partitions.
> >>>
> >>> 4. There is inconsistency in specifying partitions. Sometimes we use
> >>> TopicPartition and some other times we use String and int (see
> >>> examples below).
> >>>
> >>> void onPartitionsAssigned(Consumer consumer,
> >>> TopicPartition...partitions)
> >>>
> >>> public void *subscribe*(java.lang.String topic, int... partitions)
> >>>
> >>> Yes, this was discussed previously. I think generally the consensus
> >>> seems to be to use the higher level
> >>> classes everywhere. Made those changes.
> >>>
> >>> What's the use case of position()? Isn't that just the nextOffset() on
> >>> the last message returned from poll()?
> >>>
> >>> Yes, except in the case where a rebalance is triggered and poll() is not
> >>> yet invoked. Here, you would use position() to get the new fetch position
> >>> for the specific partition. Even if this is not a common use case, IMO it
> >>> is much easier to use position() to get the fetch offset than invoking
> >>> nextOffset() on the last message. This also keeps the APIs symmetric,
> >>> which is nice.
> >>>
> >>>
> >>>
> >>>
> >>> On Mon, Feb 24, 2014 at 7:06 PM, Withers, Robert <
> >>> robert.with...@dish.com> wrote:
> >>>
> >>>> That's wonderful.  Thanks for kafka.
> >>>>
> >>>> Rob
> >>>>
> >>>> On Feb 24, 2014, at 9:58 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>
> >>>> Hi Robert,
> >>>>
> >>>> Yes, you can check out the callback functions in the new API
> >>>>
> >>>> onPartitionDesigned
> >>>> onPartitionAssigned
> >>>>
> >>>> and see if they meet your needs.
> >>>>
> >>>> Guozhang
> >>>>
> >>>>
> >>>> On Mon, Feb 24, 2014 at 8:18 AM, Withers, Robert <robert.with...@dish.com> wrote:
> >>>>
> >>>> Jun,
> >>>>
> >>>> Are you saying it is possible to get events from the high-level consumer
> >>>> regarding various state machine changes?  For instance, can we get a
> >>>> notification when a rebalance starts and ends, when a partition is
> >>>> assigned/unassigned, when an offset is committed on a partition, when a
> >>>> leader changes and so on?  I call this OOB traffic, since they are not
> >>>> the core messages streaming, but side-band events, yet they are still
> >>>> potentially useful to consumers.
> >>>>
> >>>> Thank you,
> >>>> Robert
> >>>>
> >>>>
> >>>> Robert Withers
> >>>> Staff Analyst/Developer
> >>>> o: (720) 514-8963
> >>>> c:  (571) 262-1873
> >>>>
> >>>>
> >>>>
> >>>> -----Original Message-----
> >>>> From: Jun Rao [mailto:jun...@gmail.com]
> >>>> Sent: Sunday, February 23, 2014 4:19 PM
> >>>> To: users@kafka.apache.org
> >>>> Subject: Re: New Consumer API discussion
> >>>>
> >>>> Robert,
> >>>>
> >>>> For the push-oriented api, you can potentially implement your own
> >>>> MessageHandler with those methods. In the main loop of our new consumer
> >>>> api, you can just call those methods based on the events you get.
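
A minimal sketch of that idea follows: a pull loop that turns batches into handler calls. MessageHandler, PullSource and drain() are hypothetical names for this example, not part of the proposed API, and treating an empty poll as end-of-stream is a simplifying assumption (it does not distinguish "fetch in progress" from "drained").

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

final class PushAdapterSketch {
    // Hypothetical push-style handler supplied by the application.
    interface MessageHandler {
        void onMessage(String message);

        void onEndOfStream();
    }

    // Hypothetical pull source; an empty batch means "nothing left" in this sketch.
    interface PullSource {
        List<String> poll();
    }

    // Main loop: pull batches and push each message to the handler.
    static void drain(PullSource source, MessageHandler handler) {
        while (true) {
            List<String> batch = source.poll();
            if (batch.isEmpty()) {
                handler.onEndOfStream();
                return;
            }
            for (String message : batch) {
                handler.onMessage(message);
            }
        }
    }

    static List<String> demo() {
        final Iterator<List<String>> batches = Arrays.asList(
                Arrays.asList("m1", "m2"),
                Arrays.asList("m3")).iterator();
        PullSource source = () -> batches.hasNext() ? batches.next() : Collections.<String>emptyList();

        final List<String> events = new ArrayList<>();
        drain(source, new MessageHandler() {
            @Override
            public void onMessage(String message) {
                events.add("msg:" + message);
            }

            @Override
            public void onEndOfStream() {
                events.add("end");
            }
        });
        return events; // msg:m1, msg:m2, msg:m3, end
    }
}
```

Because the application owns the loop, it also controls how fast messages are pulled, which keeps the back-pressure property of the pull model.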
> >>>>
> >>>> Also, we already have an api to get the first and the last offset of a
> >>>> partition (getOffsetBefore).
> >>>>
> >>>> Thanks,
> >>>>
> >>>> Jun
> >>>>
> >>>>
> >>>> On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert <robert.with...@dish.com> wrote:
> >>>>
> >>>> This is a good idea, too.  I would modify it to include stream
> >>>> marking, then you can have:
> >>>>
> >>>> long end = consumer.lastOffset(tp);
> >>>> consumer.setMark(end);
> >>>> while(consumer.beforeMark()) {
> >>>>   process(consumer.pollToMark());
> >>>> }
> >>>>
> >>>> or
> >>>>
> >>>> long end = consumer.lastOffset(tp);
> >>>> consumer.setMark(end);
> >>>> for(Object msg : consumer.iteratorToMark()) {
> >>>>   process(msg);
> >>>> }
> >>>>
> >>>> I actually have 4 suggestions, then:
> >>>>
> >>>> *   pull: stream marking
> >>>> *   pull: finite streams, bound by time range (up-to-now, yesterday) or
> >>>>     offset
> >>>> *   pull: async api
> >>>> *   push: KafkaMessageSource, for a push model, with msg and OOB events.
> >>>>     Build one in either individual or chunk mode and have a listener for
> >>>>     each msg or a listener for a chunk of msgs.  Make it composable and
> >>>>     policy driven (chunked, range, commitOffsets policy, retry policy,
> >>>>     transactional)
> >>>>
> >>>> Thank you,
> >>>> Robert
> >>>>
> >>>> On Feb 22, 2014, at 11:21 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >>>>
> >>>> I think what Robert is saying is that we need to think through the
> >>>> offset API to enable "batch processing" of topic data. Think of a
> >>>> process that periodically kicks off to compute a data summary or do a
> >>>> data load or something like that. I think what we need to support this
> >>>> is an api to fetch the last offset from the server for a partition.
> >>>> Something like
> >>>>  long lastOffset(TopicPartition tp)
> >>>> and for symmetry
> >>>>  long firstOffset(TopicPartition tp)
> >>>>
> >>>> Likely this would have to be batched. Essentially we should add this
> >>>> use case to our set of code examples to write and think through.
> >>>>
> >>>> The usage would be something like
> >>>>
> >>>> long end = consumer.lastOffset(tp);
> >>>> while(consumer.position < end)
> >>>>   process(consumer.poll());
> >>>>
> >>>> -Jay
> >>>>
> >>>>
> >>>> On Sat, Feb 22, 2014 at 1:52 AM, Withers, Robert <robert.with...@dish.com> wrote:
> >>>>
> >>>> Jun,
> >>>>
> >>>> I was originally thinking a non-blocking read from a distributed
> >>>> stream should distinguish between "no local messages, but a fetch is
> >>>> occurring"
> >>>> versus "you have drained the stream".  The reason this may be valuable
> >>>> to me is so I can write consumers that read all known traffic then
> >>>> terminate.
> >>>> You caused me to reconsider and I think I am conflating 2 things.  One
> >>>> is a sync/async api while the other is whether to have an infinite or
> >>>> finite stream.  Is it possible to build a finite KafkaStream on a
> >>>> range of messages?
> >>>>
> >>>> Perhaps a Simple Consumer would do just fine and then I could start
> >>>> off getting the writeOffset from zookeeper and tell it to read a
> >>>> specified range per partition.  I've done this and forked a simple
> >>>> consumer runnable for each partition, for one of our analyzers.  The
> >>>> great thing about the high-level consumer is the rebalance, so I can
> >>>> fork however many stream readers I want and you just figure it out for
> >>>> me.  In that way you offer us the control over the resource
> >>>> consumption within a pull model.  This is best to regulate message
> >>>> pressure, they say.
> >>>>
> >>>> Combining that high-level rebalance ability with a ranged partition
> >>>> drain could be really nice...build the stream with an ending position
> >>>> and it is a finite stream, but retain the high-level rebalance.  With
> >>>> a finite stream, you would know the difference of the 2 async
> >>>> scenarios: fetch-in-progress versus end-of-stream.  With an infinite
> >>>> stream, you never get end-of-stream.
> >>>>
> >>>> Aside from a high-level consumer over a finite range within each
> >>>> partition, the other feature I can think of is more complicated.  A
> >>>> high-level consumer has state machine changes that the client cannot
> >>>> access, to my knowledge.  Our use of kafka has us invoke a message
> >>>> handler with each message we consume from the KafkaStream, so we
> >>>> convert a pull-model to a push-model.  Including the idea of receiving
> >>>> notifications from state machine changes, what would be really nice is
> >>>> to have a KafkaMessageSource, that is an eventful push model.  If it
> >>>> were thread-safe, then we could register listeners for various events:
> >>>>
> >>>> *   opening-stream
> >>>> *   closing-stream
> >>>> *   message-arrived
> >>>> *   end-of-stream/no-more-messages-in-partition (for finite streams)
> >>>> *   rebalance started
> >>>> *   partition assigned
> >>>> *   partition unassigned
> >>>> *   rebalance finished
> >>>> *   partition-offset-committed
> >>>>
> >>>> Perhaps that is just our use, but instead of a pull-oriented
> >>>> KafkaStream, is there any sense in your providing a push-oriented
> >>>> KafkaMessageSource publishing OOB messages?
> >>>>
> >>>> thank you,
> >>>> Robert
> >>>>
> >>>> On Feb 21, 2014, at 5:59 PM, Jun Rao <jun...@gmail.com> wrote:
> >>>>
> >>>> Robert,
> >>>>
> >>>> Could you explain why you want to distinguish btw
> >>>> FetchingInProgressException and NoMessagePendingException? The
> >>>> nextMsgs() method that you want is exactly what poll() does.
> >>>>
> >>>> Thanks,
> >>>>
> >>>> Jun
> >>>>
> >>>>
> >>>> On Wed, Feb 19, 2014 at 8:45 AM, Withers, Robert <robert.with...@dish.com> wrote:
> >>>>
> >>>> I am not clear on why the consumer stream should be positionable,
> >>>> especially if it is limited to the in-memory fetched messages.  Could
> >>>> someone explain to me, please?  I really like the idea of committing
> >>>> the offset specifically on those partitions with changed read offsets,
> >>>> only.
> >>>>
> >>>>
> >>>>
> >>>> 2 items I would like to see added to the KafkaStream are:
> >>>>
> >>>> *         A non-blocking next() that throws distinct exceptions
> >>>> (FetchingInProgressException and a NoMessagePendingException or
> >>>> something) to differentiate between fetching or no messages left.
> >>>>
> >>>> *         A nextMsgs() method which returns all locally available
> >>>> messages and kicks off a fetch for the next chunk.
> >>>>
> >>>>
> >>>>
> >>>> If you are trying to add transactional features, then formally define
> >>>> a DTP capability and pull in other server frameworks to share the
> >>>> implementation.  Should it be XA/Open?  How about a new peer2peer DTP
> >>>> protocol?
> >>>>
> >>>>
> >>>>
> >>>> Thank you,
> >>>>
> >>>> Robert
> >>>>
> >>>>
> >>>>
> >>>> Robert Withers
> >>>>
> >>>> Staff Analyst/Developer
> >>>>
> >>>> o: (720) 514-8963
> >>>>
> >>>> c:  (571) 262-1873
> >>>>
> >>>>
> >>>>
> >>>> -----Original Message-----
> >>>> From: Jay Kreps [mailto:jay.kr...@gmail.com]
> >>>> Sent: Sunday, February 16, 2014 10:13 AM
> >>>> To: users@kafka.apache.org
> >>>> Subject: Re: New Consumer API discussion
> >>>>
> >>>>
> >>>>
> >>>> +1 I think those are good. It is a little weird that changing the fetch
> >>>> point is not batched but changing the commit point is, but I suppose
> >>>> there is no helping that.
> >>>>
> >>>>
> >>>>
> >>>> -Jay
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> >>>>
> >>>>
> >>>>
> >>>> Jay,
> >>>>
> >>>>
> >>>>
> >>>> That makes sense. position/seek deal with changing the consumer's
> >>>> in-memory data, so there is no remote rpc there. For some reason, I
> >>>> got committed and seek mixed up in my head at that time :)
> >>>>
> >>>>
> >>>>
> >>>> So we still end up with
> >>>>
> >>>>
> >>>>
> >>>> long position(TopicPartition tp)
> >>>>
> >>>> void seek(TopicPartitionOffset p)
> >>>>
> >>>> Map<TopicPartition, Long> committed(TopicPartition tp);
> >>>>
> >>>> void commit(TopicPartitionOffset...);
> >>>>
> >>>>
> >>>>
> >>>> Thanks,
> >>>>
> >>>> Neha
> >>>>
> >>>>
> >>>>
> >>>> On Friday, February 14, 2014, Jay Kreps <jay.kr...@gmail.com> wrote:
> >>>>
> >>>>
> >>>>
> >>>> Oh, interesting. So I am assuming the following implementation:
> >>>>
> >>>> 1. We have an in-memory fetch position which controls the next fetch
> >>>>    offset.
> >>>> 2. Changing this has no effect until you poll again at which point
> >>>>    your fetch request will be from the newly specified offset.
> >>>> 3. We then have an in-memory but also remotely stored committed offset.
> >>>> 4. Calling commit has the effect of saving the fetch position as
> >>>>    both the in memory committed position and in the remote store.
> >>>> 5. Auto-commit is the same as periodically calling commit on all
> >>>>    positions.
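
The assumed implementation can be written down as a toy model. Everything below is hypothetical: partitions are keyed by a plain string, the remote store is just a map, poll() only advances a counter, and -1 stands for "nothing committed". It is a sketch of the five points as stated, not of the real consumer.

```java
import java.util.HashMap;
import java.util.Map;

final class OffsetModelSketch {
    private final Map<String, Long> remoteStore;                 // stand-in for the remote offset store
    private final Map<String, Long> position = new HashMap<>();  // 1. in-memory fetch positions
    private final Map<String, Long> committed = new HashMap<>(); // 3. in-memory copy of committed offsets

    OffsetModelSketch(Map<String, Long> remoteStore) {
        this.remoteStore = remoteStore;
    }

    // 2. Only local state changes; the next poll would fetch from here.
    void seek(String partition, long offset) {
        position.put(partition, offset);
    }

    long position(String partition) {
        Long p = position.get(partition);
        return p == null ? 0L : p;
    }

    // Pretends to fetch `count` messages and advances the fetch position.
    long poll(String partition, int count) {
        long next = position(partition) + count;
        position.put(partition, next);
        return next;
    }

    // 4. Saves the fetch position both in memory and in the remote store.
    void commit(String partition) {
        long offset = position(partition);
        committed.put(partition, offset);
        remoteStore.put(partition, offset);
    }

    // 5. Auto-commit would call this periodically.
    void commitAll() {
        for (String partition : position.keySet()) {
            commit(partition);
        }
    }

    // Returns the local value if this consumer has committed, else looks it up remotely.
    long committed(String partition) {
        Long local = committed.get(partition);
        if (local != null) {
            return local;
        }
        Long remote = remoteStore.get(partition);
        return remote == null ? -1L : remote;
    }
}
```

In this model only commit() and the fallback path of committed() touch the remote store, which is why batching matters for those two and not for position() or seek().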
> >>>>
> >>>>
> >>>>
> >>>> So batching on commit as well as getting the committed position
> >>>> makes sense, but batching the fetch position wouldn't, right? I
> >>>> think you are actually thinking of a different approach.
> >>>>
> >>>>
> >>>>
> >>>> -Jay
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> >>>>
> >>>>
> >>>>
> >>>> I think you are saying both, i.e. if you have committed on a
> >>>> partition it returns you that value but if you haven't it does a
> >>>> remote lookup?
> >>>>
> >>>>
> >>>>
> >>>> Correct.
> >>>>
> >>>>
> >>>>
> >>>> The other argument for making committed batched is that commit()
> >>>>
> >>>> is batched, so there is symmetry.
> >>>>
> >>>>
> >>>>
> >>>> position() and seek() are always in memory changes (I assume) so
> >>>> there is no need to batch them.
> >>>>
> >>>>
> >>>>
> >>>> I'm not as sure as you are about that assumption being true.
> >>>> Basically in my example above, the batching argument for committed()
> >>>> also applies to position() since one purpose of fetching a
> >>>> partition's offset is to use it to set the position of the consumer
> >>>> to that offset. Since that might lead to a remote OffsetRequest call,
> >>>> I think we probably would be better off batching it.
> >>>>
> >>>>
> >>>>
> >>>> Another option for naming would be position/reposition instead of
> >>>>
> >>>> position/seek.
> >>>>
> >>>>
> >>>>
> >>>> I think position/seek is better since it aligns with Java file APIs.
> >>>>
> >>>>
> >>>>
> >>>> I also think your suggestion about ConsumerPosition makes sense.
> >>>>
> >>>>
> >>>>
> >>>> Thanks,
> >>>>
> >>>> Neha
> >>>>
> >>>> On Feb 13, 2014 9:22 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
> >>>>
> >>>>
> >>>>
> >>>> Hey Neha,
> >>>>
> >>>>
> >>>>
> >>>> I actually wasn't proposing the name TopicOffsetPosition, that was
> >>>> just a typo. I meant TopicPartitionOffset, and I was just referencing
> >>>> what was in the javadoc. So to restate my proposal without the typo,
> >>>> using just the existing classes (that naming is a separate question):
> >>>>
> >>>> long position(TopicPartition tp)
> >>>>
> >>>> void seek(TopicPartitionOffset p)
> >>>>
> >>>> long committed(TopicPartition tp)
> >>>>
> >>>> void commit(TopicPartitionOffset...);
> >>>>
> >>>>
> >>>>
> >>>> So I may be unclear on committed() (AKA lastCommittedOffset). Is it
> >>>> returning the in-memory value from the last commit by this consumer,
> >>>> or is it doing a remote fetch, or both? I think you are saying both,
> >>>> i.e. if you have committed on a partition it returns you that value
> >>>> but if you haven't it does a remote lookup?
> >>>>
> >>>>
> >>>>
> >>>> The other argument for making committed batched is that commit()
> >>>>
> >>>> is batched, so there is symmetry.
> >>>>
> >>>>
> >>>>
> >>>> position() and seek() are always in memory changes (I assume) so
> >>>> there is no need to batch them.
> >>>>
> >>>>
> >>>>
> >>>> So taking all that into account what if we revise it to
> >>>>
> >>>> long position(TopicPartition tp)
> >>>>
> >>>> void seek(TopicPartitionOffset p)
> >>>>
> >>>> Map<TopicPartition, Long> committed(TopicPartition tp);
> >>>>
> >>>> void commit(TopicPartitionOffset...);
> >>>>
> >>>>
> >>>>
> >>>> This is not symmetric between position/seek and commit/committed but
> >>>> it is convenient. Another option for naming would be
> >>>> position/reposition instead of position/seek.
> >>>>
> >>>>
> >>>>
> >>>> With respect to the name TopicPartitionOffset, what I was trying to
> >>>> say is that I recommend we change that to something shorter. I think
> >>>> TopicPosition or ConsumerPosition might be better. Position does not
> >>>> refer to the variables in the object, it refers to the meaning of the
> >>>> object--it represents a position within a topic. The offset field in
> >>>> that object is still called the offset. TopicOffset, PartitionOffset,
> >>>> or ConsumerOffset would all be workable too. Basically I am just
> >>>> objecting to concatenating three nouns together. :-)
> >>>>
> >>>>
> >>>>
> >>>> -Jay
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> >>>>
> >>>>
> >>>>
> >>>> 2. It returns a list of results. But how can you use the list? The
> >>>> only way to use the list is to make a map of tp=>offset and then look
> >>>> up results in this map (or do a for loop over the list for the
> >>>> partition you want). I recommend that if this is an in-memory check
> >>>> we just do one at a time. E.g. long committedPosition(TopicPosition).
> >>>>
> >>>>
> >>>>
> >>>> This was discussed in the previous emails. There is a choic
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> Robert Withers
> >>>> robert.with...@dish.com
> >>>> c: 303.919.5856
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> Robert Withers
> >>>> robert.with...@dish.com
> >>>> c: 303.919.5856
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> -- Guozhang
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> Robert Withers
> >>>> robert.with...@dish.com
> >>>> c: 303.919.5856
> >>>>
> >>>>
> >>>
> >>
> >
>
