A few clarifications: 1. "The new consumer API is non blocking and instead of returning a blocking iterator, the consumer provides a poll() API that returns a list of records. "
So this means the consumer polls, and if there are new messages it pulls them down and then disconnects? 2. " The consumer also allows long poll to reduce the end-to-end message latency for low throughput data." How is this different from blocking? Is it event-based, meaning it keeps a long-poll connection open, and if/when a new message arrives it triggers an event on the consumer side? 3. " The consumer batches data and multiplexes I/O over TCP connections to each of the brokers it communicates with, for high throughput. " If it is single-threaded, does each TCP broker connection block? Not sure I understand how this works if it is single-threaded. On Thu, Feb 27, 2014 at 11:38 PM, Robert Withers <robert.w.with...@gmail.com > wrote: > Thank you, Neha, that makes it clear. Really, the aspect of all this that > we could really use is a way to do exactly once processing. We are looking > at more critical data. What are the latest thoughts on how to achieve > exactly once and how might that affect a consumer API? > > Thanks, > Rob > > On Feb 27, 2014, at 10:29 AM, Neha Narkhede <neha.narkh...@gmail.com> > wrote: > > > Is this< > http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html#seek%28kafka.common.TopicPartitionOffset...%29 > >what > > you are looking for? Basically, I think from the overall feedback, it > > looks like code snippets don't seem to work for overall understanding of > > the APIs. I plan to update the javadoc with more complete examples that > > have been discussed so far on this thread and generally on the mailing > list. > > > > Thanks, > > Neha > > > > > > > > > > On Thu, Feb 27, 2014 at 4:17 AM, Robert Withers > > <robert.w.with...@gmail.com>wrote: > > > >> Neha, > >> > >> I see how one might wish to implement onPartitionsAssigned and > >> onPartitionsRevoked, but I don't have a sense for how I might supply > these > >> implementations to a running consumer. 
What would the setup code look > like > >> to start a high-level consumer with these provided implementations? > >> > >> thanks, > >> Rob > >> > >> > >> On Feb 27, 2014, at 3:48 AM, Neha Narkhede <neha.narkh...@gmail.com> > >> wrote: > >> > >>> Rob, > >>> > >>> The use of the callbacks is explained in the javadoc here - > >>> > >> > http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html > >>> > >>> Let me know if it makes sense. The hope is to improve the javadoc so > that > >>> it is self explanatory. > >>> > >>> Thanks, > >>> Neha > >>> > >>> > >>> On Wed, Feb 26, 2014 at 9:16 AM, Robert Withers > >>> <robert.w.with...@gmail.com>wrote: > >>> > >>>> Neha, what does the use of the RebalanceBeginCallback and > >>>> RebalanceEndCallback look like? > >>>> > >>>> thanks, > >>>> Rob > >>>> > >>>> On Feb 25, 2014, at 3:51 PM, Neha Narkhede <neha.narkh...@gmail.com> > >>>> wrote: > >>>> > >>>>> How do you know n? The whole point is that you need to be able to > fetch > >>>> the > >>>>> end offset. You can't a priori decide you will load 1m messages > without > >>>>> knowing what is there. > >>>>> > >>>>> Hmm. I think what you are pointing out is that in the new consumer > API, > >>>> we > >>>>> don't have a way to issue the equivalent of the existing > >>>> getOffsetsBefore() > >>>>> API. Agree that is a flaw that we should fix. > >>>>> > >>>>> Will update the docs/wiki with a few use cases that I've collected so > >> far > >>>>> and see if the API covers those. > >>>>> > >>>>> I would prefer PartitionsAssigned and PartitionsRevoked as that seems > >>>>> clearer to me > >>>>> > >>>>> Well the RebalanceBeginCallback interface will have > >>>> onPartitionsAssigned() > >>>>> as the callback. Similarly, the RebalanceEndCallback interface will > >> have > >>>>> onPartitionsRevoked() as the callback. Makes sense? 
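[Editor's note: to make the setup question above concrete, here is a minimal self-contained sketch of supplying rebalance callback implementations when the consumer is constructed. Every name in it — RebalanceCallback, FakeConsumer, simulateRebalance, runDemo — is a hypothetical stand-in for illustration, not the proposed client API.]

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the proposed callback interface, showing how an
// application could supply implementations to a consumer at setup time.
interface RebalanceCallback {
    void onPartitionsAssigned(List<String> partitions);
    void onPartitionsRevoked(List<String> partitions);
}

// A fake consumer that accepts the callback at construction and invokes it
// around a simulated rebalance, the way the real consumer presumably would.
class FakeConsumer {
    private final RebalanceCallback callback;

    FakeConsumer(RebalanceCallback callback) {
        this.callback = callback;
    }

    void simulateRebalance(List<String> revoked, List<String> assigned) {
        callback.onPartitionsRevoked(revoked);
        callback.onPartitionsAssigned(assigned);
    }
}

public class RebalanceSetup {
    // Wire in an implementation and record the callbacks it receives.
    public static List<String> runDemo() {
        final List<String> events = new ArrayList<>();
        FakeConsumer consumer = new FakeConsumer(new RebalanceCallback() {
            public void onPartitionsAssigned(List<String> partitions) {
                events.add("assigned:" + partitions);
            }
            public void onPartitionsRevoked(List<String> partitions) {
                events.add("revoked:" + partitions);
            }
        });
        consumer.simulateRebalance(List.of("topic-0"), List.of("topic-1"));
        return events;
    }
}
```

The point of the sketch is only the wiring: the application hands the callback to the consumer once, at construction, and the consumer invokes it around each rebalance.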
> >>>>> > >>>>> Thanks, > >>>>> Neha > >>>>> > >>>>> > >>>>> On Tue, Feb 25, 2014 at 2:38 PM, Jay Kreps <jay.kr...@gmail.com> > >> wrote: > >>>>> > >>>>>> 1. I would prefer PartitionsAssigned and PartitionsRevoked as that > >> seems > >>>>>> clearer to me. > >>>>>> > >>>>>> -Jay > >>>>>> > >>>>>> > >>>>>> On Tue, Feb 25, 2014 at 10:19 AM, Neha Narkhede < > >>>> neha.narkh...@gmail.com > >>>>>>> wrote: > >>>>>> > >>>>>>> Thanks for the reviews so far! There are a few outstanding > questions > >> - > >>>>>>> > >>>>>>> 1. It will be good to make the rebalance callbacks forward > >> compatible > >>>>>> with > >>>>>>> Java 8 capabilities. We can change it to PartitionsAssignedCallback > >>>>>>> and PartitionsRevokedCallback or RebalanceBeginCallback and > >>>>>>> RebalanceEndCallback? > >>>>>>> > >>>>>>> If there are no objections, I will change it to > >> RebalanceBeginCallback > >>>>>> and > >>>>>>> RebalanceEndCallback. > >>>>>>> > >>>>>>> 2. The return type for committed() is List<TopicPartitionOffset>. > >>>> There > >>>>>>> was a suggestion to change it to either be Map<TopicPartition,Long> > >> or > >>>>>>> Map<TopicPartition, TopicPartitionOffset> > >>>>>>> > >>>>>>> Do people have feedback on this suggestion? > >>>>>>> > >>>>>>> > >>>>>>> On Tue, Feb 25, 2014 at 9:56 AM, Neha Narkhede < > >>>> neha.narkh...@gmail.com > >>>>>>>> wrote: > >>>>>>> > >>>>>>>> Robert, > >>>>>>>> > >>>>>>>> Are you saying it is possible to get events from the high-level > >>>>>>> consumerregarding various state machine changes? For instance, can > >> we > >>>>>> get a > >>>>>>>> notification when a rebalance starts and ends, when a partition is > >>>>>>>> assigned/unassigned, when an offset is committed on a partition, > >> when > >>>> a > >>>>>>>> leader changes and so on? I call this OOB traffic, since they are > >> not > >>>>>>> the > >>>>>>>> core messages streaming, but side-band events, yet they are still > >>>>>>>> potentially useful to consumers. 
> >>>>>>>> > >>>>>>>> In the current proposal, you get notified when the state machine > >>>>>> changes > >>>>>>>> i.e. before and after a rebalance is triggered. Look at > >>>>>>>> ConsumerRebalanceCallback< > >>>>>>> > >>>>>> > >>>> > >> > http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html > >>>>>>>> > >>>>>>>> .Leader changes do not count as state machine changes for consumer > >>>>>>>> rebalance purposes. > >>>>>>>> > >>>>>>>> Thanks, > >>>>>>>> Neha > >>>>>>>> > >>>>>>>> > >>>>>>>> On Tue, Feb 25, 2014 at 9:54 AM, Neha Narkhede < > >>>>>> neha.narkh...@gmail.com > >>>>>>>> wrote: > >>>>>>>> > >>>>>>>>> Jay/Robert - > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> I think what Robert is saying is that we need to think through > the > >>>>>>> offset > >>>>>>>>> API to enable "batch processing" of topic data. Think of a > process > >>>>>> that > >>>>>>>>> periodically kicks off to compute a data summary or do a data > load > >> or > >>>>>>>>> something like that. I think what we need to support this is an > api > >>>> to > >>>>>>>>> fetch the last offset from the server for a partition. Something > >> like > >>>>>>>>> long lastOffset(TopicPartition tp) > >>>>>>>>> and for symmetry > >>>>>>>>> long firstOffset(TopicPartition tp) > >>>>>>>>> > >>>>>>>>> Likely this would have to be batched. > >>>>>>>>> > >>>>>>>>> A fixed range of data load can be done using the existing APIs as > >>>>>>>>> follows. 
This assumes you know the endOffset which can be > >>>>>> currentOffset > >>>>>>> + n > >>>>>>>>> (number of messages in the load) > >>>>>>>>> > >>>>>>>>> long startOffset = consumer.position(partition); > >>>>>>>>> long endOffset = startOffset + n; > >>>>>>>>> while(consumer.position(partition) <= endOffset) { > >>>>>>>>> List<ConsumerRecord> messages = consumer.poll(timeout, > >>>>>>>>> TimeUnit.MILLISECONDS); > >>>>>>>>> process(messages, endOffset); // processes messages > >>>>>> until > >>>>>>>>> endOffset > >>>>>>>>> } > >>>>>>>>> > >>>>>>>>> Does that make sense? > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> On Tue, Feb 25, 2014 at 9:49 AM, Neha Narkhede < > >>>>>> neha.narkh...@gmail.com > >>>>>>>> wrote: > >>>>>>>>> > >>>>>>>>>> Thanks for the review, Jun. Here are some comments - > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> 1. The using of ellipsis: This may make passing a list of items > >> from > >>>>>> a > >>>>>>>>>> collection to the api a bit harder. Suppose that you have a list > >> of > >>>>>>>>>> topics > >>>>>>>>>> stored in > >>>>>>>>>> > >>>>>>>>>> ArrayList<String> topics; > >>>>>>>>>> > >>>>>>>>>> If you want subscribe to all topics in one call, you will have > to > >>>> do: > >>>>>>>>>> > >>>>>>>>>> String[] topicArray = new String[topics.size()]; > >>>>>>>>>> consumer.subscribe(topics. > >>>>>>>>>> toArray(topicArray)); > >>>>>>>>>> > >>>>>>>>>> A similar argument can be made for arguably the more common use > >> case > >>>>>> of > >>>>>>>>>> subscribing to a single topic as well. In these cases, user is > >>>>>> required > >>>>>>>>>> to write more > >>>>>>>>>> code to create a single item collection and pass it in. Since > >>>>>>>>>> subscription is extremely lightweight > >>>>>>>>>> invoking it multiple times also seems like a workable solution, > >> no? > >>>>>>>>>> > >>>>>>>>>> 2. It would be good to document that the following apis are > >> mutually > >>>>>>>>>> exclusive. 
Also, if the partition level subscription is > specified, > >>>>>>> there > >>>>>>>>>> is > >>>>>>>>>> no group management. Finally, unsubscribe() can only be used to > >>>>>> cancel > >>>>>>>>>> subscriptions with the same pattern. For example, you can't > >>>>>> unsubscribe > >>>>>>>>>> at > >>>>>>>>>> the partition level if the subscription is done at the topic > >> level. > >>>>>>>>>> > >>>>>>>>>> *subscribe*(java.lang.String... topics) > >>>>>>>>>> *subscribe*(java.lang.String topic, int... partitions) > >>>>>>>>>> > >>>>>>>>>> Makes sense. Made the suggested improvements to the docs< > >>>>>>> > >>>>>> > >>>> > >> > http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/Consumer.html#subscribe%28java.lang.String...%29 > >>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> 3.commit(): The following comment in the doc should probably say > >>>>>>> "commit > >>>>>>>>>> offsets for partitions assigned to this consumer". > >>>>>>>>>> > >>>>>>>>>> If no partitions are specified, commits offsets for the > subscribed > >>>>>>> list > >>>>>>>>>> of > >>>>>>>>>> topics and partitions to Kafka. > >>>>>>>>>> > >>>>>>>>>> Could you give more context on this suggestion? Here is the > entire > >>>>>> doc > >>>>>>> - > >>>>>>>>>> > >>>>>>>>>> Synchronously commits the specified offsets for the specified > list > >>>> of > >>>>>>>>>> topics and partitions to *Kafka*. If no partitions are > specified, > >>>>>>>>>> commits offsets for the subscribed list of topics and > partitions. > >>>>>>>>>> > >>>>>>>>>> The hope is to convey that if no partitions are specified, > offsets > >>>>>> will > >>>>>>>>>> be committed for the subscribed list of partitions. One > >> improvement > >>>>>>> could > >>>>>>>>>> be to > >>>>>>>>>> explicitly state that the offsets returned on the last poll will > >> be > >>>>>>>>>> committed. 
I updated this to - > >>>>>>>>>> > >>>>>>>>>> Synchronously commits the specified offsets for the specified > list > >>>> of > >>>>>>>>>> topics and partitions to *Kafka*. If no offsets are specified, > >>>>>> commits > >>>>>>>>>> offsets returned on the last {@link #poll(long, TimeUnit) > poll()} > >>>> for > >>>>>>>>>> the subscribed list of topics and partitions. > >>>>>>>>>> > >>>>>>>>>> 4. There is inconsistency in specifying partitions. Sometimes we > >> use > >>>>>>>>>> TopicPartition and some other times we use String and int (see > >>>>>>>>>> examples below). > >>>>>>>>>> > >>>>>>>>>> void onPartitionsAssigned(Consumer consumer, > >>>>>>>>>> TopicPartition...partitions) > >>>>>>>>>> > >>>>>>>>>> public void *subscribe*(java.lang.String topic, int... > partitions) > >>>>>>>>>> > >>>>>>>>>> Yes, this was discussed previously. I think generally the > >> consensus > >>>>>>>>>> seems to be to use the higher level > >>>>>>>>>> classes everywhere. Made those changes. > >>>>>>>>>> > >>>>>>>>>> What's the use case of position()? Isn't that just the > >> nextOffset() > >>>>>> on > >>>>>>>>>> the > >>>>>>>>>> last message returned from poll()? > >>>>>>>>>> > >>>>>>>>>> Yes, except in the case where a rebalance is triggered and > poll() > >> is > >>>>>>> not > >>>>>>>>>> yet invoked. Here, you would use position() to get the new fetch > >>>>>>> position > >>>>>>>>>> for the specific partition. Even if this is not a common use > case, > >>>>>> IMO > >>>>>>> it > >>>>>>>>>> is much easier to use position() to get the fetch offset than > >>>>>> invoking > >>>>>>>>>> nextOffset() on the last message. This also keeps the APIs > >>>> symmetric, > >>>>>>> which > >>>>>>>>>> is nice. > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> On Mon, Feb 24, 2014 at 7:06 PM, Withers, Robert < > >>>>>>>>>> robert.with...@dish.com> wrote: > >>>>>>>>>> > >>>>>>>>>>> That's wonderful. Thanks for kafka. 
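[Editor's note: the ranged-load loop discussed earlier in the thread can be exercised against an in-memory stand-in for the consumer. FiniteConsumer, poll(int), and position() below are hypothetical simulations of the proposed API, included only to show the control flow.]

```java
import java.util.ArrayList;
import java.util.List;

// In-memory stand-in for one partition of the proposed consumer API, used to
// exercise the ranged-load pattern: start at a known offset, poll until a
// known endOffset, then stop.
class FiniteConsumer {
    private final List<String> log;  // one partition's message log
    private long position;           // next offset to fetch

    FiniteConsumer(List<String> log, long startOffset) {
        this.log = log;
        this.position = startOffset;
    }

    long position() { return position; }

    // Return up to max records from the current position, advancing it.
    List<String> poll(int max) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < max && position < log.size()) {
            batch.add(log.get((int) position++));
        }
        return batch;
    }
}

public class RangedLoad {
    // Same shape as the loop in the thread: poll until position reaches
    // endOffset, processing only the records below endOffset.
    public static List<String> load(List<String> log, long startOffset, long n) {
        FiniteConsumer consumer = new FiniteConsumer(log, startOffset);
        long endOffset = startOffset + n;
        List<String> processed = new ArrayList<>();
        while (consumer.position() < endOffset) {
            List<String> batch = consumer.poll(2);
            if (batch.isEmpty()) break;  // log drained before endOffset
            for (String record : batch) {
                if (processed.size() < n) processed.add(record);
            }
        }
        return processed;
    }

    public static String demo() {
        List<String> log = List.of("a", "b", "c", "d", "e");
        return String.join(",", load(log, 1, 3));  // three records from offset 1
    }
}
```

Note the guard inside the loop: a poll may overshoot endOffset, so the processing step discards records at or beyond it, matching "processes messages until endOffset" in the snippet above.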
> >>>>>>>>>>> > >>>>>>>>>>> Rob > >>>>>>>>>>> > >>>>>>>>>>> On Feb 24, 2014, at 9:58 AM, Guozhang Wang <wangg...@gmail.com > >>>>>>> <mailto: > >>>>>>>>>>> wangg...@gmail.com>> wrote: > >>>>>>>>>>> > >>>>>>>>>>> Hi Robert, > >>>>>>>>>>> > >>>>>>>>>>> Yes, you can check out the callback functions in the new API > >>>>>>>>>>> > >>>>>>>>>>> onPartitionDesigned > >>>>>>>>>>> onPartitionAssigned > >>>>>>>>>>> > >>>>>>>>>>> and see if they meet your needs. > >>>>>>>>>>> > >>>>>>>>>>> Guozhang > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> On Mon, Feb 24, 2014 at 8:18 AM, Withers, Robert < > >>>>>>>>>>> robert.with...@dish.com<mailto:robert.with...@dish.com>>wrote: > >>>>>>>>>>> > >>>>>>>>>>> Jun, > >>>>>>>>>>> > >>>>>>>>>>> Are you saying it is possible to get events from the high-level > >>>>>>> consumer > >>>>>>>>>>> regarding various state machine changes? For instance, can we > >> get > >>>> a > >>>>>>>>>>> notification when a rebalance starts and ends, when a partition > >> is > >>>>>>>>>>> assigned/unassigned, when an offset is committed on a > partition, > >>>>>> when > >>>>>>> a > >>>>>>>>>>> leader changes and so on? I call this OOB traffic, since they > >> are > >>>>>> not > >>>>>>>>>>> the > >>>>>>>>>>> core messages streaming, but side-band events, yet they are > still > >>>>>>>>>>> potentially useful to consumers. 
> >>>>>>>>>>> > >>>>>>>>>>> Thank you, > >>>>>>>>>>> Robert > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> Robert Withers > >>>>>>>>>>> Staff Analyst/Developer > >>>>>>>>>>> o: (720) 514-8963 > >>>>>>>>>>> c: (571) 262-1873 > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> -----Original Message----- > >>>>>>>>>>> From: Jun Rao [mailto:jun...@gmail.com] > >>>>>>>>>>> Sent: Sunday, February 23, 2014 4:19 PM > >>>>>>>>>>> To: users@kafka.apache.org<mailto:users@kafka.apache.org> > >>>>>>>>>>> Subject: Re: New Consumer API discussion > >>>>>>>>>>> > >>>>>>>>>>> Robert, > >>>>>>>>>>> > >>>>>>>>>>> For the push orient api, you can potentially implement your own > >>>>>>>>>>> MessageHandler with those methods. In the main loop of our new > >>>>>>> consumer > >>>>>>>>>>> api, you can just call those methods based on the events you > get. > >>>>>>>>>>> > >>>>>>>>>>> Also, we already have an api to get the first and the last > offset > >>>>>> of a > >>>>>>>>>>> partition (getOffsetBefore). > >>>>>>>>>>> > >>>>>>>>>>> Thanks, > >>>>>>>>>>> > >>>>>>>>>>> Jun > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert > >>>>>>>>>>> <robert.with...@dish.com<mailto:robert.with...@dish.com > >>wrote: > >>>>>>>>>>> > >>>>>>>>>>> This is a good idea, too. 
I would modify it to include stream > >>>>>>>>>>> marking, then you can have: > >>>>>>>>>>> > >>>>>>>>>>> long end = consumer.lastOffset(tp); > >>>>>>>>>>> consumer.setMark(end); > >>>>>>>>>>> while(consumer.beforeMark()) { > >>>>>>>>>>> process(consumer.pollToMark()); > >>>>>>>>>>> } > >>>>>>>>>>> > >>>>>>>>>>> or > >>>>>>>>>>> > >>>>>>>>>>> long end = consumer.lastOffset(tp); > >>>>>>>>>>> consumer.setMark(end); > >>>>>>>>>>> for(Object msg : consumer.iteratorToMark()) { > >>>>>>>>>>> process(msg); > >>>>>>>>>>> } > >>>>>>>>>>> > >>>>>>>>>>> I actually have 4 suggestions, then: > >>>>>>>>>>> > >>>>>>>>>>> * pull: stream marking > >>>>>>>>>>> * pull: finite streams, bound by time range (up-to-now, > >>>> yesterday) > >>>>>>> or > >>>>>>>>>>> offset > >>>>>>>>>>> * pull: async api > >>>>>>>>>>> * push: KafkaMessageSource, for a push model, with msg and > OOB > >>>>>>> events. > >>>>>>>>>>> Build one in either individual or chunk mode and have a > listener > >>>> for > >>>>>>>>>>> each msg or a listener for a chunk of msgs. Make it composable > >> and > >>>>>>>>>>> policy driven (chunked, range, commitOffsets policy, retry > >> policy, > >>>>>>>>>>> transactional) > >>>>>>>>>>> > >>>>>>>>>>> Thank you, > >>>>>>>>>>> Robert > >>>>>>>>>>> > >>>>>>>>>>> On Feb 22, 2014, at 11:21 AM, Jay Kreps <jay.kr...@gmail.com > >>>>>> <mailto: > >>>>>>>>>>> jay.kr...@gmail.com><mailto: > >>>>>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com>>> wrote: > >>>>>>>>>>> > >>>>>>>>>>> I think what Robert is saying is that we need to think through > >> the > >>>>>>>>>>> offset API to enable "batch processing" of topic data. Think > of a > >>>>>>>>>>> process that periodically kicks off to compute a data summary > or > >> do > >>>>>> a > >>>>>>>>>>> data load or something like that. I think what we need to > support > >>>>>> this > >>>>>>>>>>> is an api to fetch the last offset from the server for a > >> partition. 
> >>>>>>>>>>> Something like > >>>>>>>>>>> long lastOffset(TopicPartition tp) > >>>>>>>>>>> and for symmetry > >>>>>>>>>>> long firstOffset(TopicPartition tp) > >>>>>>>>>>> > >>>>>>>>>>> Likely this would have to be batched. Essentially we should add > >>>> this > >>>>>>>>>>> use case to our set of code examples to write and think > through. > >>>>>>>>>>> > >>>>>>>>>>> The usage would be something like > >>>>>>>>>>> > >>>>>>>>>>> long end = consumer.lastOffset(tp); > >>>>>>>>>>> while(consumer.position < end) > >>>>>>>>>>> process(consumer.poll()); > >>>>>>>>>>> > >>>>>>>>>>> -Jay > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> On Sat, Feb 22, 2014 at 1:52 AM, Withers, Robert > >>>>>>>>>>> <robert.with...@dish.com<mailto:robert.with...@dish.com> > >>>>>>>>>>> <mailto:robert.with...@dish.com>>wrote: > >>>>>>>>>>> > >>>>>>>>>>> Jun, > >>>>>>>>>>> > >>>>>>>>>>> I was originally thinking a non-blocking read from a > distributed > >>>>>>>>>>> stream should distinguish between "no local messages, but a > fetch > >>>> is > >>>>>>>>>>> occurring" > >>>>>>>>>>> versus "you have drained the stream". The reason this may be > >>>>>> valuable > >>>>>>>>>>> to me is so I can write consumers that read all known traffic > >> then > >>>>>>>>>>> terminate. > >>>>>>>>>>> You caused me to reconsider and I think I am conflating 2 > things. > >>>>>> One > >>>>>>>>>>> is a sync/async api while the other is whether to have an > >> infinite > >>>>>> or > >>>>>>>>>>> finite stream. Is it possible to build a finite KafkaStream > on a > >>>>>>>>>>> range of messages? > >>>>>>>>>>> > >>>>>>>>>>> Perhaps a Simple Consumer would do just fine and then I could > >> start > >>>>>>>>>>> off getting the writeOffset from zookeeper and tell it to read > a > >>>>>>>>>>> specified range per partition. I've done this and forked a > >> simple > >>>>>>>>>>> consumer runnable for each partition, for one of our analyzers. 
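[Editor's note: the "fork a simple consumer runnable for each partition" approach described above can be sketched with plain executor workers, each draining a fixed range of one partition and then terminating, which yields a finite batch read. All names here are illustrative stand-ins, not Kafka APIs.]

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch of the per-partition drain pattern: one worker per partition reads
// a known, fixed range and exits, so the whole job terminates on its own.
public class PerPartitionDrain {
    public static List<String> drain(Map<Integer, List<String>> partitions) {
        ExecutorService pool = Executors.newFixedThreadPool(partitions.size());
        List<String> out = Collections.synchronizedList(new ArrayList<>());
        for (Map.Entry<Integer, List<String>> entry : partitions.entrySet()) {
            final int partition = entry.getKey();
            final List<String> log = entry.getValue();
            // One runnable per partition, reading its full range and exiting.
            pool.execute(() -> {
                for (String record : log) {
                    out.add(partition + ":" + record);
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out;
    }

    public static String demo() {
        Map<Integer, List<String>> partitions = new TreeMap<>();
        partitions.put(0, List.of("a", "b"));
        partitions.put(1, List.of("c"));
        List<String> out = drain(partitions);
        Collections.sort(out);  // worker completion order is nondeterministic
        return String.join(",", out);
    }
}
```

What the high-level consumer adds on top of this hand-rolled version is exactly the rebalance: the group, not the application, decides how many workers read which partitions.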
> >>>> The > >>>>>>>>>>> great thing about the high-level consumer is that rebalance, > so I > >>>>>> can > >>>>>>>>>>> fork however many stream readers I want and you just figure it > >> out > >>>>>> for > >>>>>>>>>>> me. In that way you offer us the control over the resource > >>>>>>>>>>> consumption within a pull model. This is best to regulate > >> message > >>>>>>>>>>> pressure, they say. > >>>>>>>>>>> > >>>>>>>>>>> Combining that high-level rebalance ability with a ranged > >> partition > >>>>>>>>>>> drain could be really nice...build the stream with an ending > >>>>>> position > >>>>>>>>>>> and it is a finite stream, but retain the high-level rebalance. > >>>>>> With > >>>>>>>>>>> a finite stream, you would know the difference of the 2 async > >>>>>>>>>>> scenarios: fetch-in-progress versus end-of-stream. With an > >>>> infinite > >>>>>>>>>>> stream, you never get end-of-stream. > >>>>>>>>>>> > >>>>>>>>>>> Aside from a high-level consumer over a finite range within > each > >>>>>>>>>>> partition, the other feature I can think of is more > complicated. > >> A > >>>>>>>>>>> high-level consumer has state machine changes that the client > >>>> cannot > >>>>>>>>>>> access, to my knowledge. Our use of kafka has us invoke a > >> message > >>>>>>>>>>> handler with each message we consumer from the KafkaStream, so > we > >>>>>>>>>>> convert a pull-model to a push-model. Including the idea of > >>>>>> receiving > >>>>>>>>>>> notifications from state machine changes, what would be really > >> nice > >>>>>> is > >>>>>>>>>>> to have a KafkaMessageSource, that is an eventful push model. 
> If > >>>> it > >>>>>>>>>>> were thread-safe, then we could register listeners for various > >>>>>> events: > >>>>>>>>>>> > >>>>>>>>>>> * opening-stream > >>>>>>>>>>> * closing-stream > >>>>>>>>>>> * message-arrived > >>>>>>>>>>> * end-of-stream/no-more-messages-in-partition (for finite > >>>> streams) > >>>>>>>>>>> * rebalance started > >>>>>>>>>>> * partition assigned > >>>>>>>>>>> * partition unassigned > >>>>>>>>>>> * rebalance finished > >>>>>>>>>>> * partition-offset-committed > >>>>>>>>>>> > >>>>>>>>>>> Perhaps that is just our use, but instead of a pull-oriented > >>>>>>>>>>> KafkaStream, is there any sense in your providing a > push-oriented > >>>>>>>>>>> KafkaMessageSource publishing OOB messages? > >>>>>>>>>>> > >>>>>>>>>>> thank you, > >>>>>>>>>>> Robert > >>>>>>>>>>> > >>>>>>>>>>> On Feb 21, 2014, at 5:59 PM, Jun Rao <jun...@gmail.com<mailto: > >>>>>>>>>>> jun...@gmail.com><mailto: > >>>>>>>>>>> jun...@gmail.com<mailto:jun...@gmail.com>><mailto: > >>>>>>>>>>> jun...@gmail.com<mailto:jun...@gmail.com><mailto: > >> jun...@gmail.com > >>>>>>>>> > >>>>>>>>>>> wrote: > >>>>>>>>>>> > >>>>>>>>>>> Robert, > >>>>>>>>>>> > >>>>>>>>>>> Could you explain why you want to distinguish btw > >>>>>>>>>>> FetchingInProgressException and NoMessagePendingException? The > >>>>>>>>>>> nextMsgs() method that you want is exactly what poll() does. > >>>>>>>>>>> > >>>>>>>>>>> Thanks, > >>>>>>>>>>> > >>>>>>>>>>> Jun > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> On Wed, Feb 19, 2014 at 8:45 AM, Withers, Robert > >>>>>>>>>>> <robert.with...@dish.com<mailto:robert.with...@dish.com> > >> <mailto: > >>>>>>>>>>> robert.with...@dish.com> > >>>>>>>>>>> <mailto:robert.with...@dish.com>>wrote: > >>>>>>>>>>> > >>>>>>>>>>> I am not clear on why the consumer stream should be > positionable, > >>>>>>>>>>> especially if it is limited to the in-memory fetched messages. > >>>>>> Could > >>>>>>>>>>> someone explain to me, please? 
I really like the idea of > >>>> committing > >>>>>>>>>>> the offset specifically on those partitions with changed read > >>>>>> offsets, > >>>>>>>>>>> only. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> 2 items I would like to see added to the KafkaStream are: > >>>>>>>>>>> > >>>>>>>>>>> * a non-blocking next(), throws several exceptions > >>>>>>>>>>> (FetchingInProgressException and a NoMessagePendingException or > >>>>>>>>>>> something) to differentiate between fetching or no messages > left. > >>>>>>>>>>> > >>>>>>>>>>> * A nextMsgs() method which returns all locally > available > >>>>>>>>>>> messages > >>>>>>>>>>> and kicks off a fetch for the next chunk. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> If you are trying to add transactional features, then formally > >>>>>> define > >>>>>>>>>>> a DTP capability and pull in other server frameworks to share > the > >>>>>>>>>>> implementation. Should it be XA/Open? How about a new > peer2peer > >>>>>> DTP > >>>>>>>>>>> protocol? > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> Thank you, > >>>>>>>>>>> > >>>>>>>>>>> Robert > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> Robert Withers > >>>>>>>>>>> > >>>>>>>>>>> Staff Analyst/Developer > >>>>>>>>>>> > >>>>>>>>>>> o: (720) 514-8963 > >>>>>>>>>>> > >>>>>>>>>>> c: (571) 262-1873 > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> -----Original Message----- > >>>>>>>>>>> From: Jay Kreps [mailto:jay.kr...@gmail.com] > >>>>>>>>>>> Sent: Sunday, February 16, 2014 10:13 AM > >>>>>>>>>>> To: users@kafka.apache.org<mailto:users@kafka.apache.org > >>> <mailto: > >>>>>>>>>>> users@kafka.apache.org><mailto: > >>>>>>>>>>> users@kafka.apache.org<mailto:users@kafka.apache.org>> > >>>>>>>>>>> Subject: Re: New Consumer API discussion > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> +1 I think those are good. 
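[Editor's note: the non-blocking next() suggested above can be sketched as follows. The two exception names come from the suggestion in the thread, but the class itself is a hypothetical simulation, not the actual KafkaStream.]

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical simulation of a non-blocking next() that reports *why* no
// message is available: a fetch is still in flight, or the stream is drained.
public class NonBlockingStream {
    static class FetchingInProgressException extends RuntimeException {}
    static class NoMessagePendingException extends RuntimeException {}

    private final Deque<String> fetched = new ArrayDeque<>();
    private boolean fetchInFlight;
    private boolean endOfStream;

    void deliver(String msg) { fetched.add(msg); fetchInFlight = false; }
    void markEndOfStream() { endOfStream = true; fetchInFlight = false; }

    // Non-blocking: return a message or throw to explain why none is ready.
    String next() {
        String msg = fetched.poll();
        if (msg != null) return msg;
        if (fetchInFlight) throw new FetchingInProgressException();
        if (endOfStream) throw new NoMessagePendingException();
        fetchInFlight = true;  // kick off the next background fetch
        throw new FetchingInProgressException();
    }

    public static String demo() {
        NonBlockingStream s = new NonBlockingStream();
        s.deliver("m1");
        StringBuilder trace = new StringBuilder(s.next());
        try {
            s.next();
        } catch (FetchingInProgressException e) {
            trace.append(",fetching");
        }
        s.markEndOfStream();
        try {
            s.next();
        } catch (NoMessagePendingException e) {
            trace.append(",drained");
        }
        return trace.toString();
    }
}
```

This is the distinction Robert asks for: a caller that wants to "read all known traffic then terminate" retries on FetchingInProgressException and exits on NoMessagePendingException.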
It is a little weird that changing > the > >>>>>>>>>>> +fetch > >>>>>>>>>>> > >>>>>>>>>>> point is not batched but changing the commit point is, but I > >>>> suppose > >>>>>>>>>>> there is no helping that. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> -Jay > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede > >>>>>>>>>>> <neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com> > >> <mailto: > >>>>>>>>>>> neha.narkh...@gmail.com> > >>>>>>>>>>> <mailto:neha.narkh...@gmail.com> > >>>>>>>>>>> <mailto:neha.narkh...@gmail.com>>wrote: > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> Jay, > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> That makes sense. position/seek deal with changing the > consumers > >>>>>>>>>>> > >>>>>>>>>>> in-memory data, so there is no remote rpc there. For some > >> reason, I > >>>>>>>>>>> > >>>>>>>>>>> got committed and seek mixed up in my head at that time :) > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> So we still end up with > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> long position(TopicPartition tp) > >>>>>>>>>>> > >>>>>>>>>>> void seek(TopicPartitionOffset p) > >>>>>>>>>>> > >>>>>>>>>>> Map<TopicPartition, Long> committed(TopicPartition tp); > >>>>>>>>>>> > >>>>>>>>>>> void commit(TopicPartitionOffset...); > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> Thanks, > >>>>>>>>>>> > >>>>>>>>>>> Neha > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> On Friday, February 14, 2014, Jay Kreps <jay.kr...@gmail.com > >>>>>> <mailto: > >>>>>>>>>>> jay.kr...@gmail.com><mailto: > >>>>>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com>><mailto: > >>>>>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com><mailto: > >>>>>>>>>>> jay.kr...@gmail.com>><mailto: > >>>>>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com><mailto: > >>>>>>>>>>> jay.kr...@gmail.com><mailto:jay.kreps@gmail > >>>>>>>>>>> .com>>> > >>>>>>>>>>> 
wrote: > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> Oh, interesting. So I am assuming the following implementation: > >>>>>>>>>>> > >>>>>>>>>>> 1. We have an in-memory fetch position which controls the next > >>>> fetch > >>>>>>>>>>> > >>>>>>>>>>> offset. > >>>>>>>>>>> > >>>>>>>>>>> 2. Changing this has no effect until you poll again at which > >> point > >>>>>>>>>>> > >>>>>>>>>>> your fetch request will be from the newly specified offset 3. > We > >>>>>>>>>>> > >>>>>>>>>>> then have an in-memory but also remotely stored committed > offset. > >>>>>>>>>>> > >>>>>>>>>>> 4. Calling commit has the effect of saving the fetch position > as > >>>>>>>>>>> > >>>>>>>>>>> both the in memory committed position and in the remote store > 5. > >>>>>>>>>>> > >>>>>>>>>>> Auto-commit is the same as periodically calling commit on all > >>>>>>>>>>> > >>>>>>>>>>> positions. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> So batching on commit as well as getting the committed position > >>>>>>>>>>> > >>>>>>>>>>> makes sense, but batching the fetch position wouldn't, right? I > >>>>>>>>>>> > >>>>>>>>>>> think you are actually thinking of a different approach. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> -Jay > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede > >>>>>>>>>>> > >>>>>>>>>>> <neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com > ><mailto: > >>>>>>>>>>> neha.narkh...@gmail.com><mailto: > >>>>>>>>>>> neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com>> > >>>>>>>>>>> > >>>>>>>>>>> <javascript:;> > >>>>>>>>>>> > >>>>>>>>>>> wrote: > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> I think you are saying both, i.e. if you have committed on a > >>>>>>>>>>> > >>>>>>>>>>> partition it returns you that value but if you > >>>>>>>>>>> > >>>>>>>>>>> haven't > >>>>>>>>>>> > >>>>>>>>>>> it does a remote lookup? 
> >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> Correct. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> The other argument for making committed batched is that > commit() > >>>>>>>>>>> > >>>>>>>>>>> is batched, so there is symmetry. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> position() and seek() are always in memory changes (I assume) > so > >>>>>>>>>>> > >>>>>>>>>>> there > >>>>>>>>>>> > >>>>>>>>>>> is > >>>>>>>>>>> > >>>>>>>>>>> no need to batch them. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> I'm not as sure as you are about that assumption being true. > >>>>>>>>>>> > >>>>>>>>>>> Basically > >>>>>>>>>>> > >>>>>>>>>>> in > >>>>>>>>>>> > >>>>>>>>>>> my example above, the batching argument for committed() also > >>>>>>>>>>> > >>>>>>>>>>> applies to > >>>>>>>>>>> > >>>>>>>>>>> position() since one purpose of fetching a partition's offset > is > >>>>>>>>>>> > >>>>>>>>>>> to use > >>>>>>>>>>> > >>>>>>>>>>> it > >>>>>>>>>>> > >>>>>>>>>>> to set the position of the consumer to that offset. Since that > >>>>>>>>>>> > >>>>>>>>>>> might > >>>>>>>>>>> > >>>>>>>>>>> lead > >>>>>>>>>>> > >>>>>>>>>>> to a remote OffsetRequest call, I think we probably would be > >>>>>>>>>>> > >>>>>>>>>>> better off batching it. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> Another option for naming would be position/reposition instead > of > >>>>>>>>>>> > >>>>>>>>>>> position/seek. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> I think position/seek is better since it aligns with Java file > >>>> APIs. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> I also think your suggestion about ConsumerPosition makes > sense. 
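[Editor's note: the "both" behavior of committed() confirmed above — return the locally cached offset when this consumer has committed the partition, otherwise fall back to a remote lookup — can be sketched like this. OffsetStore and its remoteLookup function are hypothetical stand-ins, not the client implementation.]

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of committed(): serve the in-memory value when this
// consumer committed the partition itself, otherwise fall back to a remote
// offset fetch. Batching committed() amortizes that potential round trip.
public class OffsetStore {
    private final Map<String, Long> localCommits = new HashMap<>();
    private final Function<String, Long> remoteLookup;
    int remoteCalls = 0;

    OffsetStore(Function<String, Long> remoteLookup) {
        this.remoteLookup = remoteLookup;
    }

    void commit(String partition, long offset) {
        // In the real client this would also be sent to the broker.
        localCommits.put(partition, offset);
    }

    long committed(String partition) {
        Long cached = localCommits.get(partition);
        if (cached != null) return cached;  // this consumer committed it
        remoteCalls++;                      // simulated remote offset request
        return remoteLookup.apply(partition);
    }

    public static String demo() {
        OffsetStore store = new OffsetStore(partition -> 42L);
        store.commit("t-0", 7L);
        long local = store.committed("t-0");   // served from the local cache
        long remote = store.committed("t-1");  // triggers the remote lookup
        return local + "," + remote + "," + store.remoteCalls;
    }
}
```

The remoteCalls counter makes the batching argument visible: every uncached partition costs one simulated round trip, which a batched committed() could collapse into one request.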
> >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> Thanks, > >>>>>>>>>>> > >>>>>>>>>>> Neha > >>>>>>>>>>> > >>>>>>>>>>> On Feb 13, 2014 9:22 PM, "Jay Kreps" <jay.kr...@gmail.com > >> <mailto: > >>>>>>>>>>> jay.kr...@gmail.com><mailto: > >>>>>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com>><mailto: > >>>>>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com><mailto: > >>>>>>>>>>> jay.kr...@gmail.com>><mailto: > >>>>>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com><mailto: > >>>>>>>>>>> jay.kr...@gmail.com><mailto:jay.kreps@gmail > >>>>>>>>>>> .com>>> > >>>>>>>>>>> wrote: > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> Hey Neha, > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> I actually wasn't proposing the name TopicOffsetPosition, that > >>>>>>>>>>> > >>>>>>>>>>> was > >>>>>>>>>>> > >>>>>>>>>>> just a > >>>>>>>>>>> > >>>>>>>>>>> typo. I meant TopicPartitionOffset, and I was just referencing > >>>>>>>>>>> > >>>>>>>>>>> what > >>>>>>>>>>> > >>>>>>>>>>> was > >>>>>>>>>>> > >>>>>>>>>>> in > >>>>>>>>>>> > >>>>>>>>>>> the javadoc. So to restate my proposal without the typo, using > >>>>>>>>>>> > >>>>>>>>>>> just > >>>>>>>>>>> > >>>>>>>>>>> the > >>>>>>>>>>> > >>>>>>>>>>> existing classes (that naming is a separate question): > >>>>>>>>>>> > >>>>>>>>>>> long position(TopicPartition tp) > >>>>>>>>>>> > >>>>>>>>>>> void seek(TopicPartitionOffset p) > >>>>>>>>>>> > >>>>>>>>>>> long committed(TopicPartition tp) > >>>>>>>>>>> > >>>>>>>>>>> void commit(TopicPartitionOffset...); > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> So I may be unclear on committed() (AKA lastCommittedOffset). > Is > >>>>>>>>>>> > >>>>>>>>>>> it returning the in-memory value from the last commit by this > >>>>>>>>>>> > >>>>>>>>>>> consumer, > >>>>>>>>>>> > >>>>>>>>>>> or > >>>>>>>>>>> > >>>>>>>>>>> is > >>>>>>>>>>> > >>>>>>>>>>> it doing a remote fetch, or both? I think you are saying both, > >> i.e. 
if you have committed on a partition it returns you that value, but if you haven't it does a remote lookup?

The other argument for making committed batched is that commit() is batched, so there is symmetry.

position() and seek() are always in-memory changes (I assume) so there is no need to batch them.

So taking all that into account, what if we revise it to:

long position(TopicPartition tp)
void seek(TopicPartitionOffset p)
Map<TopicPartition, Long> committed(TopicPartition tp);
void commit(TopicPartitionOffset...);

This is not symmetric between position/seek and commit/committed, but it is convenient. Another option for naming would be position/reposition instead of position/seek.

With respect to the name TopicPartitionOffset, what I was trying to say is that I recommend we change it to something shorter. I think TopicPosition or ConsumerPosition might be better.
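For concreteness, the revised API shape being debated above could be sketched with a toy in-memory stand-in. This is a hypothetical sketch only, not the real KafkaConsumer: `TopicPartition` and `TopicPartitionOffset` are local records standing in for the javadoc classes, and `committed()` is shown as varargs (an assumption, following the batching argument in the thread):

```java
import java.util.HashMap;
import java.util.Map;

public class PositionSketch {
    // Local stand-ins for the classes discussed in the thread.
    record TopicPartition(String topic, int partition) {}
    record TopicPartitionOffset(TopicPartition tp, long offset) {}

    // In-memory fetch positions; no broker round trip needed for position/seek.
    private final Map<TopicPartition, Long> positions = new HashMap<>();
    // Committed offsets; in practice looking these up may require a remote
    // OffsetRequest, which is the argument for batching committed().
    private final Map<TopicPartition, Long> committedOffsets = new HashMap<>();

    long position(TopicPartition tp) {
        return positions.getOrDefault(tp, 0L);
    }

    void seek(TopicPartitionOffset p) {
        positions.put(p.tp(), p.offset());
    }

    Map<TopicPartition, Long> committed(TopicPartition... tps) {
        Map<TopicPartition, Long> result = new HashMap<>();
        for (TopicPartition tp : tps) {
            Long off = committedOffsets.get(tp); // remote lookup in real life
            if (off != null) result.put(tp, off);
        }
        return result;
    }

    void commit(TopicPartitionOffset... offsets) {
        for (TopicPartitionOffset o : offsets) {
            committedOffsets.put(o.tp(), o.offset());
        }
    }

    public static void main(String[] args) {
        PositionSketch c = new PositionSketch();
        TopicPartition tp = new TopicPartition("events", 0);
        c.seek(new TopicPartitionOffset(tp, 42L));
        c.commit(new TopicPartitionOffset(tp, 42L));
        System.out.println(c.position(tp));          // 42
        System.out.println(c.committed(tp).get(tp)); // 42
    }
}
```

The asymmetry the thread notes is visible here: position/seek work one partition at a time because they are cheap in-memory operations, while committed/commit are batched because each may cost a broker round trip.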
Position does not refer to the variables in the object; it refers to the meaning of the object: it represents a position within a topic. The offset field in that object is still called the offset. TopicOffset, PartitionOffset, or ConsumerOffset would all be workable too. Basically I am just objecting to concatenating three nouns together. :-)

-Jay

On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> 2. It returns a list of results. But how can you use the list? The only
> way to use the list is to make a map of tp=>offset and then look up
> results in this map (or do a for loop over the list for the partition
> you want). I recommend that if this is an in-memory check we just do one
> at a time, e.g.
>
> long committedPosition(TopicPosition)

This was discussed in the previous emails. There is a choice [...]

--
Robert Withers
robert.with...@dish.com
c: 303.919.5856

--
-- Guozhang
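The list-of-results awkwardness Jay objects to above can be sketched briefly. This is a hypothetical illustration (local record types, not real Kafka classes) of the workaround a caller would need if a batched call returned a list instead of a map:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ListLookup {
    // Local stand-ins for the classes discussed in the thread.
    record TopicPartition(String topic, int partition) {}
    record TopicPartitionOffset(TopicPartition tp, long offset) {}

    public static void main(String[] args) {
        TopicPartition tp0 = new TopicPartition("events", 0);
        TopicPartition tp1 = new TopicPartition("events", 1);

        // If a batched committed() returned a List, the caller would first
        // have to build a tp => offset map just to find one partition:
        List<TopicPartitionOffset> results = List.of(
                new TopicPartitionOffset(tp0, 10L),
                new TopicPartitionOffset(tp1, 20L));

        Map<TopicPartition, Long> byPartition = results.stream()
                .collect(Collectors.toMap(TopicPartitionOffset::tp,
                                          TopicPartitionOffset::offset));

        System.out.println(byPartition.get(tp1)); // 20
    }
}
```

Returning a `Map<TopicPartition, Long>` directly, as in the revised proposal earlier in the thread, folds this boilerplate into the API itself.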