I am not clear on why the consumer stream should be positionable, especially if positioning is limited to the messages already fetched into memory. Could someone please explain? I really like the idea of committing offsets only for those partitions whose read offsets have changed.
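Committing only the partitions whose read offsets changed could look roughly like the sketch below. It is a minimal illustration, assuming a hypothetical `ChangedOffsetTracker` class and a local `TopicPartition` record that stands in for whatever partition type the consumer ends up using; none of these names come from the proposed API.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for a topic/partition pair (not a real Kafka class).
record TopicPartition(String topic, int partition) {}

// Tracks the read offset per partition and remembers which partitions
// have moved since the last commit, so a commit can skip the rest.
final class ChangedOffsetTracker {
    private final Map<TopicPartition, Long> readOffsets = new HashMap<>();
    private final Set<TopicPartition> changed = new HashSet<>();

    // Record that the consumer has read up to 'offset' on 'tp'.
    void advance(TopicPartition tp, long offset) {
        Long previous = readOffsets.put(tp, offset);
        if (!Objects.equals(previous, offset)) {
            changed.add(tp);
        }
    }

    // Returns only the partitions whose offsets changed since the last call,
    // then clears the change set. A real consumer would send this map in one
    // batched commit request.
    Map<TopicPartition, Long> drainChangedOffsets() {
        Map<TopicPartition, Long> toCommit = new HashMap<>();
        for (TopicPartition tp : changed) {
            toCommit.put(tp, readOffsets.get(tp));
        }
        changed.clear();
        return toCommit;
    }
}

final class ChangedOffsetDemo {
    public static void main(String[] args) {
        ChangedOffsetTracker tracker = new ChangedOffsetTracker();
        TopicPartition p0 = new TopicPartition("events", 0);
        TopicPartition p1 = new TopicPartition("events", 1);
        tracker.advance(p0, 10);
        tracker.advance(p1, 5);
        System.out.println(tracker.drainChangedOffsets().size()); // prints 2
        tracker.advance(p0, 11);
        tracker.advance(p1, 5); // unchanged, so it is not committed again
        System.out.println(tracker.drainChangedOffsets()); // only p0 appears
    }
}
```

The change set is cleared on every drain, so a partition that sits idle between commits generates no commit traffic at all.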
Two items I would like to see added to KafkaStream are:
* A non-blocking next() that throws distinct exceptions (a FetchingInProgressException and a NoMessagePendingException, or something similar) to differentiate between a fetch still in progress and no messages left.
* A nextMsgs() method that returns all locally available messages and kicks off a fetch for the next chunk.

If you are trying to add transactional features, then formally define a DTP capability and pull in other server frameworks to share the implementation. Should it be X/Open XA? How about a new peer-to-peer DTP protocol?

Thank you,
Robert

Robert Withers
Staff Analyst/Developer
o: (720) 514-8963
c: (571) 262-1873

-----Original Message-----
From: Jay Kreps [mailto:jay.kr...@gmail.com]
Sent: Sunday, February 16, 2014 10:13 AM
To: users@kafka.apache.org
Subject: Re: New Consumer API discussion

+1, I think those are good. It is a little weird that changing the fetch point is not batched but changing the commit point is, but I suppose there is no helping that.

-Jay

On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> Jay,
>
> That makes sense. position/seek deal with changing the consumer's in-memory data, so there is no remote RPC there. For some reason, I got committed and seek mixed up in my head at that time :)
>
> So we still end up with
>
> long position(TopicPartition tp)
> void seek(TopicPartitionOffset p)
> Map<TopicPartition, Long> committed(TopicPartition tp);
> void commit(TopicPartitionOffset...);
>
> Thanks,
> Neha
>
> On Friday, February 14, 2014, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > Oh, interesting. So I am assuming the following implementation:
> > 1. We have an in-memory fetch position which controls the next fetch offset.
> > 2. Changing this has no effect until you poll again, at which point your fetch request will be from the newly specified offset.
> > 3. We then have an in-memory but also remotely stored committed offset.
> > 4. Calling commit has the effect of saving the fetch position as both the in-memory committed position and in the remote store.
> > 5. Auto-commit is the same as periodically calling commit on all positions.
> >
> > So batching on commit as well as getting the committed position makes sense, but batching the fetch position wouldn't, right? I think you are actually thinking of a different approach.
> >
> > -Jay
> >
> > On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> >
> > > > I think you are saying both, i.e. if you have committed on a partition it returns you that value but if you haven't it does a remote lookup?
> > >
> > > Correct.
> > >
> > > > The other argument for making committed batched is that commit() is batched, so there is symmetry.
> > > >
> > > > position() and seek() are always in-memory changes (I assume) so there is no need to batch them.
> > >
> > > I'm not as sure as you are about that assumption being true. Basically, in my example above, the batching argument for committed() also applies to position(), since one purpose of fetching a partition's offset is to use it to set the position of the consumer to that offset. Since that might lead to a remote OffsetRequest call, I think we probably would be better off batching it.
> > >
> > > > Another option for naming would be position/reposition instead of position/seek.
> > >
> > > I think position/seek is better since it aligns with Java file APIs.
> > >
> > > I also think your suggestion about ConsumerPosition makes sense.
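The five-step model above (an in-memory fetch position, seek taking effect on the next poll, a committed offset kept both in memory and remotely, commit copying the position into both, auto-commit as a periodic commit of every position), together with the remote-lookup fallback for committed() that Neha confirms, can be sketched as follows. This is a minimal sketch under stated assumptions: `RemoteOffsetStore`, `PositionModelConsumer`, and the `poll(tp, count)` helper are hypothetical, the "remote" store is just a local map, an unknown offset defaults to 0, and seek() takes a partition and an offset rather than a TopicPartitionOffset.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a topic/partition pair.
record TopicPartition(String topic, int partition) {}

// Hypothetical remote offset store; a local map here so the sketch runs.
final class RemoteOffsetStore {
    private final Map<TopicPartition, Long> stored = new HashMap<>();
    int lookups = 0; // counts "remote" reads, to show when they happen

    void save(TopicPartition tp, long offset) { stored.put(tp, offset); }

    long load(TopicPartition tp) {
        lookups++;
        return stored.getOrDefault(tp, 0L); // assumption: unknown offset = 0
    }
}

final class PositionModelConsumer {
    private final RemoteOffsetStore remote;
    // Step 1: in-memory fetch position that drives the next fetch.
    private final Map<TopicPartition, Long> fetchPosition = new HashMap<>();
    // Step 3: in-memory copy of the committed offset.
    private final Map<TopicPartition, Long> committedInMemory = new HashMap<>();

    PositionModelConsumer(RemoteOffsetStore remote) { this.remote = remote; }

    long position(TopicPartition tp) { return fetchPosition.getOrDefault(tp, 0L); }

    // Step 2: seek only changes memory; nothing is fetched until poll().
    void seek(TopicPartition tp, long offset) { fetchPosition.put(tp, offset); }

    // Hypothetical helper: returns the offset this fetch starts from, then
    // pretends 'count' messages were consumed by advancing the position.
    long poll(TopicPartition tp, int count) {
        long from = position(tp);
        fetchPosition.put(tp, from + count);
        return from;
    }

    // Step 4: commit saves the fetch position in memory and remotely.
    void commit(TopicPartition tp) {
        long pos = position(tp);
        committedInMemory.put(tp, pos);
        remote.save(tp, pos);
    }

    // Step 5: auto-commit is this method called on a timer.
    void commitAll() {
        for (TopicPartition tp : fetchPosition.keySet()) {
            commit(tp);
        }
    }

    // In-memory value if this consumer has committed, else a remote lookup.
    long committed(TopicPartition tp) {
        Long local = committedInMemory.get(tp);
        return local != null ? local : remote.load(tp);
    }
}

final class PositionModelDemo {
    public static void main(String[] args) {
        RemoteOffsetStore store = new RemoteOffsetStore();
        PositionModelConsumer consumer = new PositionModelConsumer(store);
        TopicPartition tp = new TopicPartition("events", 0);
        consumer.seek(tp, 100);            // memory only; no fetch yet
        long from = consumer.poll(tp, 10); // this fetch starts at 100
        consumer.commit(tp);               // 110 saved locally and remotely
        // prints "100 110 0": committed() was served from memory
        System.out.println(from + " " + consumer.committed(tp) + " " + store.lookups);
    }
}
```

In this model only committed() can reach the remote store, which is why batching it matters more than batching position() or seek(). Neha's objection is that position() may also need a remote OffsetRequest, which this sketch does not model.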
> > >
> > > Thanks,
> > > Neha
> > >
> > > On Feb 13, 2014 9:22 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
> > >
> > > > Hey Neha,
> > > >
> > > > I actually wasn't proposing the name TopicOffsetPosition; that was just a typo. I meant TopicPartitionOffset, and I was just referencing what was in the javadoc. So to restate my proposal without the typo, using just the existing classes (that naming is a separate question):
> > > >
> > > > long position(TopicPartition tp)
> > > > void seek(TopicPartitionOffset p)
> > > > long committed(TopicPartition tp)
> > > > void commit(TopicPartitionOffset...);
> > > >
> > > > So I may be unclear on committed() (AKA lastCommittedOffset). Is it returning the in-memory value from the last commit by this consumer, or is it doing a remote fetch, or both? I think you are saying both, i.e. if you have committed on a partition it returns you that value but if you haven't it does a remote lookup?
> > > >
> > > > The other argument for making committed batched is that commit() is batched, so there is symmetry.
> > > >
> > > > position() and seek() are always in-memory changes (I assume) so there is no need to batch them.
> > > >
> > > > So, taking all that into account, what if we revise it to
> > > >
> > > > long position(TopicPartition tp)
> > > > void seek(TopicPartitionOffset p)
> > > > Map<TopicPartition, Long> committed(TopicPartition tp);
> > > > void commit(TopicPartitionOffset...);
> > > >
> > > > This is not symmetric between position/seek and commit/committed, but it is convenient. Another option for naming would be position/reposition instead of position/seek.
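The revised signatures, combined with Neha's point that committed offsets are often fetched precisely in order to seek to them, can be sketched as an interface plus a small resume helper. This is a minimal sketch under assumptions: `ConsumerPosition` is one of the names floated in the thread and is used here only as a placeholder for TopicPartitionOffset; committed() is written with varargs so one call covers several partitions, whereas the thread's signature takes a single TopicPartition; `OffsetApi`, `InMemoryOffsetApi`, and `ResumeFromCommitted` are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical placeholders; the thread is still debating these names.
record TopicPartition(String topic, int partition) {}
record ConsumerPosition(TopicPartition tp, long offset) {}

// The revised API shape, with committed() assumed to take varargs.
interface OffsetApi {
    long position(TopicPartition tp);
    void seek(ConsumerPosition p);
    Map<TopicPartition, Long> committed(TopicPartition... tps);
    void commit(ConsumerPosition... positions);
}

// Trivial in-memory implementation so the sketch runs; committedCalls
// stands in for the number of remote round trips committed() would cost.
final class InMemoryOffsetApi implements OffsetApi {
    private final Map<TopicPartition, Long> positions = new HashMap<>();
    private final Map<TopicPartition, Long> commits = new HashMap<>();
    int committedCalls = 0;

    public long position(TopicPartition tp) { return positions.getOrDefault(tp, 0L); }

    public void seek(ConsumerPosition p) { positions.put(p.tp(), p.offset()); }

    public Map<TopicPartition, Long> committed(TopicPartition... tps) {
        committedCalls++;
        Map<TopicPartition, Long> result = new HashMap<>();
        for (TopicPartition tp : tps) {
            Long offset = commits.get(tp);
            if (offset != null) result.put(tp, offset); // skip never-committed
        }
        return result;
    }

    public void commit(ConsumerPosition... ps) {
        for (ConsumerPosition p : ps) commits.put(p.tp(), p.offset());
    }
}

final class ResumeFromCommitted {
    // One batched committed() call, then an in-memory seek per partition.
    static void resume(OffsetApi consumer, TopicPartition... tps) {
        Map<TopicPartition, Long> committed = consumer.committed(tps);
        for (Map.Entry<TopicPartition, Long> e : committed.entrySet()) {
            consumer.seek(new ConsumerPosition(e.getKey(), e.getValue()));
        }
    }

    public static void main(String[] args) {
        InMemoryOffsetApi consumer = new InMemoryOffsetApi();
        TopicPartition p0 = new TopicPartition("events", 0);
        TopicPartition p1 = new TopicPartition("events", 1);
        consumer.commit(new ConsumerPosition(p0, 7), new ConsumerPosition(p1, 3));
        resume(consumer, p0, p1);
        // prints "7 3 1": both partitions resumed with a single committed() call
        System.out.println(consumer.position(p0) + " " + consumer.position(p1)
                + " " + consumer.committedCalls);
    }
}
```

Returning a Map keyed by partition lets the caller seek each partition directly, instead of scanning a list for the entry it wants.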
> > > >
> > > > With respect to the name TopicPartitionOffset, what I was trying to say is that I recommend we change that to something shorter. I think TopicPosition or ConsumerPosition might be better. "Position" does not refer to the variables in the object; it refers to the meaning of the object: it represents a position within a topic. The offset field in that object is still called the offset. TopicOffset, PartitionOffset, or ConsumerOffset would all be workable too. Basically, I am just objecting to concatenating three nouns together. :-)
> > > >
> > > > -Jay
> > > >
> > > > On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> > > >
> > > > > 2. It returns a list of results. But how can you use the list? The only way to use the list is to make a map of tp=>offset and then look up results in this map (or do a for loop over the list for the partition you want). I recommend that if this is an in-memory check we just do one at a time. E.g. long committedPosition(TopicPosition).
> > > > >
> > > > > This was discussed in the previous emails. There is a choic