Neha,

I see how one might wish to implement onPartitionsAssigned and 
onPartitionsRevoked, but I don’t have a sense for how I might supply these 
implementations to a running consumer.  What would the setup code look like to 
start a high-level consumer with these provided implementations?

thanks,
Rob
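
To make the question concrete, here is a minimal sketch of the kind of setup code being asked about. It assumes the callback is handed to the consumer when it is constructed; that is an assumption, not something the thread confirms. RebalanceCallbackSketch, ToyConsumer and LoggingCallback are hypothetical stand-ins written for illustration, not classes from the Kafka client.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the proposed callback interface; not the Kafka API.
interface RebalanceCallbackSketch {
    void onPartitionsAssigned(List<String> partitions);
    void onPartitionsRevoked(List<String> partitions);
}

// Toy consumer that accepts the callback at construction time (an assumption).
class ToyConsumer {
    private final RebalanceCallbackSketch callback;
    private List<String> owned = new ArrayList<>();

    ToyConsumer(RebalanceCallbackSketch callback) {
        this.callback = callback;
    }

    // Simulates a rebalance: give up the old partitions, then take the new ones.
    void rebalance(List<String> newAssignment) {
        callback.onPartitionsRevoked(new ArrayList<>(owned));
        owned = new ArrayList<>(newAssignment);
        callback.onPartitionsAssigned(new ArrayList<>(owned));
    }
}

// Example implementation that simply records what it was told.
class LoggingCallback implements RebalanceCallbackSketch {
    final List<String> events = new ArrayList<>();

    @Override
    public void onPartitionsAssigned(List<String> partitions) {
        events.add("assigned " + partitions);
    }

    @Override
    public void onPartitionsRevoked(List<String> partitions) {
        events.add("revoked " + partitions);
    }
}

class CallbackSetupDemo {
    public static void main(String[] args) {
        LoggingCallback callback = new LoggingCallback();
        ToyConsumer consumer = new ToyConsumer(callback);
        consumer.rebalance(Arrays.asList("events-0", "events-1"));
        consumer.rebalance(Arrays.asList("events-1"));
        callback.events.forEach(System.out::println);
    }
}
```

In this toy model the consumer invokes the revoked callback before it gives up partitions and the assigned callback once it owns the new set; whether the real consumer takes the callback in its constructor or through subscribe() is exactly what the question is asking.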


On Feb 27, 2014, at 3:48 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> Rob,
> 
> The use of the callbacks is explained in the javadoc here -
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html
> 
> Let me know if it makes sense. The hope is to improve the javadoc so that
> it is self explanatory.
> 
> Thanks,
> Neha
> 
> 
> On Wed, Feb 26, 2014 at 9:16 AM, Robert Withers
> <robert.w.with...@gmail.com> wrote:
> 
>> Neha, what does the use of the RebalanceBeginCallback and
>> RebalanceEndCallback look like?
>> 
>> thanks,
>> Rob
>> 
>> On Feb 25, 2014, at 3:51 PM, Neha Narkhede <neha.narkh...@gmail.com>
>> wrote:
>> 
>>> How do you know n? The whole point is that you need to be able to fetch the
>>> end offset. You can't a priori decide you will load 1m messages without
>>> knowing what is there.
>>> 
>>> Hmm. I think what you are pointing out is that in the new consumer API, we
>>> don't have a way to issue the equivalent of the existing getOffsetsBefore()
>>> API. Agree that is a flaw that we should fix.
>>> 
>>> Will update the docs/wiki with a few use cases that I've collected so far
>>> and see if the API covers those.
>>> 
>>> I would prefer PartitionsAssigned and PartitionsRevoked as that seems
>>> clearer to me
>>> 
>>> Well the RebalanceBeginCallback interface will have onPartitionsAssigned()
>>> as the callback. Similarly, the RebalanceEndCallback interface will have
>>> onPartitionsRevoked() as the callback. Makes sense?
>>> 
>>> Thanks,
>>> Neha
>>> 
>>> 
>>> On Tue, Feb 25, 2014 at 2:38 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>> 
>>>> 1. I would prefer PartitionsAssigned and PartitionsRevoked as that seems
>>>> clearer to me.
>>>> 
>>>> -Jay
>>>> 
>>>> 
>>>> On Tue, Feb 25, 2014 at 10:19 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>>> 
>>>>> Thanks for the reviews so far! There are a few outstanding questions -
>>>>> 
>>>>> 1.  It will be good to make the rebalance callbacks forward compatible with
>>>>> Java 8 capabilities. We can change it to PartitionsAssignedCallback
>>>>> and PartitionsRevokedCallback or RebalanceBeginCallback and
>>>>> RebalanceEndCallback?
>>>>> 
>>>>> If there are no objections, I will change it to RebalanceBeginCallback and
>>>>> RebalanceEndCallback.
>>>>> 
>>>>> 2.  The return type for committed() is List<TopicPartitionOffset>. There
>>>>> was a suggestion to change it to either be Map<TopicPartition,Long> or
>>>>> Map<TopicPartition, TopicPartitionOffset>
>>>>> 
>>>>> Do people have feedback on this suggestion?
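
To illustrate the trade-off in question 2, here is a minimal sketch comparing how a caller would look up one partition's committed offset under the List shape and under the Map shape. PartitionKey and PartitionOffset are hypothetical stand-ins for TopicPartition and TopicPartitionOffset, and the helper names are invented for this example.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for TopicPartition (not the Kafka class).
final class PartitionKey {
    final String topic;
    final int partition;

    PartitionKey(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof PartitionKey)) return false;
        PartitionKey other = (PartitionKey) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

// Hypothetical stand-in for TopicPartitionOffset.
final class PartitionOffset {
    final PartitionKey key;
    final long offset;

    PartitionOffset(PartitionKey key, long offset) {
        this.key = key;
        this.offset = offset;
    }
}

class CommittedShapes {
    // List shape: the caller scans the list for the partition it wants.
    static long lookupInList(List<PartitionOffset> committed, PartitionKey wanted) {
        for (PartitionOffset entry : committed) {
            if (entry.key.equals(wanted)) return entry.offset;
        }
        throw new IllegalArgumentException("no committed offset for requested partition");
    }

    // Map shape: one pass to index, then direct lookups by partition.
    static Map<PartitionKey, Long> toMap(List<PartitionOffset> committed) {
        Map<PartitionKey, Long> byPartition = new HashMap<>();
        for (PartitionOffset entry : committed) {
            byPartition.put(entry.key, entry.offset);
        }
        return byPartition;
    }

    public static void main(String[] args) {
        List<PartitionOffset> committed = Arrays.asList(
            new PartitionOffset(new PartitionKey("events", 0), 42L),
            new PartitionOffset(new PartitionKey("events", 1), 7L));
        PartitionKey wanted = new PartitionKey("events", 1);
        System.out.println(lookupInList(committed, wanted));
        System.out.println(toMap(committed).get(wanted));
    }
}
```

With the List shape every caller ends up writing the scan or the indexing step itself, which is the argument for returning a Map directly.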
>>>>> 
>>>>> 
>>>>> On Tue, Feb 25, 2014 at 9:56 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>>>> 
>>>>>> Robert,
>>>>>> 
>>>>>> Are you saying it is possible to get events from the high-level consumer
>>>>>> regarding various state machine changes?  For instance, can we get a
>>>>>> notification when a rebalance starts and ends, when a partition is
>>>>>> assigned/unassigned, when an offset is committed on a partition, when a
>>>>>> leader changes and so on?  I call this OOB traffic, since they are not the
>>>>>> core messages streaming, but side-band events, yet they are still
>>>>>> potentially useful to consumers.
>>>>>> 
>>>>>> In the current proposal, you get notified when the state machine changes
>>>>>> i.e. before and after a rebalance is triggered. Look at
>>>>>> ConsumerRebalanceCallback
>>>>>> <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html>.
>>>>>> Leader changes do not count as state machine changes for consumer
>>>>>> rebalance purposes.
>>>>>> 
>>>>>> Thanks,
>>>>>> Neha
>>>>>> 
>>>>>> 
>>>>>> On Tue, Feb 25, 2014 at 9:54 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>>>>> 
>>>>>>> Jay/Robert -
>>>>>>> 
>>>>>>> 
>>>>>>> I think what Robert is saying is that we need to think through the offset
>>>>>>> API to enable "batch processing" of topic data. Think of a process that
>>>>>>> periodically kicks off to compute a data summary or do a data load or
>>>>>>> something like that. I think what we need to support this is an api to
>>>>>>> fetch the last offset from the server for a partition. Something like
>>>>>>>  long lastOffset(TopicPartition tp)
>>>>>>> and for symmetry
>>>>>>>  long firstOffset(TopicPartition tp)
>>>>>>> 
>>>>>>> Likely this would have to be batched.
>>>>>>> 
>>>>>>> A fixed range of data load can be done using the existing APIs as
>>>>>>> follows. This assumes you know the endOffset which can be currentOffset + n
>>>>>>> (number of messages in the load)
>>>>>>> 
>>>>>>> long startOffset = consumer.position(partition);
>>>>>>> long endOffset = startOffset + n;
>>>>>>> while(consumer.position(partition) <= endOffset) {
>>>>>>>    List<ConsumerRecord> messages = consumer.poll(timeout, TimeUnit.MILLISECONDS);
>>>>>>>    process(messages, endOffset);          // processes messages until endOffset
>>>>>>> }
>>>>>>> 
>>>>>>> Does that make sense?
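
As a runnable illustration of the fixed-range load described above, here is a minimal sketch against an in-memory log. ToyPartitionLog and BoundedLoad are hypothetical classes invented for this example; they only mimic position() and poll(). The sketch assumes position() is the next offset to fetch and treats endOffset as exclusive, so exactly n messages are processed when the log is long enough.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for one partition; offsets are list indexes.
class ToyPartitionLog {
    private final List<String> messages;
    private long position;

    ToyPartitionLog(List<String> messages, long startPosition) {
        this.messages = messages;
        this.position = startPosition;
    }

    // Next offset that poll() will return.
    long position() {
        return position;
    }

    // Returns up to maxRecords messages from the current position and advances it.
    List<String> poll(int maxRecords) {
        int from = (int) Math.min(position, (long) messages.size());
        int to = Math.min(from + maxRecords, messages.size());
        List<String> batch = new ArrayList<>(messages.subList(from, to));
        position += batch.size();
        return batch;
    }
}

class BoundedLoad {
    // Reads n messages starting at the log's current position (fewer if the log runs out).
    static List<String> load(ToyPartitionLog log, long n, int batchSize) {
        long startOffset = log.position();
        long endOffset = startOffset + n; // exclusive upper bound (an assumption)
        List<String> processed = new ArrayList<>();
        while (log.position() < endOffset) {
            long before = log.position();
            List<String> batch = log.poll(batchSize);
            if (batch.isEmpty()) {
                break; // the log holds fewer than n messages past startOffset
            }
            // A batch may overshoot endOffset; keep only the part inside the range.
            int keep = (int) Math.min((long) batch.size(), endOffset - before);
            processed.addAll(batch.subList(0, keep));
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            messages.add("m" + i);
        }
        ToyPartitionLog log = new ToyPartitionLog(messages, 2);
        System.out.println(load(log, 5, 3));
    }
}
```

The early exit on an empty batch is where the objection raised later in the thread shows up: without a way to fetch the real end offset, the caller cannot know in advance whether n messages exist.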
>>>>>>> 
>>>>>>> 
>>>>>>> On Tue, Feb 25, 2014 at 9:49 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>>>>>> 
>>>>>>>> Thanks for the review, Jun. Here are some comments -
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 1. The using of ellipsis: This may make passing a list of items from a
>>>>>>>> collection to the api a bit harder. Suppose that you have a list of
>>>>>>>> topics stored in
>>>>>>>> 
>>>>>>>> ArrayList<String> topics;
>>>>>>>> 
>>>>>>>> If you want to subscribe to all topics in one call, you will have to do:
>>>>>>>> 
>>>>>>>> String[] topicArray = new String[topics.size()];
>>>>>>>> consumer.subscribe(topics.toArray(topicArray));
>>>>>>>> 
>>>>>>>> A similar argument can be made for arguably the more common use case of
>>>>>>>> subscribing to a single topic as well. In these cases, the user is required
>>>>>>>> to write more code to create a single item collection and pass it in. Since
>>>>>>>> subscription is extremely lightweight, invoking it multiple times also
>>>>>>>> seems like a workable solution, no?
>>>>>>>> 
>>>>>>>> 2. It would be good to document that the following apis are mutually
>>>>>>>> exclusive. Also, if the partition level subscription is specified, there
>>>>>>>> is no group management. Finally, unsubscribe() can only be used to cancel
>>>>>>>> subscriptions with the same pattern. For example, you can't unsubscribe
>>>>>>>> at the partition level if the subscription is done at the topic level.
>>>>>>>> 
>>>>>>>> *subscribe*(java.lang.String... topics)
>>>>>>>> *subscribe*(java.lang.String topic, int... partitions)
>>>>>>>> 
>>>>>>>> Makes sense. Made the suggested improvements to the docs
>>>>>>>> <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/Consumer.html#subscribe%28java.lang.String...%29>
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 3. commit(): The following comment in the doc should probably say "commit
>>>>>>>> offsets for partitions assigned to this consumer".
>>>>>>>> 
>>>>>>>> If no partitions are specified, commits offsets for the subscribed list of
>>>>>>>> topics and partitions to Kafka.
>>>>>>>> 
>>>>>>>> Could you give more context on this suggestion? Here is the entire doc -
>>>>>>>> 
>>>>>>>> Synchronously commits the specified offsets for the specified list of
>>>>>>>> topics and partitions to *Kafka*. If no partitions are specified,
>>>>>>>> commits offsets for the subscribed list of topics and partitions.
>>>>>>>> 
>>>>>>>> The hope is to convey that if no partitions are specified, offsets will
>>>>>>>> be committed for the subscribed list of partitions. One improvement could
>>>>>>>> be to explicitly state that the offsets returned on the last poll will be
>>>>>>>> committed. I updated this to -
>>>>>>>> 
>>>>>>>> Synchronously commits the specified offsets for the specified list of
>>>>>>>> topics and partitions to *Kafka*. If no offsets are specified, commits
>>>>>>>> offsets returned on the last {@link #poll(long, TimeUnit) poll()} for
>>>>>>>> the subscribed list of topics and partitions.
>>>>>>>> 
>>>>>>>> 4. There is inconsistency in specifying partitions. Sometimes we use
>>>>>>>> TopicPartition and some other times we use String and int (see
>>>>>>>> examples below).
>>>>>>>> 
>>>>>>>> void onPartitionsAssigned(Consumer consumer, TopicPartition... partitions)
>>>>>>>> 
>>>>>>>> public void *subscribe*(java.lang.String topic, int... partitions)
>>>>>>>> 
>>>>>>>> Yes, this was discussed previously. I think generally the consensus
>>>>>>>> seems to be to use the higher level classes everywhere. Made those changes.
>>>>>>>> 
>>>>>>>> What's the use case of position()? Isn't that just the nextOffset() on
>>>>>>>> the last message returned from poll()?
>>>>>>>> 
>>>>>>>> Yes, except in the case where a rebalance is triggered and poll() is not
>>>>>>>> yet invoked. Here, you would use position() to get the new fetch position
>>>>>>>> for the specific partition. Even if this is not a common use case, IMO it
>>>>>>>> is much easier to use position() to get the fetch offset than invoking
>>>>>>>> nextOffset() on the last message. This also keeps the APIs symmetric, which
>>>>>>>> is nice.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Mon, Feb 24, 2014 at 7:06 PM, Withers, Robert <
>>>>>>>> robert.with...@dish.com> wrote:
>>>>>>>> 
>>>>>>>>> That's wonderful.  Thanks for kafka.
>>>>>>>>> 
>>>>>>>>> Rob
>>>>>>>>> 
>>>>>>>>> On Feb 24, 2014, at 9:58 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>> Hi Robert,
>>>>>>>>> 
>>>>>>>>> Yes, you can check out the callback functions in the new API
>>>>>>>>> 
>>>>>>>>> onPartitionDesigned
>>>>>>>>> onPartitionAssigned
>>>>>>>>> 
>>>>>>>>> and see if they meet your needs.
>>>>>>>>> 
>>>>>>>>> Guozhang
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Mon, Feb 24, 2014 at 8:18 AM, Withers, Robert <robert.with...@dish.com> wrote:
>>>>>>>>> 
>>>>>>>>> Jun,
>>>>>>>>> 
>>>>>>>>> Are you saying it is possible to get events from the high-level consumer
>>>>>>>>> regarding various state machine changes?  For instance, can we get a
>>>>>>>>> notification when a rebalance starts and ends, when a partition is
>>>>>>>>> assigned/unassigned, when an offset is committed on a partition, when a
>>>>>>>>> leader changes and so on?  I call this OOB traffic, since they are not
>>>>>>>>> the core messages streaming, but side-band events, yet they are still
>>>>>>>>> potentially useful to consumers.
>>>>>>>>> 
>>>>>>>>> Thank you,
>>>>>>>>> Robert
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Robert Withers
>>>>>>>>> Staff Analyst/Developer
>>>>>>>>> o: (720) 514-8963
>>>>>>>>> c:  (571) 262-1873
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> -----Original Message-----
>>>>>>>>> From: Jun Rao [mailto:jun...@gmail.com]
>>>>>>>>> Sent: Sunday, February 23, 2014 4:19 PM
>>>>>>>>> To: users@kafka.apache.org
>>>>>>>>> Subject: Re: New Consumer API discussion
>>>>>>>>> 
>>>>>>>>> Robert,
>>>>>>>>> 
>>>>>>>>> For the push-oriented api, you can potentially implement your own
>>>>>>>>> MessageHandler with those methods. In the main loop of our new consumer
>>>>>>>>> api, you can just call those methods based on the events you get.
>>>>>>>>> 
>>>>>>>>> Also, we already have an api to get the first and the last offset of a
>>>>>>>>> partition (getOffsetsBefore).
>>>>>>>>> 
>>>>>>>>> Thanks,
>>>>>>>>> 
>>>>>>>>> Jun
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert <robert.with...@dish.com> wrote:
>>>>>>>>> 
>>>>>>>>> This is a good idea, too.  I would modify it to include stream
>>>>>>>>> marking, then you can have:
>>>>>>>>> 
>>>>>>>>> long end = consumer.lastOffset(tp);
>>>>>>>>> consumer.setMark(end);
>>>>>>>>> while(consumer.beforeMark()) {
>>>>>>>>>    process(consumer.pollToMark());
>>>>>>>>> }
>>>>>>>>> 
>>>>>>>>> or
>>>>>>>>> 
>>>>>>>>> long end = consumer.lastOffset(tp);
>>>>>>>>> consumer.setMark(end);
>>>>>>>>> for(Object msg : consumer.iteratorToMark()) {
>>>>>>>>>    process(msg);
>>>>>>>>> }
>>>>>>>>> 
>>>>>>>>> I actually have 4 suggestions, then:
>>>>>>>>> 
>>>>>>>>> *   pull: stream marking
>>>>>>>>> *   pull: finite streams, bound by time range (up-to-now, yesterday) or
>>>>>>>>>     offset
>>>>>>>>> *   pull: async api
>>>>>>>>> *   push: KafkaMessageSource, for a push model, with msg and OOB events.
>>>>>>>>>     Build one in either individual or chunk mode and have a listener for
>>>>>>>>>     each msg or a listener for a chunk of msgs.  Make it composable and
>>>>>>>>>     policy driven (chunked, range, commitOffsets policy, retry policy,
>>>>>>>>>     transactional)
>>>>>>>>> 
>>>>>>>>> Thank you,
>>>>>>>>> Robert
>>>>>>>>> 
>>>>>>>>> On Feb 22, 2014, at 11:21 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>> I think what Robert is saying is that we need to think through the
>>>>>>>>> offset API to enable "batch processing" of topic data. Think of a
>>>>>>>>> process that periodically kicks off to compute a data summary or do a
>>>>>>>>> data load or something like that. I think what we need to support this
>>>>>>>>> is an api to fetch the last offset from the server for a partition.
>>>>>>>>> Something like
>>>>>>>>> long lastOffset(TopicPartition tp)
>>>>>>>>> and for symmetry
>>>>>>>>> long firstOffset(TopicPartition tp)
>>>>>>>>> 
>>>>>>>>> Likely this would have to be batched. Essentially we should add this
>>>>>>>>> use case to our set of code examples to write and think through.
>>>>>>>>> 
>>>>>>>>> The usage would be something like
>>>>>>>>> 
>>>>>>>>> long end = consumer.lastOffset(tp);
>>>>>>>>> while(consumer.position(tp) < end)
>>>>>>>>>    process(consumer.poll());
>>>>>>>>> 
>>>>>>>>> -Jay
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Sat, Feb 22, 2014 at 1:52 AM, Withers, Robert <robert.with...@dish.com> wrote:
>>>>>>>>> 
>>>>>>>>> Jun,
>>>>>>>>> 
>>>>>>>>> I was originally thinking a non-blocking read from a distributed
>>>>>>>>> stream should distinguish between "no local messages, but a fetch is
>>>>>>>>> occurring" versus "you have drained the stream".  The reason this may be
>>>>>>>>> valuable to me is so I can write consumers that read all known traffic
>>>>>>>>> then terminate.
>>>>>>>>> You caused me to reconsider and I think I am conflating 2 things.  One
>>>>>>>>> is a sync/async api while the other is whether to have an infinite or
>>>>>>>>> finite stream.  Is it possible to build a finite KafkaStream on a
>>>>>>>>> range of messages?
>>>>>>>>> 
>>>>>>>>> Perhaps a Simple Consumer would do just fine and then I could start
>>>>>>>>> off getting the writeOffset from zookeeper and tell it to read a
>>>>>>>>> specified range per partition.  I've done this and forked a simple
>>>>>>>>> consumer runnable for each partition, for one of our analyzers.  The
>>>>>>>>> great thing about the high-level consumer is that rebalance, so I can
>>>>>>>>> fork however many stream readers I want and you just figure it out for
>>>>>>>>> me.  In that way you offer us the control over the resource
>>>>>>>>> consumption within a pull model.  This is best to regulate message
>>>>>>>>> pressure, they say.
>>>>>>>>> 
>>>>>>>>> Combining that high-level rebalance ability with a ranged partition
>>>>>>>>> drain could be really nice...build the stream with an ending position
>>>>>>>>> and it is a finite stream, but retain the high-level rebalance.  With
>>>>>>>>> a finite stream, you would know the difference of the 2 async
>>>>>>>>> scenarios: fetch-in-progress versus end-of-stream.  With an infinite
>>>>>>>>> stream, you never get end-of-stream.
>>>>>>>>> 
>>>>>>>>> Aside from a high-level consumer over a finite range within each
>>>>>>>>> partition, the other feature I can think of is more complicated.  A
>>>>>>>>> high-level consumer has state machine changes that the client cannot
>>>>>>>>> access, to my knowledge.  Our use of kafka has us invoke a message
>>>>>>>>> handler with each message we consume from the KafkaStream, so we
>>>>>>>>> convert a pull-model to a push-model.  Including the idea of receiving
>>>>>>>>> notifications from state machine changes, what would be really nice is
>>>>>>>>> to have a KafkaMessageSource, that is an eventful push model.  If it
>>>>>>>>> were thread-safe, then we could register listeners for various events:
>>>>>>>>> 
>>>>>>>>> *   opening-stream
>>>>>>>>> *   closing-stream
>>>>>>>>> *   message-arrived
>>>>>>>>> *   end-of-stream/no-more-messages-in-partition (for finite streams)
>>>>>>>>> *   rebalance started
>>>>>>>>> *   partition assigned
>>>>>>>>> *   partition unassigned
>>>>>>>>> *   rebalance finished
>>>>>>>>> *   partition-offset-committed
>>>>>>>>> 
>>>>>>>>> Perhaps that is just our use, but instead of a pull-oriented
>>>>>>>>> KafkaStream, is there any sense in your providing a push-oriented
>>>>>>>>> KafkaMessageSource publishing OOB messages?
>>>>>>>>> 
>>>>>>>>> thank you,
>>>>>>>>> Robert
>>>>>>>>> 
>>>>>>>>> On Feb 21, 2014, at 5:59 PM, Jun Rao <jun...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>> Robert,
>>>>>>>>> 
>>>>>>>>> Could you explain why you want to distinguish btw
>>>>>>>>> FetchingInProgressException and NoMessagePendingException? The
>>>>>>>>> nextMsgs() method that you want is exactly what poll() does.
>>>>>>>>> 
>>>>>>>>> Thanks,
>>>>>>>>> 
>>>>>>>>> Jun
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Wed, Feb 19, 2014 at 8:45 AM, Withers, Robert <robert.with...@dish.com> wrote:
>>>>>>>>> 
>>>>>>>>> I am not clear on why the consumer stream should be positionable,
>>>>>>>>> especially if it is limited to the in-memory fetched messages.  Could
>>>>>>>>> someone explain to me, please?  I really like the idea of committing
>>>>>>>>> the offset specifically on those partitions with changed read offsets,
>>>>>>>>> only.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 2 items I would like to see added to the KafkaStream are:
>>>>>>>>> 
>>>>>>>>> *   a non-blocking next(), throws several exceptions
>>>>>>>>>     (FetchingInProgressException and a NoMessagePendingException or
>>>>>>>>>     something) to differentiate between fetching or no messages left.
>>>>>>>>> 
>>>>>>>>> *   A nextMsgs() method which returns all locally available messages
>>>>>>>>>     and kicks off a fetch for the next chunk.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> If you are trying to add transactional features, then formally define
>>>>>>>>> a DTP capability and pull in other server frameworks to share the
>>>>>>>>> implementation.  Should it be XA/Open?  How about a new peer2peer DTP
>>>>>>>>> protocol?
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Thank you,
>>>>>>>>> 
>>>>>>>>> Robert
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Robert Withers
>>>>>>>>> Staff Analyst/Developer
>>>>>>>>> o: (720) 514-8963
>>>>>>>>> c:  (571) 262-1873
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> -----Original Message-----
>>>>>>>>> From: Jay Kreps [mailto:jay.kr...@gmail.com]
>>>>>>>>> Sent: Sunday, February 16, 2014 10:13 AM
>>>>>>>>> To: users@kafka.apache.org
>>>>>>>>> Subject: Re: New Consumer API discussion
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> +1 I think those are good. It is a little weird that changing the fetch
>>>>>>>>> point is not batched but changing the commit point is, but I suppose
>>>>>>>>> there is no helping that.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> -Jay
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Jay,
>>>>>>>>> 
>>>>>>>>> That makes sense. position/seek deal with changing the consumer's
>>>>>>>>> in-memory data, so there is no remote rpc there. For some reason, I
>>>>>>>>> got committed and seek mixed up in my head at that time :)
>>>>>>>>> 
>>>>>>>>> So we still end up with
>>>>>>>>> 
>>>>>>>>> long position(TopicPartition tp)
>>>>>>>>> void seek(TopicPartitionOffset p)
>>>>>>>>> Map<TopicPartition, Long> committed(TopicPartition tp);
>>>>>>>>> void commit(TopicPartitionOffset...);
>>>>>>>>> 
>>>>>>>>> Thanks,
>>>>>>>>> Neha
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Friday, February 14, 2014, Jay Kreps <jay.kr...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Oh, interesting. So I am assuming the following implementation:
>>>>>>>>> 
>>>>>>>>> 1. We have an in-memory fetch position which controls the next fetch
>>>>>>>>>    offset.
>>>>>>>>> 2. Changing this has no effect until you poll again at which point
>>>>>>>>>    your fetch request will be from the newly specified offset
>>>>>>>>> 3. We then have an in-memory but also remotely stored committed offset.
>>>>>>>>> 4. Calling commit has the effect of saving the fetch position as
>>>>>>>>>    both the in memory committed position and in the remote store
>>>>>>>>> 5. Auto-commit is the same as periodically calling commit on all
>>>>>>>>>    positions.
>>>>>>>>> 
>>>>>>>>> So batching on commit as well as getting the committed position
>>>>>>>>> makes sense, but batching the fetch position wouldn't, right? I
>>>>>>>>> think you are actually thinking of a different approach.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> -Jay
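
The model Jay assumes in this message can be sketched as a small class. This is only an illustration under stated assumptions: OffsetModelSketch is a hypothetical name, partitions are plain strings, and a Map stands in for the remote offset store. It is not how the Kafka consumer is implemented.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model: an in-memory fetch position plus a committed offset that is
// cached in memory and also written to a (simulated) remote store.
class OffsetModelSketch {
    private final Map<String, Long> fetchPosition = new HashMap<>();  // in memory only
    private final Map<String, Long> committedCache = new HashMap<>(); // in-memory copy
    private final Map<String, Long> remoteStore;                      // stands in for remote storage

    OffsetModelSketch(Map<String, Long> remoteStore) {
        this.remoteStore = remoteStore;
    }

    // In-memory read; a partition that was never positioned starts at 0 in this toy model.
    long position(String partition) {
        return fetchPosition.getOrDefault(partition, 0L);
    }

    // In-memory write; nothing is sent anywhere until the next poll or commit.
    void seek(String partition, long offset) {
        fetchPosition.put(partition, offset);
    }

    // Saves every fetch position as the committed offset, locally and remotely.
    void commit() {
        for (Map.Entry<String, Long> entry : fetchPosition.entrySet()) {
            committedCache.put(entry.getKey(), entry.getValue());
            remoteStore.put(entry.getKey(), entry.getValue());
        }
    }

    // Returns the locally known committed offset, else falls back to a remote lookup.
    long committed(String partition) {
        Long cached = committedCache.get(partition);
        if (cached != null) {
            return cached;
        }
        Long remote = remoteStore.get(partition);
        if (remote == null) {
            throw new IllegalStateException("no committed offset for " + partition);
        }
        committedCache.put(partition, remote);
        return remote;
    }

    public static void main(String[] args) {
        Map<String, Long> remote = new HashMap<>();
        remote.put("events-0", 7L);
        OffsetModelSketch consumer = new OffsetModelSketch(remote);
        System.out.println(consumer.committed("events-0")); // served by the remote lookup
        consumer.seek("events-0", 20L);
        consumer.commit();
        System.out.println(remote.get("events-0"));
    }
}
```

Only commit() and the fallback path of committed() touch the remote store in this model, which is why batching is argued for on those two calls and not on position() and seek().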
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> I think you are saying both, i.e. if you have committed on a
>>>>>>>>> partition it returns you that value but if you haven't
>>>>>>>>> it does a remote lookup?
>>>>>>>>> 
>>>>>>>>> Correct.
>>>>>>>>> 
>>>>>>>>> The other argument for making committed batched is that commit()
>>>>>>>>> is batched, so there is symmetry.
>>>>>>>>> 
>>>>>>>>> position() and seek() are always in memory changes (I assume) so
>>>>>>>>> there is no need to batch them.
>>>>>>>>> 
>>>>>>>>> I'm not as sure as you are about that assumption being true. Basically in
>>>>>>>>> my example above, the batching argument for committed() also applies to
>>>>>>>>> position() since one purpose of fetching a partition's offset is to use it
>>>>>>>>> to set the position of the consumer to that offset. Since that might lead
>>>>>>>>> to a remote OffsetRequest call, I think we probably would be
>>>>>>>>> better off batching it.
>>>>>>>>> 
>>>>>>>>> Another option for naming would be position/reposition instead of
>>>>>>>>> position/seek.
>>>>>>>>> 
>>>>>>>>> I think position/seek is better since it aligns with Java file APIs.
>>>>>>>>> 
>>>>>>>>> I also think your suggestion about ConsumerPosition makes sense.
>>>>>>>>> 
>>>>>>>>> Thanks,
>>>>>>>>> Neha
>>>>>>>>> 
>>>>>>>>> On Feb 13, 2014 9:22 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Hey Neha,
>>>>>>>>> 
>>>>>>>>> I actually wasn't proposing the name TopicOffsetPosition, that was just a
>>>>>>>>> typo. I meant TopicPartitionOffset, and I was just referencing what was in
>>>>>>>>> the javadoc. So to restate my proposal without the typo, using just the
>>>>>>>>> existing classes (that naming is a separate question):
>>>>>>>>> 
>>>>>>>>> long position(TopicPartition tp)
>>>>>>>>> void seek(TopicPartitionOffset p)
>>>>>>>>> long committed(TopicPartition tp)
>>>>>>>>> void commit(TopicPartitionOffset...);
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> So I may be unclear on committed() (AKA lastCommittedOffset). Is
>>>>>>>>> it returning the in-memory value from the last commit by this consumer,
>>>>>>>>> or is it doing a remote fetch, or both? I think you are saying both, i.e.
>>>>>>>>> if you have committed on a partition it returns you that value but if
>>>>>>>>> you haven't it does a remote lookup?
>>>>>>>>> 
>>>>>>>>> The other argument for making committed batched is that commit()
>>>>>>>>> is batched, so there is symmetry.
>>>>>>>>> 
>>>>>>>>> position() and seek() are always in memory changes (I assume) so
>>>>>>>>> there is no need to batch them.
>>>>>>>>> 
>>>>>>>>> So taking all that into account what if we revise it to
>>>>>>>>> 
>>>>>>>>> long position(TopicPartition tp)
>>>>>>>>> void seek(TopicPartitionOffset p)
>>>>>>>>> Map<TopicPartition, Long> committed(TopicPartition tp);
>>>>>>>>> void commit(TopicPartitionOffset...);
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> This is not symmetric between position/seek and commit/committed but it
>>>>>>>>> is convenient. Another option for naming would be position/reposition
>>>>>>>>> instead of position/seek.
>>>>>>>>> 
>>>>>>>>> With respect to the name TopicPartitionOffset, what I was trying to say
>>>>>>>>> is that I recommend we change that to something shorter. I think
>>>>>>>>> TopicPosition or ConsumerPosition might be better. Position does not
>>>>>>>>> refer to the variables in the object, it refers to the meaning of the
>>>>>>>>> object--it represents a position within a topic. The offset
>>>>>>>>> field in that object is still called the offset. TopicOffset,
>>>>>>>>> PartitionOffset, or ConsumerOffset would all be workable too. Basically
>>>>>>>>> I am just objecting to concatenating three nouns together. :-)
>>>>>>>>> 
>>>>>>>>> -Jay
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 2. It returns a list of results. But how can you use the list? The only
>>>>>>>>> way to use the list is to make a map of tp=>offset and then look up
>>>>>>>>> results in this map (or do a for loop over the list for the partition you
>>>>>>>>> want). I recommend that if this is an in-memory check we just do one at
>>>>>>>>> a time. E.g. long committedPosition(TopicPosition).
>>>>>>>>> 
>>>>>>>>> This was discussed in the previous emails. There is a choic
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> --
>>>>>>>>> Robert Withers
>>>>>>>>> robert.with...@dish.com
>>>>>>>>> c: 303.919.5856
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> --
>>>>>>>>> Robert Withers
>>>>>>>>> robert.with...@dish.com
>>>>>>>>> c: 303.919.5856
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> --
>>>>>>>>> -- Guozhang
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> --
>>>>>>>>> Robert Withers
>>>>>>>>> robert.with...@dish.com
>>>>>>>>> c: 303.919.5856
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> 
>> 

