I am not clear on why the consumer stream should be positionable, especially if
it is limited to the in-memory fetched messages. Could someone explain this to
me, please? I really like the idea of committing offsets only on those
partitions whose read offsets have changed.



Two items I would like to see added to the KafkaStream are:

*  A non-blocking next() that throws distinct exceptions (a
   FetchingInProgressException and a NoMessagePendingException, or something
   similar) to differentiate between a fetch in progress and no messages left.

*  A nextMsgs() method which returns all locally available messages and kicks
   off a fetch for the next chunk.
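
To make the two additions concrete, here is a minimal sketch. It assumes a hypothetical in-memory `BufferedStream` class standing in for KafkaStream. The exception names follow the ones suggested above; the `String` message type, the `deliver`/`startFetch` helpers, and the choice of unchecked exceptions are all assumptions for illustration, not part of any existing consumer API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical exceptions, named after the suggestion above (unchecked for brevity).
class FetchingInProgressException extends RuntimeException {}
class NoMessagePendingException extends RuntimeException {}

// Hypothetical stand-in for KafkaStream; messages are plain Strings here.
class BufferedStream {
    private final Queue<String> local = new ArrayDeque<>();
    private boolean fetchInProgress = false;

    // Called by a (simulated) fetcher when a chunk of messages arrives.
    synchronized void deliver(List<String> chunk) {
        local.addAll(chunk);
        fetchInProgress = false;
    }

    // Marks a fetch as started; a real client would send a fetch request here.
    synchronized void startFetch() {
        fetchInProgress = true;
    }

    // Non-blocking next(): returns a message immediately, or says why it cannot.
    synchronized String next() {
        String msg = local.poll();
        if (msg != null) {
            return msg;
        }
        if (fetchInProgress) {
            throw new FetchingInProgressException();
        }
        throw new NoMessagePendingException();
    }

    // Returns all locally available messages and kicks off the next fetch.
    synchronized List<String> nextMsgs() {
        List<String> out = new ArrayList<>(local);
        local.clear();
        startFetch();
        return out;
    }

    synchronized boolean isFetchInProgress() {
        return fetchInProgress;
    }
}
```

With this shape, a caller can tell "try again shortly" (a fetch is in flight) apart from "nothing more to read right now" without ever blocking.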



If you are trying to add transactional features, then formally define a
distributed transaction processing (DTP) capability and pull in other server
frameworks to share the implementation. Should it be X/Open XA? How about a
new peer-to-peer DTP protocol?



Thank you,

Robert



Robert Withers

Staff Analyst/Developer




-----Original Message-----
From: Jay Kreps [mailto:jay.kr...@gmail.com]
Sent: Sunday, February 16, 2014 10:13 AM
To: users@kafka.apache.org
Subject: Re: New Consumer API discussion



+1 I think those are good. It is a little weird that changing the fetch point
is not batched but changing the commit point is, but I suppose there is no
helping that.



-Jay





On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:



> Jay,

>

> That makes sense. position/seek deal with changing the consumer's in-memory
> data, so there is no remote RPC there. For some reason, I got committed and
> seek mixed up in my head at that time :)

>

> So we still end up with

>

>    long position(TopicPartition tp)

>    void seek(TopicPartitionOffset p)

>    Map<TopicPartition, Long> committed(TopicPartition tp);

>    void commit(TopicPartitionOffset...);

>

> Thanks,

> Neha

>

> On Friday, February 14, 2014, Jay Kreps <jay.kr...@gmail.com> wrote:

>

> > Oh, interesting. So I am assuming the following implementation:
> > 1. We have an in-memory fetch position which controls the next fetch offset.
> > 2. Changing this has no effect until you poll again, at which point your
> >    fetch request will be from the newly specified offset.
> > 3. We then have an in-memory but also remotely stored committed offset.
> > 4. Calling commit has the effect of saving the fetch position as both the
> >    in-memory committed position and in the remote store.
> > 5. Auto-commit is the same as periodically calling commit on all positions.

> >

> > So batching on commit as well as getting the committed position

> > makes sense, but batching the fetch position wouldn't, right? I

> > think you are actually thinking of a different approach.
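
The five numbered assumptions above can be sketched as a small in-memory model. This is an illustration under stated assumptions, not the actual consumer implementation: `OffsetTracker`, the `String` partition keys, the `poll(partition, count)` signature, and the `-1` "unknown" value are all hypothetical, and a plain `Map` stands in for the remote offset store.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the offsets one consumer tracks.
// Partitions are identified by plain Strings such as "t-0".
class OffsetTracker {
    private final Map<String, Long> fetchPosition = new HashMap<>();   // in memory only
    private final Map<String, Long> committedLocal = new HashMap<>();  // in-memory copy of commits
    private final Map<String, Long> remoteStore;                       // stand-in for the remote store

    OffsetTracker(Map<String, Long> remoteStore) {
        this.remoteStore = remoteStore;
    }

    // seek(): changes only the in-memory fetch position; no remote call.
    void seek(String partition, long offset) {
        fetchPosition.put(partition, offset);
    }

    // position(): purely an in-memory read (defaults to 0 in this sketch).
    long position(String partition) {
        return fetchPosition.getOrDefault(partition, 0L);
    }

    // poll(): the next fetch starts at the current fetch position.
    // Returns the starting offset and advances the position by `count`.
    long poll(String partition, int count) {
        long start = position(partition);
        fetchPosition.put(partition, start + count);
        return start;
    }

    // commit(): batched; saves each fetch position both locally and remotely.
    void commit(String... partitions) {
        for (String p : partitions) {
            long pos = position(p);
            committedLocal.put(p, pos);
            remoteStore.put(p, pos);
        }
    }

    // committed(): the in-memory value if this consumer has committed,
    // otherwise a lookup in the remote store (-1 if nothing is stored).
    long committed(String partition) {
        Long local = committedLocal.get(partition);
        if (local != null) {
            return local;
        }
        return remoteStore.getOrDefault(partition, -1L);
    }

    // Auto-commit is modeled as committing every known position.
    void autoCommit() {
        commit(fetchPosition.keySet().toArray(new String[0]));
    }
}
```

In this model only commit() and the fallback path of committed() touch the remote store, which is why batching is attractive there while seek() and position() stay single-partition calls.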

> >

> > -Jay

> >

> >

> > On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> >

> > > I think you are saying both, i.e. if you have committed on a partition
> > > it returns you that value but if you haven't it does a remote lookup?

> > >

> > > Correct.

> > >

> > > The other argument for making committed batched is that commit()

> > > is batched, so there is symmetry.

> > >

> > > position() and seek() are always in memory changes (I assume) so there is
> > > no need to batch them.

> > >

> > > I'm not as sure as you are about that assumption being true. Basically in
> > > my example above, the batching argument for committed() also applies to
> > > position() since one purpose of fetching a partition's offset is to use it
> > > to set the position of the consumer to that offset. Since that might lead
> > > to a remote OffsetRequest call, I think we probably would be better off
> > > batching it.

> > >

> > > Another option for naming would be position/reposition instead of

> > > position/seek.

> > >

> > > I think position/seek is better since it aligns with Java file APIs.

> > >

> > > I also think your suggestion about ConsumerPosition makes sense.

> > >

> > > Thanks,

> > > Neha

> > > On Feb 13, 2014 9:22 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:

> > >

> > > > Hey Neha,

> > > >

> > > > I actually wasn't proposing the name TopicOffsetPosition, that was just a
> > > > typo. I meant TopicPartitionOffset, and I was just referencing what was in
> > > > the javadoc. So to restate my proposal without the typo, using just the
> > > > existing classes (that naming is a separate question):

> > > >    long position(TopicPartition tp)

> > > >    void seek(TopicPartitionOffset p)

> > > >    long committed(TopicPartition tp)

> > > >    void commit(TopicPartitionOffset...);

> > > >

> > > > So I may be unclear on committed() (AKA lastCommittedOffset). Is it
> > > > returning the in-memory value from the last commit by this consumer, or is
> > > > it doing a remote fetch, or both? I think you are saying both, i.e. if you
> > > > have committed on a partition it returns you that value but if you haven't
> > > > it does a remote lookup?

> > > >

> > > > The other argument for making committed batched is that commit()

> > > > is batched, so there is symmetry.

> > > >

> > > > position() and seek() are always in memory changes (I assume) so there is
> > > > no need to batch them.

> > > >

> > > > So taking all that into account what if we revise it to

> > > >    long position(TopicPartition tp)

> > > >    void seek(TopicPartitionOffset p)

> > > >    Map<TopicPartition, Long> committed(TopicPartition tp);

> > > >    void commit(TopicPartitionOffset...);

> > > >

> > > > This is not symmetric between position/seek and commit/committed but it is
> > > > convenient. Another option for naming would be position/reposition instead
> > > > of position/seek.

> > > >

> > > > With respect to the name TopicPartitionOffset, what I was trying to say is
> > > > that I recommend we change that to something shorter. I think TopicPosition
> > > > or ConsumerPosition might be better. Position does not refer to the
> > > > variables in the object, it refers to the meaning of the object--it
> > > > represents a position within a topic. The offset field in that object is
> > > > still called the offset. TopicOffset, PartitionOffset, or ConsumerOffset
> > > > would all be workable too. Basically I am just objecting to concatenating
> > > > three nouns together. :-)

> > > >

> > > > -Jay

> > > >

> > > >

> > > >

> > > >

> > > >

> > > > > On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> > > >

> > > > > > 2. It returns a list of results. But how can you use the list? The only way
> > > > > > to use the list is to make a map of tp=>offset and then look up results in
> > > > > > this map (or do a for loop over the list for the partition you want). I
> > > > > > recommend that if this is an in-memory check we just do one at a time. E.g.
> > > > > > long committedPosition(TopicPosition).
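
The two ways of consuming a list-shaped result that the quoted point describes can be sketched as below. `PartitionOffset` and `CommittedLookup` are hypothetical names invented for this illustration, with `String` partition keys; neither is a class from the proposed API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical result entry: one (partition, offset) pair.
class PartitionOffset {
    final String partition;
    final long offset;

    PartitionOffset(String partition, long offset) {
        this.partition = partition;
        this.offset = offset;
    }
}

class CommittedLookup {
    // Option 1: index the list as a map of partition => offset, then look up.
    static Map<String, Long> toMap(List<PartitionOffset> results) {
        Map<String, Long> byPartition = new HashMap<>();
        for (PartitionOffset r : results) {
            byPartition.put(r.partition, r.offset);
        }
        return byPartition;
    }

    // Option 2: loop over the list for the one partition you want.
    // Returns null if the partition is not in the list.
    static Long find(List<PartitionOffset> results, String partition) {
        for (PartitionOffset r : results) {
            if (r.partition.equals(partition)) {
                return r.offset;
            }
        }
        return null;
    }
}
```

Either way the caller ends up doing a keyed lookup, which is the argument for returning a map or a single value rather than a list.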

> > > > >

> > > > > This was discussed in the previous emails. There is a choic

>
