Thank you, Neha, that makes it clear. The capability we could really use is exactly-once processing, since we are looking at more critical data. What are the latest thoughts on how to achieve exactly-once semantics, and how might that affect the consumer API?
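[Editorial note: pending first-class support, the usual way to approximate exactly-once *processing* (as opposed to delivery) is at-least-once consumption plus storing results and offsets atomically on the application side. A sketch against the proposed 0.9 API discussed below — `offsetStore`, `beginTx()`, `saveOffsets()`, and `commitTx()` are hypothetical application-side pieces, not part of any proposal in this thread:]

```java
// Sketch only: "offsetStore" is a hypothetical transactional store owned by
// the application; "consumer" is the *proposed* 0.9 consumer API.
while (running) {
    List<ConsumerRecord> records = consumer.poll(timeout, TimeUnit.MILLISECONDS);
    offsetStore.beginTx();
    for (ConsumerRecord record : records)
        process(record);                        // write results to the store
    offsetStore.saveOffsets(consumer.position(partition)); // same transaction
    offsetStore.commitTx();                     // results + offsets commit atomically
}
// On restart, seek() to offsetStore's saved offset instead of relying on
// Kafka-side committed offsets, so no record is ever applied twice.
```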
Thanks,
Rob

On Feb 27, 2014, at 10:29 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

Is this <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html#seek%28kafka.common.TopicPartitionOffset...%29> what you are looking for? Basically, I think from the overall feedback, it looks like code snippets don't seem to work for overall understanding of the APIs. I plan to update the javadoc with more complete examples that have been discussed so far on this thread and generally on the mailing list.

Thanks,
Neha

On Thu, Feb 27, 2014 at 4:17 AM, Robert Withers <robert.w.with...@gmail.com> wrote:

Neha,

I see how one might wish to implement onPartitionsAssigned and onPartitionsRevoked, but I don't have a sense for how I might supply these implementations to a running consumer. What would the setup code look like to start a high-level consumer with these provided implementations?

thanks,
Rob

On Feb 27, 2014, at 3:48 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

Rob,

The use of the callbacks is explained in the javadoc here -
http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html

Let me know if it makes sense. The hope is to improve the javadoc so that it is self explanatory.

Thanks,
Neha

On Wed, Feb 26, 2014 at 9:16 AM, Robert Withers <robert.w.with...@gmail.com> wrote:

Neha, what does the use of the RebalanceBeginCallback and RebalanceEndCallback look like?

thanks,
Rob

On Feb 25, 2014, at 3:51 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> How do you know n? The whole point is that you need to be able to fetch the end offset. You can't a priori decide you will load 1m messages without knowing what is there.

Hmm. I think what you are pointing out is that in the new consumer API, we don't have a way to issue the equivalent of the existing getOffsetsBefore() API. Agree that is a flaw that we should fix.

Will update the docs/wiki with a few use cases that I've collected so far and see if the API covers those.

> I would prefer PartitionsAssigned and PartitionsRevoked as that seems clearer to me

Well, the RebalanceBeginCallback interface will have onPartitionsAssigned() as the callback. Similarly, the RebalanceEndCallback interface will have onPartitionsRevoked() as the callback. Makes sense?

Thanks,
Neha

On Tue, Feb 25, 2014 at 2:38 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

1. I would prefer PartitionsAssigned and PartitionsRevoked as that seems clearer to me.

-Jay

On Tue, Feb 25, 2014 at 10:19 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

Thanks for the reviews so far! There are a few outstanding questions -

1. It will be good to make the rebalance callbacks forward compatible with Java 8 capabilities. We can change it to PartitionsAssignedCallback and PartitionsRevokedCallback or RebalanceBeginCallback and RebalanceEndCallback?

If there are no objections, I will change it to RebalanceBeginCallback and RebalanceEndCallback.

2. The return type for committed() is List<TopicPartitionOffset>. There was a suggestion to change it to either Map<TopicPartition, Long> or Map<TopicPartition, TopicPartitionOffset>.

Do people have feedback on this suggestion?
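[Editorial note: Robert's question above — what the setup code would look like — was still open at this point. The following is only a guessed shape, assuming the proposed API accepts a ConsumerRebalanceCallback at construction time; how the callback is actually registered had not been settled in this thread:]

```java
// Illustrative sketch against the *proposed* 0.9 consumer API. Whether the
// callback is passed to the constructor, the config, or subscribe() was
// undecided, so treat this wiring as an assumption.
ConsumerRebalanceCallback callback = new ConsumerRebalanceCallback() {
    public void onPartitionsAssigned(Consumer consumer, TopicPartition... partitions) {
        // e.g. restore per-partition state, then seek() to saved offsets
    }
    public void onPartitionsRevoked(Consumer consumer, TopicPartition... partitions) {
        // e.g. flush in-flight work and commit() before losing ownership
    }
};
KafkaConsumer consumer = new KafkaConsumer(config, callback);
consumer.subscribe("my-topic");
```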
On Tue, Feb 25, 2014 at 9:56 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

Robert,

> Are you saying it is possible to get events from the high-level consumer regarding various state machine changes? For instance, can we get a notification when a rebalance starts and ends, when a partition is assigned/unassigned, when an offset is committed on a partition, when a leader changes and so on? I call this OOB traffic, since they are not the core messages streaming, but side-band events, yet they are still potentially useful to consumers.

In the current proposal, you get notified when the state machine changes, i.e. before and after a rebalance is triggered. Look at ConsumerRebalanceCallback <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html>. Leader changes do not count as state machine changes for consumer rebalance purposes.

Thanks,
Neha

On Tue, Feb 25, 2014 at 9:54 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

Jay/Robert -

I think what Robert is saying is that we need to think through the offset API to enable "batch processing" of topic data. Think of a process that periodically kicks off to compute a data summary or do a data load or something like that. I think what we need to support this is an api to fetch the last offset from the server for a partition. Something like

long lastOffset(TopicPartition tp)

and for symmetry

long firstOffset(TopicPartition tp)

Likely this would have to be batched.

A fixed range of data load can be done using the existing APIs as follows. This assumes you know the endOffset, which can be currentOffset + n (number of messages in the load):

long startOffset = consumer.position(partition);
long endOffset = startOffset + n;
while (consumer.position(partition) <= endOffset) {
    List<ConsumerRecord> messages = consumer.poll(timeout, TimeUnit.MILLISECONDS);
    process(messages, endOffset); // processes messages until endOffset
}

Does that make sense?

On Tue, Feb 25, 2014 at 9:49 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

Thanks for the review, Jun. Here are some comments -

> 1. The using of ellipsis: This may make passing a list of items from a collection to the api a bit harder. Suppose that you have a list of topics stored in
>
> ArrayList<String> topics;
>
> If you want to subscribe to all topics in one call, you will have to do:
>
> String[] topicArray = new String[topics.size()];
> consumer.subscribe(topics.toArray(topicArray));

A similar argument can be made for arguably the more common use case of subscribing to a single topic as well. In these cases, the user is required to write more code to create a single-item collection and pass it in. Since subscription is extremely lightweight, invoking it multiple times also seems like a workable solution, no?

> 2. It would be good to document that the following apis are mutually exclusive. Also, if the partition level subscription is specified, there is no group management. Finally, unsubscribe() can only be used to cancel subscriptions with the same pattern. For example, you can't unsubscribe at the partition level if the subscription is done at the topic level.
>
> subscribe(java.lang.String... topics)
> subscribe(java.lang.String topic, int... partitions)

Makes sense. Made the suggested improvements to the docs <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/Consumer.html#subscribe%28java.lang.String...%29>.

> 3. commit(): The following comment in the doc should probably say "commit offsets for partitions assigned to this consumer".
>
> "If no partitions are specified, commits offsets for the subscribed list of topics and partitions to Kafka."

Could you give more context on this suggestion? Here is the entire doc -

"Synchronously commits the specified offsets for the specified list of topics and partitions to Kafka. If no partitions are specified, commits offsets for the subscribed list of topics and partitions."

The hope is to convey that if no partitions are specified, offsets will be committed for the subscribed list of partitions. One improvement could be to explicitly state that the offsets returned on the last poll will be committed. I updated this to -

"Synchronously commits the specified offsets for the specified list of topics and partitions to Kafka. If no offsets are specified, commits offsets returned on the last {@link #poll(long, TimeUnit) poll()} for the subscribed list of topics and partitions."

> 4. There is inconsistency in specifying partitions. Sometimes we use TopicPartition and some other times we use String and int (see examples below).
>
> void onPartitionsAssigned(Consumer consumer, TopicPartition... partitions)
> public void subscribe(java.lang.String topic, int... partitions)

Yes, this was discussed previously. I think generally the consensus seems to be to use the higher-level classes everywhere. Made those changes.

> What's the use case of position()? Isn't that just the nextOffset() on the last message returned from poll()?

Yes, except in the case where a rebalance is triggered and poll() is not yet invoked. Here, you would use position() to get the new fetch position for the specific partition. Even if this is not a common use case, IMO it is much easier to use position() to get the fetch offset than invoking nextOffset() on the last message. This also keeps the APIs symmetric, which is nice.

On Mon, Feb 24, 2014 at 7:06 PM, Withers, Robert <robert.with...@dish.com> wrote:

That's wonderful. Thanks for kafka.
Rob

On Feb 24, 2014, at 9:58 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Hi Robert,

Yes, you can check out the callback functions in the new API

onPartitionDesigned
onPartitionAssigned

and see if they meet your needs.

Guozhang

On Mon, Feb 24, 2014 at 8:18 AM, Withers, Robert <robert.with...@dish.com> wrote:

Jun,

Are you saying it is possible to get events from the high-level consumer regarding various state machine changes? For instance, can we get a notification when a rebalance starts and ends, when a partition is assigned/unassigned, when an offset is committed on a partition, when a leader changes and so on? I call this OOB traffic, since they are not the core messages streaming, but side-band events, yet they are still potentially useful to consumers.

Thank you,
Robert

Robert Withers
Staff Analyst/Developer
o: (720) 514-8963
c: (571) 262-1873

-----Original Message-----
From: Jun Rao [mailto:jun...@gmail.com]
Sent: Sunday, February 23, 2014 4:19 PM
To: users@kafka.apache.org
Subject: Re: New Consumer API discussion

Robert,

For the push-oriented api, you can potentially implement your own MessageHandler with those methods. In the main loop of our new consumer api, you can just call those methods based on the events you get.

Also, we already have an api to get the first and the last offset of a partition (getOffsetsBefore).

Thanks,

Jun

On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert <robert.with...@dish.com> wrote:

This is a good idea, too. I would modify it to include stream marking, then you can have:

long end = consumer.lastOffset(tp);
consumer.setMark(end);
while (consumer.beforeMark()) {
    process(consumer.pollToMark());
}

or

long end = consumer.lastOffset(tp);
consumer.setMark(end);
for (Object msg : consumer.iteratorToMark()) {
    process(msg);
}

I actually have 4 suggestions, then:

* pull: stream marking
* pull: finite streams, bound by time range (up-to-now, yesterday) or offset
* pull: async api
* push: KafkaMessageSource, for a push model, with msg and OOB events. Build one in either individual or chunk mode and have a listener for each msg or a listener for a chunk of msgs. Make it composable and policy driven (chunked, range, commitOffsets policy, retry policy, transactional)

Thank you,
Robert

On Feb 22, 2014, at 11:21 AM, Jay Kreps <jay.kr...@gmail.com> wrote:

I think what Robert is saying is that we need to think through the offset API to enable "batch processing" of topic data. Think of a process that periodically kicks off to compute a data summary or do a data load or something like that. I think what we need to support this is an api to fetch the last offset from the server for a partition. Something like

long lastOffset(TopicPartition tp)

and for symmetry

long firstOffset(TopicPartition tp)

Likely this would have to be batched. Essentially we should add this use case to our set of code examples to write and think through.

The usage would be something like

long end = consumer.lastOffset(tp);
while (consumer.position < end)
    process(consumer.poll());

-Jay

On Sat, Feb 22, 2014 at 1:52 AM, Withers, Robert <robert.with...@dish.com> wrote:

Jun,

I was originally thinking a non-blocking read from a distributed stream should distinguish between "no local messages, but a fetch is occurring" versus "you have drained the stream". The reason this may be valuable to me is so I can write consumers that read all known traffic then terminate. You caused me to reconsider and I think I am conflating 2 things. One is a sync/async api, while the other is whether to have an infinite or finite stream. Is it possible to build a finite KafkaStream on a range of messages?

Perhaps a Simple Consumer would do just fine and then I could start off getting the writeOffset from zookeeper and tell it to read a specified range per partition. I've done this and forked a simple consumer runnable for each partition, for one of our analyzers. The great thing about the high-level consumer is that rebalance, so I can fork however many stream readers I want and you just figure it out for me. In that way you offer us control over the resource consumption within a pull model. This is best to regulate message pressure, they say.

Combining that high-level rebalance ability with a ranged partition drain could be really nice... build the stream with an ending position and it is a finite stream, but retain the high-level rebalance. With a finite stream, you would know the difference between the 2 async scenarios: fetch-in-progress versus end-of-stream. With an infinite stream, you never get end-of-stream.

Aside from a high-level consumer over a finite range within each partition, the other feature I can think of is more complicated. A high-level consumer has state machine changes that the client cannot access, to my knowledge. Our use of kafka has us invoke a message handler with each message we consume from the KafkaStream, so we convert a pull-model to a push-model. Including the idea of receiving notifications from state machine changes, what would be really nice is to have a KafkaMessageSource, that is an eventful push model. If it were thread-safe, then we could register listeners for various events:

* opening-stream
* closing-stream
* message-arrived
* end-of-stream/no-more-messages-in-partition (for finite streams)
* rebalance started
* partition assigned
* partition unassigned
* rebalance finished
* partition-offset-committed

Perhaps that is just our use, but instead of a pull-oriented KafkaStream, is there any sense in your providing a push-oriented KafkaMessageSource publishing OOB messages?

thank you,
Robert

On Feb 21, 2014, at 5:59 PM, Jun Rao <jun...@gmail.com> wrote:

Robert,

Could you explain why you want to distinguish btw FetchingInProgressException and NoMessagePendingException? The nextMsgs() method that you want is exactly what poll() does.

Thanks,

Jun

On Wed, Feb 19, 2014 at 8:45 AM, Withers, Robert <robert.with...@dish.com> wrote:

I am not clear on why the consumer stream should be positionable, especially if it is limited to the in-memory fetched messages. Could someone explain to me, please? I really like the idea of committing the offset specifically on those partitions with changed read offsets, only.

2 items I would like to see added to the KafkaStream are:

* a non-blocking next(), which throws several exceptions (FetchingInProgressException and a NoMessagePendingException or something) to differentiate between fetching or no messages left.
* A nextMsgs() method which returns all locally available messages and kicks off a fetch for the next chunk.

If you are trying to add transactional features, then formally define a DTP capability and pull in other server frameworks to share the implementation. Should it be XA/Open? How about a new peer2peer DTP protocol?
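[Editorial note: the non-blocking next() Robert asks for can be approximated under the proposed API by polling with a zero timeout; distinguishing "fetch in progress" from "stream drained" would additionally need the lastOffset() call proposed earlier in the thread. A sketch, not runnable against any released API:]

```java
// poll(0) returns whatever is buffered locally without blocking.
List<ConsumerRecord> records = consumer.poll(0, TimeUnit.MILLISECONDS);
if (records.isEmpty()) {
    if (consumer.position(tp) >= consumer.lastOffset(tp)) {
        // drained: no more messages in the partition (finite-stream case)
    } else {
        // nothing buffered yet, but more data exists; a fetch may be in flight
    }
}
```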
Thank you,

Robert

-----Original Message-----
From: Jay Kreps [mailto:jay.kr...@gmail.com]
Sent: Sunday, February 16, 2014 10:13 AM
To: users@kafka.apache.org
Subject: Re: New Consumer API discussion

+1 I think those are good. It is a little weird that changing the fetch point is not batched but changing the commit point is, but I suppose there is no helping that.

-Jay

On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

Jay,

That makes sense. position/seek deal with changing the consumer's in-memory data, so there is no remote rpc there. For some reason, I got committed and seek mixed up in my head at that time :)

So we still end up with

long position(TopicPartition tp)
void seek(TopicPartitionOffset p)
Map<TopicPartition, Long> committed(TopicPartition tp);
void commit(TopicPartitionOffset...);

Thanks,
Neha

On Friday, February 14, 2014, Jay Kreps <jay.kr...@gmail.com> wrote:

Oh, interesting. So I am assuming the following implementation:

1. We have an in-memory fetch position which controls the next fetch offset.
2. Changing this has no effect until you poll again, at which point your fetch request will be from the newly specified offset.
3. We then have an in-memory but also remotely stored committed offset.
4. Calling commit has the effect of saving the fetch position as both the in-memory committed position and in the remote store.
5. Auto-commit is the same as periodically calling commit on all positions.
So batching on commit as well as getting the committed position makes sense, but batching the fetch position wouldn't, right? I think you are actually thinking of a different approach.

-Jay

On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> I think you are saying both, i.e. if you have committed on a partition it returns you that value but if you haven't it does a remote lookup?

Correct.

> The other argument for making committed batched is that commit() is batched, so there is symmetry.
>
> position() and seek() are always in memory changes (I assume) so there is no need to batch them.

I'm not as sure as you are about that assumption being true. Basically, in my example above, the batching argument for committed() also applies to position(), since one purpose of fetching a partition's offset is to use it to set the position of the consumer to that offset. Since that might lead to a remote OffsetRequest call, I think we probably would be better off batching it.

> Another option for naming would be position/reposition instead of position/seek.

I think position/seek is better since it aligns with Java file APIs.

I also think your suggestion about ConsumerPosition makes sense.

Thanks,
Neha

On Feb 13, 2014 9:22 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:

Hey Neha,

I actually wasn't proposing the name TopicOffsetPosition, that was just a typo. I meant TopicPartitionOffset, and I was just referencing what was in the javadoc. So to restate my proposal without the typo, using just the existing classes (that naming is a separate question):

long position(TopicPartition tp)
void seek(TopicPartitionOffset p)
long committed(TopicPartition tp)
void commit(TopicPartitionOffset...);

So I may be unclear on committed() (AKA lastCommittedOffset). Is it returning the in-memory value from the last commit by this consumer, or is it doing a remote fetch, or both? I think you are saying both, i.e. if you have committed on a partition it returns you that value but if you haven't it does a remote lookup?

The other argument for making committed batched is that commit() is batched, so there is symmetry.

position() and seek() are always in memory changes (I assume) so there is no need to batch them.

So taking all that into account, what if we revise it to

long position(TopicPartition tp)
void seek(TopicPartitionOffset p)
Map<TopicPartition, Long> committed(TopicPartition tp);
void commit(TopicPartitionOffset...);

This is not symmetric between position/seek and commit/committed but it is convenient. Another option for naming would be position/reposition instead of position/seek.

With respect to the name TopicPartitionOffset, what I was trying to say is that I recommend we change that to something shorter. I think TopicPosition or ConsumerPosition might be better. Position does not refer to the variables in the object, it refers to the meaning of the object--it represents a position within a topic. The offset field in that object is still called the offset. TopicOffset, PartitionOffset, or ConsumerOffset would all be workable too. Basically I am just objecting to concatenating three nouns together. :-)

-Jay

On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> 2. It returns a list of results. But how can you use the list? The only way to use the list is to make a map of tp=>offset and then look up results in this map (or do a for loop over the list for the partition you want). I recommend that if this is an in-memory check we just do one at a time. E.g. long committedPosition(TopicPosition).

This was discussed in the previous emails. There is a choic

--
Robert Withers
robert.with...@dish.com
c: 303.919.5856

--
Guozhang