Thank you, Neha, that makes it clear.  The capability we could really use is 
exactly-once processing, since we are looking at more critical data.  What are 
the latest thoughts on how to achieve exactly-once semantics, and how might 
that affect the consumer API?

Thanks,
Rob

On Feb 27, 2014, at 10:29 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> Is this what you are looking for?
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html#seek%28kafka.common.TopicPartitionOffset...%29
> Basically, from the overall feedback, it looks like code snippets alone don't
> work for an overall understanding of the APIs. I plan to update the javadoc
> with more complete examples that have been discussed so far on this thread
> and generally on the mailing list.
> 
> Thanks,
> Neha
> 
> 
> 
> 
> On Thu, Feb 27, 2014 at 4:17 AM, Robert Withers <robert.w.with...@gmail.com> wrote:
> 
>> Neha,
>> 
>> I see how one might wish to implement onPartitionsAssigned and
>> onPartitionsRevoked, but I don't have a sense for how I might supply these
>> implementations to a running consumer.  What would the setup code look like
>> to start a high-level consumer with these provided implementations?
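For what it's worth, here is a minimal sketch of how such wiring might look. The registration hook (passing the callback in at construction time) and the class shapes below are assumptions modeled with stubs, since the 0.9 API was still being designed:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback interface, mirroring the two methods under discussion.
interface ConsumerRebalanceCallback {
    void onPartitionsAssigned(List<Integer> partitions);
    void onPartitionsRevoked(List<Integer> partitions);
}

// Stand-in for the consumer; the constructor hook is an assumption.
class CallbackConsumer {
    private final ConsumerRebalanceCallback callback;

    CallbackConsumer(ConsumerRebalanceCallback callback) {
        this.callback = callback;
    }

    // The real consumer would invoke the callback around each rebalance.
    void simulateRebalance(List<Integer> assigned, List<Integer> revoked) {
        callback.onPartitionsRevoked(revoked);
        callback.onPartitionsAssigned(assigned);
    }
}

public class RebalanceWiringDemo {
    static final List<String> events = new ArrayList<>();

    public static void main(String[] args) {
        CallbackConsumer consumer = new CallbackConsumer(new ConsumerRebalanceCallback() {
            public void onPartitionsAssigned(List<Integer> partitions) {
                events.add("assigned:" + partitions);  // e.g. reload state here
            }
            public void onPartitionsRevoked(List<Integer> partitions) {
                events.add("revoked:" + partitions);   // e.g. commit offsets here
            }
        });
        consumer.simulateRebalance(List.of(0, 1), List.of(2));
        System.out.println(events);
    }
}
```

The point of the sketch is just that the implementations are supplied up front, before the consumer starts, rather than polled for afterwards.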
>> 
>> thanks,
>> Rob
>> 
>> 
>> On Feb 27, 2014, at 3:48 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>> 
>>> Rob,
>>> 
>>> The use of the callbacks is explained in the javadoc here -
>>> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html
>>> 
>>> Let me know if it makes sense. The hope is to improve the javadoc so that
>>> it is self-explanatory.
>>> 
>>> Thanks,
>>> Neha
>>> 
>>> 
>>> On Wed, Feb 26, 2014 at 9:16 AM, Robert Withers <robert.w.with...@gmail.com> wrote:
>>> 
>>>> Neha, what does the use of the RebalanceBeginCallback and
>>>> RebalanceEndCallback look like?
>>>> 
>>>> thanks,
>>>> Rob
>>>> 
>>>> On Feb 25, 2014, at 3:51 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>>> 
>>>>> How do you know n? The whole point is that you need to be able to fetch
>>>>> the end offset. You can't a priori decide you will load 1m messages
>>>>> without knowing what is there.
>>>>> 
>>>>> Hmm. I think what you are pointing out is that in the new consumer API,
>>>>> we don't have a way to issue the equivalent of the existing
>>>>> getOffsetsBefore() API. Agree that is a flaw that we should fix.
>>>>> 
>>>>> Will update the docs/wiki with a few use cases that I've collected so far
>>>>> and see if the API covers those.
>>>>> 
>>>>> I would prefer PartitionsAssigned and PartitionsRevoked as that seems
>>>>> clearer to me
>>>>> 
>>>>> Well, the RebalanceBeginCallback interface will have onPartitionsAssigned()
>>>>> as the callback. Similarly, the RebalanceEndCallback interface will have
>>>>> onPartitionsRevoked() as the callback. Makes sense?
>>>>> 
>>>>> Thanks,
>>>>> Neha
>>>>> 
>>>>> 
>>>>> On Tue, Feb 25, 2014 at 2:38 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>>>> 
>>>>>> 1. I would prefer PartitionsAssigned and PartitionsRevoked as that seems
>>>>>> clearer to me.
>>>>>> 
>>>>>> -Jay
>>>>>> 
>>>>>> 
>>>>>> On Tue, Feb 25, 2014 at 10:19 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>>>>> 
>>>>>>> Thanks for the reviews so far! There are a few outstanding questions -
>>>>>>> 
>>>>>>> 1. It will be good to make the rebalance callbacks forward compatible
>>>>>>> with Java 8 capabilities. We can change it to PartitionsAssignedCallback
>>>>>>> and PartitionsRevokedCallback, or RebalanceBeginCallback and
>>>>>>> RebalanceEndCallback?
>>>>>>> 
>>>>>>> If there are no objections, I will change it to RebalanceBeginCallback
>>>>>>> and RebalanceEndCallback.
>>>>>>> 
>>>>>>> 2. The return type for committed() is List<TopicPartitionOffset>. There
>>>>>>> was a suggestion to change it to either Map<TopicPartition, Long> or
>>>>>>> Map<TopicPartition, TopicPartitionOffset>.
>>>>>>> 
>>>>>>> Do people have feedback on this suggestion?
>>>>>>> 
>>>>>>> 
>>>>>>> On Tue, Feb 25, 2014 at 9:56 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>>>>>> 
>>>>>>>> Robert,
>>>>>>>> 
>>>>>>>> Are you saying it is possible to get events from the high-level
>>>>>>>> consumer regarding various state machine changes?  For instance, can we
>>>>>>>> get a notification when a rebalance starts and ends, when a partition is
>>>>>>>> assigned/unassigned, when an offset is committed on a partition, when a
>>>>>>>> leader changes and so on?  I call this OOB traffic, since these are not
>>>>>>>> the core streaming messages but side-band events, yet they are still
>>>>>>>> potentially useful to consumers.
>>>>>>>> 
>>>>>>>> In the current proposal, you get notified when the state machine
>>>>>>>> changes, i.e. before and after a rebalance is triggered. Look at
>>>>>>>> ConsumerRebalanceCallback:
>>>>>>>> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html
>>>>>>>> Leader changes do not count as state machine changes for consumer
>>>>>>>> rebalance purposes.
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> Neha
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Tue, Feb 25, 2014 at 9:54 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>>> Jay/Robert -
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> I think what Robert is saying is that we need to think through the
>>>>>>>>> offset API to enable "batch processing" of topic data. Think of a
>>>>>>>>> process that periodically kicks off to compute a data summary or do a
>>>>>>>>> data load or something like that. I think what we need to support this
>>>>>>>>> is an api to fetch the last offset from the server for a partition.
>>>>>>>>> Something like
>>>>>>>>> 
>>>>>>>>> long lastOffset(TopicPartition tp)
>>>>>>>>> 
>>>>>>>>> and for symmetry
>>>>>>>>> 
>>>>>>>>> long firstOffset(TopicPartition tp)
>>>>>>>>> 
>>>>>>>>> Likely this would have to be batched.
>>>>>>>>> 
>>>>>>>>> A fixed-range data load can be done using the existing APIs as
>>>>>>>>> follows. This assumes you know the endOffset, which can be
>>>>>>>>> currentOffset + n (the number of messages in the load):
>>>>>>>>> 
>>>>>>>>> long startOffset = consumer.position(partition);
>>>>>>>>> long endOffset = startOffset + n;
>>>>>>>>> while (consumer.position(partition) < endOffset) {
>>>>>>>>>     List<ConsumerRecord> messages = consumer.poll(timeout, TimeUnit.MILLISECONDS);
>>>>>>>>>     process(messages, endOffset); // processes messages up to endOffset
>>>>>>>>> }
>>>>>>>>> 
>>>>>>>>> Does that make sense?
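The bounded-load loop above can be exercised end-to-end with an in-memory stand-in for the proposed consumer; everything below is scaffolding, not the real client, and offsets serve as the "records":

```java
import java.util.ArrayList;
import java.util.List;

// In-memory stand-in for the proposed consumer API.
class FixedRangeConsumer {
    private long position = 100;      // assumed starting fetch offset
    private final long logEnd = 110;  // messages available on the "broker"

    long position() { return position; }

    // Returns up to 3 records per poll, like a fetch chunk.
    List<Long> poll() {
        List<Long> records = new ArrayList<>();
        for (int i = 0; i < 3 && position < logEnd; i++) {
            records.add(position++);
        }
        return records;
    }
}

public class BoundedLoadDemo {
    static final List<Long> processed = new ArrayList<>();

    public static void main(String[] args) {
        FixedRangeConsumer consumer = new FixedRangeConsumer();
        long n = 10;  // size of the load, known up front
        long endOffset = consumer.position() + n;

        // Same shape as the loop in the mail: drain until the end offset.
        while (consumer.position() < endOffset) {
            for (long record : consumer.poll()) {
                if (record < endOffset) {  // ignore anything past the range
                    processed.add(record);
                }
            }
        }
        System.out.println(processed.size());
    }
}
```

The loop terminates because each poll() advances the in-memory position, which is the property the real client would also rely on.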
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Tue, Feb 25, 2014 at 9:49 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>>> Thanks for the review, Jun. Here are some comments -
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 1. The use of ellipsis: This may make passing a list of items from a
>>>>>>>>>> collection to the api a bit harder. Suppose that you have a list of
>>>>>>>>>> topics stored in
>>>>>>>>>> 
>>>>>>>>>> ArrayList<String> topics;
>>>>>>>>>> 
>>>>>>>>>> If you want to subscribe to all topics in one call, you will have to do:
>>>>>>>>>> 
>>>>>>>>>> String[] topicArray = new String[topics.size()];
>>>>>>>>>> consumer.subscribe(topics.toArray(topicArray));
>>>>>>>>>> 
>>>>>>>>>> A similar argument can be made for arguably the more common use case of
>>>>>>>>>> subscribing to a single topic as well. In these cases, the user is
>>>>>>>>>> required to write more code to create a single-item collection and pass
>>>>>>>>>> it in. Since subscription is extremely lightweight, invoking it multiple
>>>>>>>>>> times also seems like a workable solution, no?
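The toArray() dance and the one-call-per-topic alternative can be put side by side with a stub; only the varargs subscribe(String...) signature mirrors the proposal, the rest is scaffolding:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SubscribeVarargsDemo {
    static final List<String> subscribed = new ArrayList<>();

    // Mirrors the proposed varargs signature; the body is a stub.
    static void subscribe(String... topics) {
        subscribed.addAll(Arrays.asList(topics));
    }

    public static void main(String[] args) {
        // Single topic: the varargs form keeps this a one-liner.
        subscribe("events");

        // A collection of topics needs the toArray() conversion...
        List<String> topics = Arrays.asList("logs", "metrics");
        subscribe(topics.toArray(new String[0]));

        // ...or, since subscription is lightweight, one call per topic.
        for (String t : topics) {
            subscribe(t);
        }
        System.out.println(subscribed);
    }
}
```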
>>>>>>>>>> 
>>>>>>>>>> 2. It would be good to document that the following apis are mutually
>>>>>>>>>> exclusive. Also, if the partition level subscription is specified,
>>>>>>>>>> there is no group management. Finally, unsubscribe() can only be used
>>>>>>>>>> to cancel subscriptions with the same pattern. For example, you can't
>>>>>>>>>> unsubscribe at the partition level if the subscription is done at the
>>>>>>>>>> topic level.
>>>>>>>>>> 
>>>>>>>>>> *subscribe*(java.lang.String... topics)
>>>>>>>>>> *subscribe*(java.lang.String topic, int... partitions)
>>>>>>>>>> 
>>>>>>>>>> Makes sense. Made the suggested improvements to the docs:
>>>>>>>>>> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/Consumer.html#subscribe%28java.lang.String...%29
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 3. commit(): The following comment in the doc should probably say
>>>>>>>>>> "commit offsets for partitions assigned to this consumer".
>>>>>>>>>> 
>>>>>>>>>> If no partitions are specified, commits offsets for the subscribed
>>>>>>>>>> list of topics and partitions to Kafka.
>>>>>>>>>> 
>>>>>>>>>> Could you give more context on this suggestion? Here is the entire doc -
>>>>>>>>>> 
>>>>>>>>>> Synchronously commits the specified offsets for the specified list of
>>>>>>>>>> topics and partitions to *Kafka*. If no partitions are specified,
>>>>>>>>>> commits offsets for the subscribed list of topics and partitions.
>>>>>>>>>> 
>>>>>>>>>> The hope is to convey that if no partitions are specified, offsets
>>>>>>>>>> will be committed for the subscribed list of partitions. One
>>>>>>>>>> improvement could be to explicitly state that the offsets returned on
>>>>>>>>>> the last poll will be committed. I updated this to -
>>>>>>>>>> 
>>>>>>>>>> Synchronously commits the specified offsets for the specified list of
>>>>>>>>>> topics and partitions to *Kafka*. If no offsets are specified, commits
>>>>>>>>>> offsets returned on the last {@link #poll(long, TimeUnit) poll()} for
>>>>>>>>>> the subscribed list of topics and partitions.
>>>>>>>>>> 
>>>>>>>>>> 4. There is inconsistency in specifying partitions. Sometimes we use
>>>>>>>>>> TopicPartition and some other times we use String and int (see
>>>>>>>>>> examples below).
>>>>>>>>>> 
>>>>>>>>>> void onPartitionsAssigned(Consumer consumer, TopicPartition... partitions)
>>>>>>>>>> 
>>>>>>>>>> public void *subscribe*(java.lang.String topic, int... partitions)
>>>>>>>>>> 
>>>>>>>>>> Yes, this was discussed previously. I think generally the consensus
>>>>>>>>>> seems to be to use the higher level classes everywhere. Made those
>>>>>>>>>> changes.
>>>>>>>>>> 
>>>>>>>>>> What's the use case of position()? Isn't that just the nextOffset()
>>>>>>>>>> on the last message returned from poll()?
>>>>>>>>>> 
>>>>>>>>>> Yes, except in the case where a rebalance is triggered and poll() is
>>>>>>>>>> not yet invoked. Here, you would use position() to get the new fetch
>>>>>>>>>> position for the specific partition. Even if this is not a common use
>>>>>>>>>> case, IMO it is much easier to use position() to get the fetch offset
>>>>>>>>>> than invoking nextOffset() on the last message. This also keeps the
>>>>>>>>>> APIs symmetric, which is nice.
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> On Mon, Feb 24, 2014 at 7:06 PM, Withers, Robert <robert.with...@dish.com> wrote:
>>>>>>>>>> 
>>>>>>>>>>> That's wonderful.  Thanks for kafka.
>>>>>>>>>>> 
>>>>>>>>>>> Rob
>>>>>>>>>>> 
>>>>>>>>>>> On Feb 24, 2014, at 9:58 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Hi Robert,
>>>>>>>>>>> 
>>>>>>>>>>> Yes, you can check out the callback functions in the new API
>>>>>>>>>>> 
>>>>>>>>>>> onPartitionDesigned
>>>>>>>>>>> onPartitionAssigned
>>>>>>>>>>> 
>>>>>>>>>>> and see if they meet your needs.
>>>>>>>>>>> 
>>>>>>>>>>> Guozhang
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Mon, Feb 24, 2014 at 8:18 AM, Withers, Robert <robert.with...@dish.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Jun,
>>>>>>>>>>> 
>>>>>>>>>>> Are you saying it is possible to get events from the high-level
>>>>>>>>>>> consumer regarding various state machine changes?  For instance, can
>>>>>>>>>>> we get a notification when a rebalance starts and ends, when a
>>>>>>>>>>> partition is assigned/unassigned, when an offset is committed on a
>>>>>>>>>>> partition, when a leader changes and so on?  I call this OOB traffic,
>>>>>>>>>>> since these are not the core streaming messages but side-band events,
>>>>>>>>>>> yet they are still potentially useful to consumers.
>>>>>>>>>>> 
>>>>>>>>>>> Thank you,
>>>>>>>>>>> Robert
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> Robert Withers
>>>>>>>>>>> Staff Analyst/Developer
>>>>>>>>>>> o: (720) 514-8963
>>>>>>>>>>> c:  (571) 262-1873
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> -----Original Message-----
>>>>>>>>>>> From: Jun Rao [mailto:jun...@gmail.com]
>>>>>>>>>>> Sent: Sunday, February 23, 2014 4:19 PM
>>>>>>>>>>> To: users@kafka.apache.org
>>>>>>>>>>> Subject: Re: New Consumer API discussion
>>>>>>>>>>> 
>>>>>>>>>>> Robert,
>>>>>>>>>>> 
>>>>>>>>>>> For the push-oriented api, you can potentially implement your own
>>>>>>>>>>> MessageHandler with those methods. In the main loop of our new
>>>>>>>>>>> consumer api, you can just call those methods based on the events
>>>>>>>>>>> you get.
>>>>>>>>>>> 
>>>>>>>>>>> Also, we already have an api to get the first and the last offset
>>>>>>>>>>> of a partition (getOffsetsBefore).
>>>>>>>>>>> 
>>>>>>>>>>> Thanks,
>>>>>>>>>>> 
>>>>>>>>>>> Jun
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert <robert.with...@dish.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> This is a good idea, too.  I would modify it to include stream
>>>>>>>>>>> marking, then you can have:
>>>>>>>>>>> 
>>>>>>>>>>> long end = consumer.lastOffset(tp);
>>>>>>>>>>> consumer.setMark(end);
>>>>>>>>>>> while (consumer.beforeMark()) {
>>>>>>>>>>>     process(consumer.pollToMark());
>>>>>>>>>>> }
>>>>>>>>>>> 
>>>>>>>>>>> or
>>>>>>>>>>> 
>>>>>>>>>>> long end = consumer.lastOffset(tp);
>>>>>>>>>>> consumer.setMark(end);
>>>>>>>>>>> for (Object msg : consumer.iteratorToMark()) {
>>>>>>>>>>>     process(msg);
>>>>>>>>>>> }
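The mark-based loop terminates because pollToMark() never crosses the mark. A toy model makes the shape concrete; none of these methods exist in the proposed API, they are the suggestion being floated:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical "stream marking" consumer: the mark bounds an otherwise
// infinite stream. Offsets stand in for messages.
class MarkedStream {
    private long position = 0;
    private long mark = Long.MAX_VALUE;
    private final long logEnd = 25;  // assumed last offset on the "broker"

    long lastOffset() { return logEnd; }
    void setMark(long offset) { mark = offset; }
    boolean beforeMark() { return position < mark; }

    // Drain up to a fetch-chunk of messages, never crossing the mark.
    List<Long> pollToMark() {
        List<Long> chunk = new ArrayList<>();
        for (int i = 0; i < 10 && position < mark; i++) {
            chunk.add(position++);
        }
        return chunk;
    }
}

public class MarkDemo {
    static long consumed = 0;

    public static void main(String[] args) {
        MarkedStream consumer = new MarkedStream();
        long end = consumer.lastOffset();
        consumer.setMark(end);
        while (consumer.beforeMark()) {
            consumed += consumer.pollToMark().size();
        }
        System.out.println(consumed);  // the loop stops exactly at the mark
    }
}
```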
>>>>>>>>>>> 
>>>>>>>>>>> I actually have 4 suggestions, then:
>>>>>>>>>>> 
>>>>>>>>>>> *   pull: stream marking
>>>>>>>>>>> *   pull: finite streams, bound by time range (up-to-now, yesterday)
>>>>>>>>>>>     or offset
>>>>>>>>>>> *   pull: async api
>>>>>>>>>>> *   push: KafkaMessageSource, for a push model, with msg and OOB
>>>>>>>>>>>     events.  Build one in either individual or chunk mode and have a
>>>>>>>>>>>     listener for each msg or a listener for a chunk of msgs.  Make it
>>>>>>>>>>>     composable and policy driven (chunked, range, commitOffsets
>>>>>>>>>>>     policy, retry policy, transactional)
>>>>>>>>>>> 
>>>>>>>>>>> Thank you,
>>>>>>>>>>> Robert
>>>>>>>>>>> 
>>>>>>>>>>> On Feb 22, 2014, at 11:21 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> I think what Robert is saying is that we need to think through the
>>>>>>>>>>> offset API to enable "batch processing" of topic data. Think of a
>>>>>>>>>>> process that periodically kicks off to compute a data summary or do a
>>>>>>>>>>> data load or something like that. I think what we need to support this
>>>>>>>>>>> is an api to fetch the last offset from the server for a partition.
>>>>>>>>>>> Something like
>>>>>>>>>>> 
>>>>>>>>>>> long lastOffset(TopicPartition tp)
>>>>>>>>>>> 
>>>>>>>>>>> and for symmetry
>>>>>>>>>>> 
>>>>>>>>>>> long firstOffset(TopicPartition tp)
>>>>>>>>>>> 
>>>>>>>>>>> Likely this would have to be batched. Essentially we should add this
>>>>>>>>>>> use case to our set of code examples to write and think through.
>>>>>>>>>>> 
>>>>>>>>>>> The usage would be something like
>>>>>>>>>>> 
>>>>>>>>>>> long end = consumer.lastOffset(tp);
>>>>>>>>>>> while (consumer.position(tp) < end)
>>>>>>>>>>>     process(consumer.poll());
>>>>>>>>>>> 
>>>>>>>>>>> -Jay
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Sat, Feb 22, 2014 at 1:52 AM, Withers, Robert <robert.with...@dish.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Jun,
>>>>>>>>>>> 
>>>>>>>>>>> I was originally thinking a non-blocking read from a distributed
>>>>>>>>>>> stream should distinguish between "no local messages, but a fetch is
>>>>>>>>>>> occurring" versus "you have drained the stream".  The reason this may
>>>>>>>>>>> be valuable to me is so I can write consumers that read all known
>>>>>>>>>>> traffic then terminate.  You caused me to reconsider and I think I am
>>>>>>>>>>> conflating 2 things.  One is a sync/async api, while the other is
>>>>>>>>>>> whether to have an infinite or finite stream.  Is it possible to build
>>>>>>>>>>> a finite KafkaStream on a range of messages?
>>>>>>>>>>> 
>>>>>>>>>>> Perhaps a Simple Consumer would do just fine and then I could start
>>>>>>>>>>> off getting the writeOffset from zookeeper and tell it to read a
>>>>>>>>>>> specified range per partition.  I've done this and forked a simple
>>>>>>>>>>> consumer runnable for each partition, for one of our analyzers.  The
>>>>>>>>>>> great thing about the high-level consumer is the rebalance, so I can
>>>>>>>>>>> fork however many stream readers I want and you just figure it out
>>>>>>>>>>> for me.  In that way you offer us control over the resource
>>>>>>>>>>> consumption within a pull model.  This is best to regulate message
>>>>>>>>>>> pressure, they say.
>>>>>>>>>>> 
>>>>>>>>>>> Combining that high-level rebalance ability with a ranged partition
>>>>>>>>>>> drain could be really nice... build the stream with an ending
>>>>>>>>>>> position and it is a finite stream, but retain the high-level
>>>>>>>>>>> rebalance.  With a finite stream, you would know the difference
>>>>>>>>>>> between the 2 async scenarios: fetch-in-progress versus
>>>>>>>>>>> end-of-stream.  With an infinite stream, you never get end-of-stream.
>>>>>>>>>>> 
>>>>>>>>>>> Aside from a high-level consumer over a finite range within each
>>>>>>>>>>> partition, the other feature I can think of is more complicated.  A
>>>>>>>>>>> high-level consumer has state machine changes that the client cannot
>>>>>>>>>>> access, to my knowledge.  Our use of kafka has us invoke a message
>>>>>>>>>>> handler with each message we consume from the KafkaStream, so we
>>>>>>>>>>> convert a pull model to a push model.  Including the idea of receiving
>>>>>>>>>>> notifications from state machine changes, what would be really nice is
>>>>>>>>>>> to have a KafkaMessageSource that is an eventful push model.  If it
>>>>>>>>>>> were thread-safe, then we could register listeners for various events:
>>>>>>>>>>> 
>>>>>>>>>>> *   opening-stream
>>>>>>>>>>> *   closing-stream
>>>>>>>>>>> *   message-arrived
>>>>>>>>>>> *   end-of-stream/no-more-messages-in-partition (for finite streams)
>>>>>>>>>>> *   rebalance started
>>>>>>>>>>> *   partition assigned
>>>>>>>>>>> *   partition unassigned
>>>>>>>>>>> *   rebalance finished
>>>>>>>>>>> *   partition-offset-committed
>>>>>>>>>>> 
>>>>>>>>>>> Perhaps that is just our use, but instead of a pull-oriented
>>>>>>>>>>> KafkaStream, is there any sense in providing a push-oriented
>>>>>>>>>>> KafkaMessageSource publishing OOB messages?
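A pull-to-push wrapper along these lines can be sketched with plain listener lists; KafkaMessageSource and its hooks here are hypothetical, not part of any proposal:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical push-oriented source: listeners register up front, and a pump
// (which would normally wrap a poll() loop) pushes messages and OOB events.
class MessageSource {
    private final List<Consumer<String>> messageListeners = new ArrayList<>();
    private final List<Runnable> endOfStreamListeners = new ArrayList<>();

    void onMessage(Consumer<String> listener) { messageListeners.add(listener); }
    void onEndOfStream(Runnable listener) { endOfStreamListeners.add(listener); }

    // Here we push a fixed batch instead of polling a real stream.
    void pump(List<String> messages) {
        for (String m : messages) {
            for (Consumer<String> l : messageListeners) l.accept(m);
        }
        for (Runnable l : endOfStreamListeners) l.run();
    }
}

public class PushModelDemo {
    static final List<String> log = new ArrayList<>();

    public static void main(String[] args) {
        MessageSource source = new MessageSource();
        source.onMessage(m -> log.add("msg:" + m));
        source.onEndOfStream(() -> log.add("eos"));
        source.pump(List.of("a", "b"));
        System.out.println(log);
    }
}
```

The same listener-list pattern would extend to the other OOB events in the bullet list (rebalance started/finished, partition assigned/unassigned, and so on).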
>>>>>>>>>>> 
>>>>>>>>>>> thank you,
>>>>>>>>>>> Robert
>>>>>>>>>>> 
>>>>>>>>>>> On Feb 21, 2014, at 5:59 PM, Jun Rao <jun...@gmail.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Robert,
>>>>>>>>>>> 
>>>>>>>>>>> Could you explain why you want to distinguish between
>>>>>>>>>>> FetchingInProgressException and NoMessagePendingException? The
>>>>>>>>>>> nextMsgs() method that you want is exactly what poll() does.
>>>>>>>>>>> 
>>>>>>>>>>> Thanks,
>>>>>>>>>>> 
>>>>>>>>>>> Jun
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Wed, Feb 19, 2014 at 8:45 AM, Withers, Robert <robert.with...@dish.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> I am not clear on why the consumer stream should be positionable,
>>>>>>>>>>> especially if it is limited to the in-memory fetched messages.  Could
>>>>>>>>>>> someone explain to me, please?  I really like the idea of committing
>>>>>>>>>>> the offset specifically on those partitions with changed read
>>>>>>>>>>> offsets, only.
>>>>>>>>>>> 
>>>>>>>>>>> 2 items I would like to see added to the KafkaStream are:
>>>>>>>>>>> 
>>>>>>>>>>> *   a non-blocking next(), which throws several exceptions
>>>>>>>>>>>     (FetchingInProgressException and a NoMessagePendingException or
>>>>>>>>>>>     something) to differentiate between fetching or no messages left.
>>>>>>>>>>> *   a nextMsgs() method which returns all locally available messages
>>>>>>>>>>>     and kicks off a fetch for the next chunk.
>>>>>>>>>>> 
>>>>>>>>>>> If you are trying to add transactional features, then formally define
>>>>>>>>>>> a DTP capability and pull in other server frameworks to share the
>>>>>>>>>>> implementation.  Should it be XA/Open?  How about a new peer2peer DTP
>>>>>>>>>>> protocol?
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> Thank you,
>>>>>>>>>>> 
>>>>>>>>>>> Robert
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> Robert Withers
>>>>>>>>>>> 
>>>>>>>>>>> Staff Analyst/Developer
>>>>>>>>>>> 
>>>>>>>>>>> o: (720) 514-8963
>>>>>>>>>>> 
>>>>>>>>>>> c:  (571) 262-1873
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> -----Original Message-----
>>>>>>>>>>> From: Jay Kreps [mailto:jay.kr...@gmail.com]
>>>>>>>>>>> Sent: Sunday, February 16, 2014 10:13 AM
>>>>>>>>>>> To: users@kafka.apache.org
>>>>>>>>>>> Subject: Re: New Consumer API discussion
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> +1 I think those are good. It is a little weird that changing the
>>>>>>>>>>> fetch point is not batched but changing the commit point is, but I
>>>>>>>>>>> suppose there is no helping that.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> -Jay
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> Jay,
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> That makes sense. position/seek deal with changing the consumer's
>>>>>>>>>>> 
>>>>>>>>>>> in-memory data, so there is no remote rpc there. For some
>> reason, I
>>>>>>>>>>> 
>>>>>>>>>>> got committed and seek mixed up in my head at that time :)
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> So we still end up with
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> long position(TopicPartition tp)
>>>>>>>>>>> 
>>>>>>>>>>> void seek(TopicPartitionOffset p)
>>>>>>>>>>> 
>>>>>>>>>>> Map<TopicPartition, Long> committed(TopicPartition tp);
>>>>>>>>>>> 
>>>>>>>>>>> void commit(TopicPartitionOffset...);
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> Thanks,
>>>>>>>>>>> 
>>>>>>>>>>> Neha
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Friday, February 14, 2014, Jay Kreps <jay.kr...@gmail.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> Oh, interesting. So I am assuming the following implementation:
>>>>>>>>>>> 
>>>>>>>>>>> 1. We have an in-memory fetch position which controls the next fetch
>>>>>>>>>>>    offset.
>>>>>>>>>>> 2. Changing this has no effect until you poll again, at which point
>>>>>>>>>>>    your fetch request will be from the newly specified offset.
>>>>>>>>>>> 3. We then have an in-memory but also remotely stored committed offset.
>>>>>>>>>>> 4. Calling commit has the effect of saving the fetch position as both
>>>>>>>>>>>    the in-memory committed position and in the remote store.
>>>>>>>>>>> 5. Auto-commit is the same as periodically calling commit on all
>>>>>>>>>>>    positions.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> So batching on commit as well as getting the committed position makes
>>>>>>>>>>> sense, but batching the fetch position wouldn't, right? I think you
>>>>>>>>>>> are actually thinking of a different approach.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> -Jay
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> I think you are saying both, i.e. if you have committed on a
>>>>>>>>>>> partition it returns you that value but if you haven't it does a
>>>>>>>>>>> remote lookup?
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> Correct.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> The other argument for making committed batched is that commit() is
>>>>>>>>>>> batched, so there is symmetry.
>>>>>>>>>>> 
>>>>>>>>>>> position() and seek() are always in-memory changes (I assume) so
>>>>>>>>>>> there is no need to batch them.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> I'm not as sure as you are about that assumption being true.
>>>>>>>>>>> Basically in my example above, the batching argument for committed()
>>>>>>>>>>> also applies to position(), since one purpose of fetching a
>>>>>>>>>>> partition's offset is to use it to set the position of the consumer
>>>>>>>>>>> to that offset. Since that might lead to a remote OffsetRequest call,
>>>>>>>>>>> I think we probably would be better off batching it.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> Another option for naming would be position/reposition instead of
>>>>>>>>>>> position/seek.
>>>>>>>>>>> 
>>>>>>>>>>> I think position/seek is better since it aligns with Java file APIs.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> I also think your suggestion about ConsumerPosition makes sense.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> Thanks,
>>>>>>>>>>> 
>>>>>>>>>>> Neha
>>>>>>>>>>> 
>>>>>>>>>>> On Feb 13, 2014 9:22 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> Hey Neha,
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> I actually wasn't proposing the name TopicOffsetPosition, that was
>>>>>>>>>>> just a typo. I meant TopicPartitionOffset, and I was just referencing
>>>>>>>>>>> what was in the javadoc. So to restate my proposal without the typo,
>>>>>>>>>>> using just the existing classes (the naming is a separate question):
>>>>>>>>>>> 
>>>>>>>>>>> long position(TopicPartition tp)
>>>>>>>>>>> void seek(TopicPartitionOffset p)
>>>>>>>>>>> long committed(TopicPartition tp)
>>>>>>>>>>> void commit(TopicPartitionOffset...);
>>>>>>>>>>> 
>>>>>>>>>>> So I may be unclear on committed() (AKA lastCommittedOffset). Is it
>>>>>>>>>>> returning the in-memory value from the last commit by this consumer,
>>>>>>>>>>> or is it doing a remote fetch, or both? I think you are saying both,
>>>>>>>>>>> i.e. if you have committed on a partition it returns you that value
>>>>>>>>>>> but if you haven't it does a remote lookup?
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> The other argument for making committed batched is that commit() is
>>>>>>>>>>> batched, so there is symmetry.
>>>>>>>>>>> 
>>>>>>>>>>> position() and seek() are always in-memory changes (I assume) so
>>>>>>>>>>> there is no need to batch them.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> So taking all that into account, what if we revise it to
>>>>>>>>>>> 
>>>>>>>>>>> long position(TopicPartition tp)
>>>>>>>>>>> void seek(TopicPartitionOffset p)
>>>>>>>>>>> Map<TopicPartition, Long> committed(TopicPartition tp);
>>>>>>>>>>> void commit(TopicPartitionOffset...);
>>>>>>>>>>> 
>>>>>>>>>>> This is not symmetric between position/seek and commit/committed,
>>>>>>>>>>> but it is convenient. Another option for naming would be
>>>>>>>>>>> position/reposition instead of position/seek.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> With respect to the name TopicPartitionOffset, what I was trying to
>>>>>>>>>>> say is that I recommend we change that to something shorter. I think
>>>>>>>>>>> TopicPosition or ConsumerPosition might be better. Position does not
>>>>>>>>>>> refer to the variables in the object, it refers to the meaning of the
>>>>>>>>>>> object--it represents a position within a topic. The offset field in
>>>>>>>>>>> that object is still called the offset. TopicOffset, PartitionOffset,
>>>>>>>>>>> or ConsumerOffset would all be workable too. Basically I am just
>>>>>>>>>>> objecting to concatenating three nouns together. :-)
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> -Jay
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 2. It returns a list of results. But how can you use the list? The
>>>>>>>>>>> only way to use the list is to make a map of tp=>offset and then look
>>>>>>>>>>> up results in this map (or do a for loop over the list for the
>>>>>>>>>>> partition you want). I recommend that if this is an in-memory check
>>>>>>>>>>> we just do one at a time. E.g.
>>>>>>>>>>> 
>>>>>>>>>>> long committedPosition(TopicPosition).
>>>>>>>>>>> 
>>>>>>>>>>> This was discussed in the previous emails. There is a choic
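The list-to-map conversion being objected to looks like this in practice; the two record types below are minimal stand-ins for the real classes, just to show the caller-side work:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal stand-ins for the classes in the proposal.
record TopicPartition(String topic, int partition) {}
record TopicPartitionOffset(TopicPartition tp, long offset) {}

public class CommittedLookupDemo {
    // With a List return type, every caller rebuilds this map before lookups.
    static Map<TopicPartition, Long> toMap(List<TopicPartitionOffset> committed) {
        Map<TopicPartition, Long> byPartition = new HashMap<>();
        for (TopicPartitionOffset o : committed) {
            byPartition.put(o.tp(), o.offset());
        }
        return byPartition;
    }

    public static void main(String[] args) {
        List<TopicPartitionOffset> committed = List.of(
            new TopicPartitionOffset(new TopicPartition("t", 0), 42L),
            new TopicPartitionOffset(new TopicPartition("t", 1), 7L));
        Map<TopicPartition, Long> byPartition = toMap(committed);
        System.out.println(byPartition.get(new TopicPartition("t", 0)));
    }
}
```

Returning Map<TopicPartition, Long> directly (or a one-at-a-time lookup) would make this boilerplate unnecessary, which is the trade-off under discussion.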
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> --
>>>>>>>>>>> Robert Withers
>>>>>>>>>>> robert.with...@dish.com
>>>>>>>>>>> c: 303.919.5856
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> --
>>>>>>>>>>> -- Guozhang
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>> 
>>>> 
>> 
>> 
>> 

