Neha, what does the use of the RebalanceBeginCallback and RebalanceEndCallback look like?
thanks,
Rob

On Feb 25, 2014, at 3:51 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> How do you know n? The whole point is that you need to be able to fetch the
> end offset. You can't a priori decide you will load 1m messages without
> knowing what is there.
>
> Hmm. I think what you are pointing out is that in the new consumer API, we
> don't have a way to issue the equivalent of the existing getOffsetsBefore()
> API. Agree that is a flaw that we should fix.
>
> Will update the docs/wiki with a few use cases that I've collected so far
> and see if the API covers those.
>
> I would prefer PartitionsAssigned and PartitionsRevoked as that seems
> clearer to me
>
> Well, the RebalanceBeginCallback interface will have onPartitionsAssigned()
> as the callback. Similarly, the RebalanceEndCallback interface will have
> onPartitionsRevoked() as the callback. Makes sense?
>
> Thanks,
> Neha
>
> On Tue, Feb 25, 2014 at 2:38 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
>> 1. I would prefer PartitionsAssigned and PartitionsRevoked as that seems
>> clearer to me.
>>
>> -Jay
>>
>> On Tue, Feb 25, 2014 at 10:19 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>
>>> Thanks for the reviews so far! There are a few outstanding questions -
>>>
>>> 1. It will be good to make the rebalance callbacks forward compatible with
>>> Java 8 capabilities. We can change it to PartitionsAssignedCallback
>>> and PartitionsRevokedCallback, or RebalanceBeginCallback and
>>> RebalanceEndCallback?
>>>
>>> If there are no objections, I will change it to RebalanceBeginCallback and
>>> RebalanceEndCallback.
>>>
>>> 2. The return type for committed() is List<TopicPartitionOffset>. There
>>> was a suggestion to change it to either be Map<TopicPartition, Long> or
>>> Map<TopicPartition, TopicPartitionOffset>.
>>>
>>> Do people have feedback on this suggestion?
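To make the two callback interfaces described above concrete, here is a minimal sketch. The interface and method names (RebalanceBeginCallback/onPartitionsAssigned, RebalanceEndCallback/onPartitionsRevoked) are the ones proposed in this thread; the TopicPartition stand-in (plain strings here), the rebalance driver, and its signature are invented for illustration only, and the lambda usage shows the Java 8 forward compatibility the thread mentions.

```java
import java.util.List;

// Illustration only: single-method callback interfaces as named in the thread.
// Because each has exactly one abstract method, Java 8 lambdas work directly.
public class RebalanceCallbackSketch {
    interface RebalanceBeginCallback {
        void onPartitionsAssigned(List<String> partitions);
    }

    interface RebalanceEndCallback {
        void onPartitionsRevoked(List<String> partitions);
    }

    // Toy rebalance driver (hypothetical): revoke the old assignment, then
    // hand out the new one.
    static void rebalance(List<String> oldAssignment, List<String> newAssignment,
                          RebalanceEndCallback end, RebalanceBeginCallback begin) {
        end.onPartitionsRevoked(oldAssignment);
        begin.onPartitionsAssigned(newAssignment);
    }

    public static void main(String[] args) {
        rebalance(List.of("t-0"), List.of("t-1"),
                  revoked -> System.out.println("revoked: " + revoked),
                  assigned -> System.out.println("assigned: " + assigned));
        // prints:
        // revoked: [t-0]
        // assigned: [t-1]
    }
}
```

The design point under discussion is visible here: keeping each callback interface down to one method is what makes the lambda registration in main() possible.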
>>>
>>> On Tue, Feb 25, 2014 at 9:56 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>>
>>>> Robert,
>>>>
>>>> Are you saying it is possible to get events from the high-level consumer
>>>> regarding various state machine changes? For instance, can we get a
>>>> notification when a rebalance starts and ends, when a partition is
>>>> assigned/unassigned, when an offset is committed on a partition, when a
>>>> leader changes and so on? I call this OOB traffic, since they are not the
>>>> core messages streaming, but side-band events, yet they are still
>>>> potentially useful to consumers.
>>>>
>>>> In the current proposal, you get notified when the state machine changes,
>>>> i.e. before and after a rebalance is triggered. Look at
>>>> ConsumerRebalanceCallback
>>>> <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html>.
>>>> Leader changes do not count as state machine changes for consumer
>>>> rebalance purposes.
>>>>
>>>> Thanks,
>>>> Neha
>>>>
>>>> On Tue, Feb 25, 2014 at 9:54 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>>>
>>>>> Jay/Robert -
>>>>>
>>>>> I think what Robert is saying is that we need to think through the offset
>>>>> API to enable "batch processing" of topic data. Think of a process that
>>>>> periodically kicks off to compute a data summary or do a data load or
>>>>> something like that. I think what we need to support this is an api to
>>>>> fetch the last offset from the server for a partition. Something like
>>>>>
>>>>>     long lastOffset(TopicPartition tp)
>>>>>
>>>>> and for symmetry
>>>>>
>>>>>     long firstOffset(TopicPartition tp)
>>>>>
>>>>> Likely this would have to be batched.
>>>>>
>>>>> A fixed range of data load can be done using the existing APIs as
>>>>> follows.
>>>>> This assumes you know the endOffset, which can be currentOffset + n
>>>>> (number of messages in the load):
>>>>>
>>>>>     long startOffset = consumer.position(partition);
>>>>>     long endOffset = startOffset + n;
>>>>>     while (consumer.position(partition) <= endOffset) {
>>>>>         List<ConsumerRecord> messages = consumer.poll(timeout, TimeUnit.MILLISECONDS);
>>>>>         process(messages, endOffset); // processes messages until endOffset
>>>>>     }
>>>>>
>>>>> Does that make sense?
>>>>>
>>>>> On Tue, Feb 25, 2014 at 9:49 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>>>>
>>>>>> Thanks for the review, Jun. Here are some comments -
>>>>>>
>>>>>> 1. The use of ellipsis: This may make passing a list of items from a
>>>>>> collection to the api a bit harder. Suppose that you have a list of
>>>>>> topics stored in
>>>>>>
>>>>>>     ArrayList<String> topics;
>>>>>>
>>>>>> If you want to subscribe to all topics in one call, you will have to do:
>>>>>>
>>>>>>     String[] topicArray = new String[topics.size()];
>>>>>>     consumer.subscribe(topics.toArray(topicArray));
>>>>>>
>>>>>> A similar argument can be made for arguably the more common use case of
>>>>>> subscribing to a single topic as well. In these cases, the user is
>>>>>> required to write more code to create a single-item collection and pass
>>>>>> it in. Since subscription is extremely lightweight, invoking it multiple
>>>>>> times also seems like a workable solution, no?
>>>>>>
>>>>>> 2. It would be good to document that the following apis are mutually
>>>>>> exclusive. Also, if the partition-level subscription is specified, there
>>>>>> is no group management. Finally, unsubscribe() can only be used to cancel
>>>>>> subscriptions with the same pattern. For example, you can't unsubscribe
>>>>>> at the partition level if the subscription is done at the topic level.
>>>>>>
>>>>>>     subscribe(java.lang.String... topics)
>>>>>>     subscribe(java.lang.String topic, int... partitions)
>>>>>>
>>>>>> Makes sense. Made the suggested improvements to the docs
>>>>>> <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/Consumer.html#subscribe%28java.lang.String...%29>
>>>>>>
>>>>>> 3. commit(): The following comment in the doc should probably say
>>>>>> "commit offsets for partitions assigned to this consumer".
>>>>>>
>>>>>> If no partitions are specified, commits offsets for the subscribed list
>>>>>> of topics and partitions to Kafka.
>>>>>>
>>>>>> Could you give more context on this suggestion? Here is the entire doc -
>>>>>>
>>>>>> Synchronously commits the specified offsets for the specified list of
>>>>>> topics and partitions to Kafka. If no partitions are specified, commits
>>>>>> offsets for the subscribed list of topics and partitions.
>>>>>>
>>>>>> The hope is to convey that if no partitions are specified, offsets will
>>>>>> be committed for the subscribed list of partitions. One improvement could
>>>>>> be to explicitly state that the offsets returned on the last poll will be
>>>>>> committed. I updated this to -
>>>>>>
>>>>>> Synchronously commits the specified offsets for the specified list of
>>>>>> topics and partitions to Kafka. If no offsets are specified, commits
>>>>>> offsets returned on the last {@link #poll(long, TimeUnit) poll()} for
>>>>>> the subscribed list of topics and partitions.
>>>>>>
>>>>>> 4. There is inconsistency in specifying partitions. Sometimes we use
>>>>>> TopicPartition and some other times we use String and int (see examples
>>>>>> below).
>>>>>>
>>>>>>     void onPartitionsAssigned(Consumer consumer, TopicPartition... partitions)
>>>>>>     public void subscribe(java.lang.String topic, int... partitions)
>>>>>>
>>>>>> Yes, this was discussed previously.
>>>>>> I think generally the consensus seems to be to use the higher-level
>>>>>> classes everywhere. Made those changes.
>>>>>>
>>>>>> What's the use case of position()? Isn't that just the nextOffset() on
>>>>>> the last message returned from poll()?
>>>>>>
>>>>>> Yes, except in the case where a rebalance is triggered and poll() is not
>>>>>> yet invoked. Here, you would use position() to get the new fetch position
>>>>>> for the specific partition. Even if this is not a common use case, IMO it
>>>>>> is much easier to use position() to get the fetch offset than invoking
>>>>>> nextOffset() on the last message. This also keeps the APIs symmetric,
>>>>>> which is nice.
>>>>>>
>>>>>> On Mon, Feb 24, 2014 at 7:06 PM, Withers, Robert <robert.with...@dish.com> wrote:
>>>>>>
>>>>>>> That's wonderful. Thanks for kafka.
>>>>>>>
>>>>>>> Rob
>>>>>>>
>>>>>>> On Feb 24, 2014, at 9:58 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Robert,
>>>>>>>
>>>>>>> Yes, you can check out the callback functions in the new API
>>>>>>>
>>>>>>>     onPartitionDeassigned
>>>>>>>     onPartitionAssigned
>>>>>>>
>>>>>>> and see if they meet your needs.
>>>>>>>
>>>>>>> Guozhang
>>>>>>>
>>>>>>> On Mon, Feb 24, 2014 at 8:18 AM, Withers, Robert <robert.with...@dish.com> wrote:
>>>>>>>
>>>>>>> Jun,
>>>>>>>
>>>>>>> Are you saying it is possible to get events from the high-level consumer
>>>>>>> regarding various state machine changes? For instance, can we get a
>>>>>>> notification when a rebalance starts and ends, when a partition is
>>>>>>> assigned/unassigned, when an offset is committed on a partition, when a
>>>>>>> leader changes and so on? I call this OOB traffic, since they are not
>>>>>>> the core messages streaming, but side-band events, yet they are still
>>>>>>> potentially useful to consumers.
>>>>>>>
>>>>>>> Thank you,
>>>>>>> Robert
>>>>>>>
>>>>>>> Robert Withers
>>>>>>> Staff Analyst/Developer
>>>>>>> o: (720) 514-8963
>>>>>>> c: (571) 262-1873
>>>>>>>
>>>>>>> -----Original Message-----
>>>>>>> From: Jun Rao [mailto:jun...@gmail.com]
>>>>>>> Sent: Sunday, February 23, 2014 4:19 PM
>>>>>>> To: users@kafka.apache.org
>>>>>>> Subject: Re: New Consumer API discussion
>>>>>>>
>>>>>>> Robert,
>>>>>>>
>>>>>>> For the push-oriented api, you can potentially implement your own
>>>>>>> MessageHandler with those methods. In the main loop of our new consumer
>>>>>>> api, you can just call those methods based on the events you get.
>>>>>>>
>>>>>>> Also, we already have an api to get the first and the last offset of a
>>>>>>> partition (getOffsetsBefore).
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Jun
>>>>>>>
>>>>>>> On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert <robert.with...@dish.com> wrote:
>>>>>>>
>>>>>>> This is a good idea, too. I would modify it to include stream marking,
>>>>>>> then you can have:
>>>>>>>
>>>>>>>     long end = consumer.lastOffset(tp);
>>>>>>>     consumer.setMark(end);
>>>>>>>     while (consumer.beforeMark()) {
>>>>>>>         process(consumer.pollToMark());
>>>>>>>     }
>>>>>>>
>>>>>>> or
>>>>>>>
>>>>>>>     long end = consumer.lastOffset(tp);
>>>>>>>     consumer.setMark(end);
>>>>>>>     for (Object msg : consumer.iteratorToMark()) {
>>>>>>>         process(msg);
>>>>>>>     }
>>>>>>>
>>>>>>> I actually have 4 suggestions, then:
>>>>>>>
>>>>>>>     * pull: stream marking
>>>>>>>     * pull: finite streams, bound by time range (up-to-now, yesterday) or offset
>>>>>>>     * pull: async api
>>>>>>>     * push: KafkaMessageSource, for a push model, with msg and OOB events.
>>>>>>>       Build one in either individual or chunk mode and have a listener
>>>>>>>       for each msg or a listener for a chunk of msgs.
>>>>>>>       Make it composable and policy driven (chunked, range,
>>>>>>>       commitOffsets policy, retry policy, transactional).
>>>>>>>
>>>>>>> Thank you,
>>>>>>> Robert
>>>>>>>
>>>>>>> On Feb 22, 2014, at 11:21 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>>>>>>
>>>>>>> I think what Robert is saying is that we need to think through the
>>>>>>> offset API to enable "batch processing" of topic data. Think of a
>>>>>>> process that periodically kicks off to compute a data summary or do a
>>>>>>> data load or something like that. I think what we need to support this
>>>>>>> is an api to fetch the last offset from the server for a partition.
>>>>>>> Something like
>>>>>>>
>>>>>>>     long lastOffset(TopicPartition tp)
>>>>>>>
>>>>>>> and for symmetry
>>>>>>>
>>>>>>>     long firstOffset(TopicPartition tp)
>>>>>>>
>>>>>>> Likely this would have to be batched. Essentially we should add this
>>>>>>> use case to our set of code examples to write and think through.
>>>>>>>
>>>>>>> The usage would be something like
>>>>>>>
>>>>>>>     long end = consumer.lastOffset(tp);
>>>>>>>     while (consumer.position < end)
>>>>>>>         process(consumer.poll());
>>>>>>>
>>>>>>> -Jay
>>>>>>>
>>>>>>> On Sat, Feb 22, 2014 at 1:52 AM, Withers, Robert <robert.with...@dish.com> wrote:
>>>>>>>
>>>>>>> Jun,
>>>>>>>
>>>>>>> I was originally thinking a non-blocking read from a distributed stream
>>>>>>> should distinguish between "no local messages, but a fetch is occurring"
>>>>>>> versus "you have drained the stream". The reason this may be valuable to
>>>>>>> me is so I can write consumers that read all known traffic then
>>>>>>> terminate. You caused me to reconsider and I think I am conflating 2
>>>>>>> things. One is a sync/async api while the other is whether to have an
>>>>>>> infinite or finite stream.
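Jay's drain loop and Robert's earlier setMark/beforeMark/pollToMark idea can both be made concrete against a toy in-memory consumer. Everything below is a sketch under assumptions: lastOffset, setMark, beforeMark, and pollToMark are only the names floated in this thread, not an existing Kafka API, and the single-partition string log is invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: a toy single-partition consumer showing (a) the
// "drain up to lastOffset" loop and (b) the stream-marking variant.
public class FiniteStreamSketch {
    private final List<String> log;   // one partition's messages
    private long position = 0;        // in-memory fetch position
    private long mark = Long.MAX_VALUE;

    FiniteStreamSketch(List<String> log) { this.log = log; }

    long position() { return position; }
    long lastOffset() { return log.size(); }            // proposed lastOffset(tp)
    void setMark(long offset) { this.mark = offset; }   // proposed stream marking
    boolean beforeMark() { return position < mark && position < log.size(); }

    // poll() returns whatever is "locally available" (here, up to 2 messages).
    List<String> poll() {
        List<String> batch = new ArrayList<>();
        for (int i = 0; i < 2 && position < log.size(); i++) {
            batch.add(log.get((int) position++));
        }
        return batch;
    }

    // Robert's variant: drain up to, but never past, the mark.
    List<String> pollToMark() {
        List<String> batch = new ArrayList<>();
        while (beforeMark()) batch.add(log.get((int) position++));
        return batch;
    }

    // Jay's loop: long end = lastOffset(); while (position < end) process(poll());
    static List<String> drainToEnd(FiniteStreamSketch consumer) {
        List<String> processed = new ArrayList<>();
        long end = consumer.lastOffset();
        while (consumer.position() < end) processed.addAll(consumer.poll());
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(drainToEnd(new FiniteStreamSketch(List.of("m0", "m1", "m2"))));
        FiniteStreamSketch s = new FiniteStreamSketch(List.of("a", "b", "c"));
        s.setMark(2);                        // finite stream: stop before "c"
        System.out.println(s.pollToMark());  // [a, b]
    }
}
```

Note how the mark subsumes the plain loop: setMark(lastOffset()) turns an infinite stream into Jay's bounded drain, which is exactly the "read all known traffic then terminate" consumer Robert describes.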
>>>>>>> Is it possible to build a finite KafkaStream on a range of messages?
>>>>>>>
>>>>>>> Perhaps a SimpleConsumer would do just fine and then I could start off
>>>>>>> getting the writeOffset from zookeeper and tell it to read a specified
>>>>>>> range per partition. I've done this and forked a simple consumer
>>>>>>> runnable for each partition, for one of our analyzers. The great thing
>>>>>>> about the high-level consumer is the rebalance, so I can fork however
>>>>>>> many stream readers I want and you just figure it out for me. In that
>>>>>>> way you offer us control over the resource consumption within a pull
>>>>>>> model. This is best to regulate message pressure, they say.
>>>>>>>
>>>>>>> Combining that high-level rebalance ability with a ranged partition
>>>>>>> drain could be really nice: build the stream with an ending position and
>>>>>>> it is a finite stream, but retain the high-level rebalance. With a
>>>>>>> finite stream, you would know the difference between the 2 async
>>>>>>> scenarios: fetch-in-progress versus end-of-stream. With an infinite
>>>>>>> stream, you never get end-of-stream.
>>>>>>>
>>>>>>> Aside from a high-level consumer over a finite range within each
>>>>>>> partition, the other feature I can think of is more complicated. A
>>>>>>> high-level consumer has state machine changes that the client cannot
>>>>>>> access, to my knowledge. Our use of kafka has us invoke a message
>>>>>>> handler with each message we consume from the KafkaStream, so we convert
>>>>>>> a pull model to a push model. Including the idea of receiving
>>>>>>> notifications from state machine changes, what would be really nice is
>>>>>>> to have a KafkaMessageSource that is an eventful push model.
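The fetch-in-progress versus end-of-stream distinction above can be sketched without the two exceptions proposed elsewhere in this thread, using a small status result instead. All names here (the Status enum, deliver, the flags) are invented for illustration; this is not the proposed Kafka API, just a model of the three cases a non-blocking next() must report on a finite stream.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Illustration only: a non-blocking next() that distinguishes "message ready",
// "no local message but a fetch is in flight", and "stream drained".
public class NonBlockingNextSketch {
    enum Status { MESSAGE, FETCH_IN_PROGRESS, END_OF_STREAM }

    static final class Next {
        final Status status;
        final String message;   // non-null only when status == MESSAGE
        Next(Status status, String message) { this.status = status; this.message = message; }
    }

    private final Queue<String> local = new ArrayDeque<>(); // locally fetched msgs
    private final boolean fetchInFlight;
    private final boolean endOfStream;   // only a finite stream can ever set this

    NonBlockingNextSketch(boolean fetchInFlight, boolean endOfStream) {
        this.fetchInFlight = fetchInFlight;
        this.endOfStream = endOfStream;
    }

    void deliver(String msg) { local.add(msg); }   // simulates a completed fetch

    // Never blocks: reports which of the three cases applies right now.
    Next next() {
        if (!local.isEmpty()) return new Next(Status.MESSAGE, local.poll());
        if (fetchInFlight) return new Next(Status.FETCH_IN_PROGRESS, null);
        if (endOfStream) return new Next(Status.END_OF_STREAM, null);
        return new Next(Status.FETCH_IN_PROGRESS, null); // would kick off a fetch
    }

    public static void main(String[] args) {
        NonBlockingNextSketch s = new NonBlockingNextSketch(false, true);
        s.deliver("m0");
        System.out.println(s.next().status); // MESSAGE
        System.out.println(s.next().status); // END_OF_STREAM
    }
}
```

With an infinite stream the END_OF_STREAM branch is unreachable, which is exactly the point made above: only a ranged (finite) stream lets the caller tell "drained" apart from "still fetching".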
>>>>>>> If it were thread-safe, then we could register listeners for various
>>>>>>> events:
>>>>>>>
>>>>>>>     * opening-stream
>>>>>>>     * closing-stream
>>>>>>>     * message-arrived
>>>>>>>     * end-of-stream/no-more-messages-in-partition (for finite streams)
>>>>>>>     * rebalance started
>>>>>>>     * partition assigned
>>>>>>>     * partition unassigned
>>>>>>>     * rebalance finished
>>>>>>>     * partition-offset-committed
>>>>>>>
>>>>>>> Perhaps that is just our use, but instead of a pull-oriented
>>>>>>> KafkaStream, is there any sense in your providing a push-oriented
>>>>>>> KafkaMessageSource publishing OOB messages?
>>>>>>>
>>>>>>> thank you,
>>>>>>> Robert
>>>>>>>
>>>>>>> On Feb 21, 2014, at 5:59 PM, Jun Rao <jun...@gmail.com> wrote:
>>>>>>>
>>>>>>> Robert,
>>>>>>>
>>>>>>> Could you explain why you want to distinguish btw
>>>>>>> FetchingInProgressException and NoMessagePendingException? The
>>>>>>> nextMsgs() method that you want is exactly what poll() does.
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Jun
>>>>>>>
>>>>>>> On Wed, Feb 19, 2014 at 8:45 AM, Withers, Robert <robert.with...@dish.com> wrote:
>>>>>>>
>>>>>>> I am not clear on why the consumer stream should be positionable,
>>>>>>> especially if it is limited to the in-memory fetched messages. Could
>>>>>>> someone explain to me, please? I really like the idea of committing the
>>>>>>> offset specifically on those partitions with changed read offsets, only.
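The pull-to-push conversion behind the listener list above can be sketched in a few lines. The source class, its listener methods, and the run() driver are all hypothetical names invented here (only a subset of the events listed is modeled); the point is just the shape: the consumer's main loop pulls, and registered listeners are pushed each message plus the side-band (OOB) events.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: a push-model wrapper over a pulled batch of messages,
// with listeners for a few of the OOB events proposed in the thread.
public class PushSourceSketch {
    interface Listener {
        default void onOpen() {}                // opening-stream
        default void onMessage(String msg) {}   // message-arrived
        default void onEndOfStream() {}         // finite streams only
    }

    private final List<Listener> listeners = new ArrayList<>();

    void addListener(Listener l) { listeners.add(l); }

    // The main loop: pull messages (here, a pre-pulled list) and push them,
    // bracketed by the open and end-of-stream events.
    void run(List<String> pulled) {
        for (Listener l : listeners) l.onOpen();
        for (String msg : pulled) {
            for (Listener l : listeners) l.onMessage(msg);
        }
        for (Listener l : listeners) l.onEndOfStream();
    }

    public static void main(String[] args) {
        PushSourceSketch source = new PushSourceSketch();
        List<String> seen = new ArrayList<>();
        source.addListener(new Listener() {
            public void onMessage(String msg) { seen.add(msg); }
            public void onEndOfStream() { seen.add("<eos>"); }
        });
        source.run(List.of("m1", "m2"));
        System.out.println(seen); // [m1, m2, <eos>]
    }
}
```

Default methods keep each listener small: a consumer that only cares about end-of-stream overrides one method and ignores the rest, which matches the "individual or chunk mode" composability Robert asks for.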
>>>>>>>
>>>>>>> 2 items I would like to see added to the KafkaStream are:
>>>>>>>
>>>>>>>     * a non-blocking next(), which throws several exceptions
>>>>>>>       (FetchingInProgressException and a NoMessagePendingException or
>>>>>>>       something) to differentiate between fetching or no messages left.
>>>>>>>     * a nextMsgs() method which returns all locally available messages
>>>>>>>       and kicks off a fetch for the next chunk.
>>>>>>>
>>>>>>> If you are trying to add transactional features, then formally define a
>>>>>>> DTP capability and pull in other server frameworks to share the
>>>>>>> implementation. Should it be X/Open XA? How about a new peer-to-peer
>>>>>>> DTP protocol?
>>>>>>>
>>>>>>> Thank you,
>>>>>>> Robert
>>>>>>>
>>>>>>> -----Original Message-----
>>>>>>> From: Jay Kreps [mailto:jay.kr...@gmail.com]
>>>>>>> Sent: Sunday, February 16, 2014 10:13 AM
>>>>>>> To: users@kafka.apache.org
>>>>>>> Subject: Re: New Consumer API discussion
>>>>>>>
>>>>>>> +1 I think those are good. It is a little weird that changing the fetch
>>>>>>> point is not batched but changing the commit point is, but I suppose
>>>>>>> there is no helping that.
>>>>>>>
>>>>>>> -Jay
>>>>>>>
>>>>>>> On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>>>>>>
>>>>>>> Jay,
>>>>>>>
>>>>>>> That makes sense.
>>>>>>> position/seek deal with changing the consumer's in-memory data, so
>>>>>>> there is no remote rpc there. For some reason, I got committed and seek
>>>>>>> mixed up in my head at that time :)
>>>>>>>
>>>>>>> So we still end up with
>>>>>>>
>>>>>>>     long position(TopicPartition tp)
>>>>>>>     void seek(TopicPartitionOffset p)
>>>>>>>     Map<TopicPartition, Long> committed(TopicPartition tp);
>>>>>>>     void commit(TopicPartitionOffset...);
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Neha
>>>>>>>
>>>>>>> On Friday, February 14, 2014, Jay Kreps <jay.kr...@gmail.com> wrote:
>>>>>>>
>>>>>>> Oh, interesting. So I am assuming the following implementation:
>>>>>>>
>>>>>>> 1. We have an in-memory fetch position which controls the next fetch
>>>>>>>    offset.
>>>>>>> 2. Changing this has no effect until you poll again, at which point
>>>>>>>    your fetch request will be from the newly specified offset.
>>>>>>> 3. We then have an in-memory but also remotely stored committed offset.
>>>>>>> 4. Calling commit has the effect of saving the fetch position as both
>>>>>>>    the in-memory committed position and in the remote store.
>>>>>>> 5. Auto-commit is the same as periodically calling commit on all
>>>>>>>    positions.
>>>>>>>
>>>>>>> So batching on commit as well as getting the committed position makes
>>>>>>> sense, but batching the fetch position wouldn't, right? I think you are
>>>>>>> actually thinking of a different approach.
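The numbered assumptions above can be restated as a few maps, which also captures the committed() behavior discussed later in the thread (return the in-memory value if this consumer has committed, otherwise do a remote lookup). This is a sketch of the bookkeeping only: the class, the string topic-partition keys, and the Map standing in for the remote offset store are all invented for illustration, not Kafka internals.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: per-partition fetch position plus a committed offset
// kept both in memory and in a simulated remote store.
public class OffsetBookkeepingSketch {
    private final Map<String, Long> fetchPosition = new HashMap<>();
    private final Map<String, Long> committedLocal = new HashMap<>();
    private final Map<String, Long> remoteStore;   // stands in for the offset store

    OffsetBookkeepingSketch(Map<String, Long> remoteStore) {
        this.remoteStore = remoteStore;
    }

    // (1)/(2): seek only updates the in-memory position; the next poll uses it.
    void seek(String tp, long offset) { fetchPosition.put(tp, offset); }

    long position(String tp) { return fetchPosition.getOrDefault(tp, 0L); }

    // (3)/(4): commit saves the fetch position both in memory and "remotely".
    void commit(String tp) {
        long pos = position(tp);
        committedLocal.put(tp, pos);
        remoteStore.put(tp, pos);
    }

    // committed(): in-memory answer if we committed here, else remote lookup
    // (which is why batching committed() may pay off, unlike position()).
    long committed(String tp) {
        Long local = committedLocal.get(tp);
        return local != null ? local : remoteStore.getOrDefault(tp, -1L);
    }

    public static void main(String[] args) {
        Map<String, Long> remote = new HashMap<>();
        OffsetBookkeepingSketch c = new OffsetBookkeepingSketch(remote);
        c.seek("t-0", 42);
        c.commit("t-0");
        System.out.println(c.committed("t-0") + " " + remote.get("t-0")); // 42 42
    }
}
```

Seen this way, the asymmetry Jay notes falls out directly: seek and position touch only local maps, while commit and (sometimes) committed touch the remote store, so only the latter pair benefits from batching.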
>>>>>>>
>>>>>>> -Jay
>>>>>>>
>>>>>>> On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>>>>>>
>>>>>>> I think you are saying both, i.e. if you have committed on a partition
>>>>>>> it returns you that value but if you haven't it does a remote lookup?
>>>>>>>
>>>>>>> Correct.
>>>>>>>
>>>>>>> The other argument for making committed batched is that commit() is
>>>>>>> batched, so there is symmetry.
>>>>>>>
>>>>>>> position() and seek() are always in memory changes (I assume) so there
>>>>>>> is no need to batch them.
>>>>>>>
>>>>>>> I'm not as sure as you are about that assumption being true. Basically
>>>>>>> in my example above, the batching argument for committed() also applies
>>>>>>> to position() since one purpose of fetching a partition's offset is to
>>>>>>> use it to set the position of the consumer to that offset. Since that
>>>>>>> might lead to a remote OffsetRequest call, I think we probably would be
>>>>>>> better off batching it.
>>>>>>>
>>>>>>> Another option for naming would be position/reposition instead of
>>>>>>> position/seek.
>>>>>>>
>>>>>>> I think position/seek is better since it aligns with Java file APIs.
>>>>>>>
>>>>>>> I also think your suggestion about ConsumerPosition makes sense.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Neha
>>>>>>>
>>>>>>> On Feb 13, 2014 9:22 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hey Neha,
>>>>>>>
>>>>>>> I actually wasn't proposing the name TopicOffsetPosition, that was just
>>>>>>> a typo. I meant TopicPartitionOffset, and I was just referencing what
>>>>>>> was in the javadoc. So to restate my proposal without the typo, using
>>>>>>> just the existing classes (that naming is a separate question):
>>>>>>>
>>>>>>>     long position(TopicPartition tp)
>>>>>>>     void seek(TopicPartitionOffset p)
>>>>>>>     long committed(TopicPartition tp)
>>>>>>>     void commit(TopicPartitionOffset...);
>>>>>>>
>>>>>>> So I may be unclear on committed() (AKA lastCommittedOffset). Is it
>>>>>>> returning the in-memory value from the last commit by this consumer, or
>>>>>>> is it doing a remote fetch, or both? I think you are saying both, i.e.
>>>>>>> if you have committed on a partition it returns you that value but if
>>>>>>> you haven't it does a remote lookup?
>>>>>>>
>>>>>>> The other argument for making committed batched is that commit() is
>>>>>>> batched, so there is symmetry.
>>>>>>>
>>>>>>> position() and seek() are always in memory changes (I assume) so there
>>>>>>> is no need to batch them.
>>>>>>>
>>>>>>> So taking all that into account, what if we revise it to
>>>>>>>
>>>>>>>     long position(TopicPartition tp)
>>>>>>>     void seek(TopicPartitionOffset p)
>>>>>>>     Map<TopicPartition, Long> committed(TopicPartition tp);
>>>>>>>     void commit(TopicPartitionOffset...);
>>>>>>>
>>>>>>> This is not symmetric between position/seek and commit/committed but it
>>>>>>> is convenient. Another option for naming would be position/reposition
>>>>>>> instead of position/seek.
>>>>>>>
>>>>>>> With respect to the name TopicPartitionOffset, what I was trying to say
>>>>>>> is that I recommend we change that to something shorter. I think
>>>>>>> TopicPosition or ConsumerPosition might be better. Position does not
>>>>>>> refer to the variables in the object, it refers to the meaning of the
>>>>>>> object: it represents a position within a topic. The offset field in
>>>>>>> that object is still called the offset. TopicOffset, PartitionOffset,
>>>>>>> or ConsumerOffset would all be workable too. Basically I am just
>>>>>>> objecting to concatenating three nouns together. :-)
>>>>>>>
>>>>>>> -Jay
>>>>>>>
>>>>>>> On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>>>>>>
>>>>>>> 2. It returns a list of results. But how can you use the list? The only
>>>>>>> way to use the list is to make a map of tp=>offset and then look up
>>>>>>> results in this map (or do a for loop over the list for the partition
>>>>>>> you want). I recommend that if this is an in-memory check we just do
>>>>>>> one at a time. E.g.
>>>>>>>
>>>>>>>     long committedPosition(TopicPosition)
>>>>>>>
>>>>>>> This was discussed in the previous emails. There is a choice
>>>>>>>
>>>>>>> --
>>>>>>> Robert Withers
>>>>>>> robert.with...@dish.com
>>>>>>> c: 303.919.5856
>>>>>>>
>>>>>>> --
>>>>>>> -- Guozhang