Hi Robert,

Yes, you can check out the callback functions in the new API,
onPartitionDeassigned and onPartitionAssigned, and see if they meet your needs.

Guozhang

On Mon, Feb 24, 2014 at 8:18 AM, Withers, Robert <robert.with...@dish.com> wrote:

> Jun,
>
> Are you saying it is possible to get events from the high-level consumer
> regarding various state machine changes? For instance, can we get a
> notification when a rebalance starts and ends, when a partition is
> assigned/unassigned, when an offset is committed on a partition, when a
> leader changes, and so on? I call this OOB traffic, since these are not
> the core messages streaming but side-band events, yet they are still
> potentially useful to consumers.
>
> Thank you,
> Robert
>
> Robert Withers
> Staff Analyst/Developer
> o: (720) 514-8963
> c: (571) 262-1873
>
> -----Original Message-----
> From: Jun Rao [mailto:jun...@gmail.com]
> Sent: Sunday, February 23, 2014 4:19 PM
> To: users@kafka.apache.org
> Subject: Re: New Consumer API discussion
>
> Robert,
>
> For the push-oriented api, you can potentially implement your own
> MessageHandler with those methods. In the main loop of our new consumer
> api, you can just call those methods based on the events you get.
>
> Also, we already have an api to get the first and the last offset of a
> partition (getOffsetBefore).
>
> Thanks,
>
> Jun
>
> On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert
> <robert.with...@dish.com> wrote:
>
> > This is a good idea, too.
> > I would modify it to include stream marking, then you can have:
> >
> > long end = consumer.lastOffset(tp);
> > consumer.setMark(end);
> > while(consumer.beforeMark()) {
> >   process(consumer.pollToMark());
> > }
> >
> > or
> >
> > long end = consumer.lastOffset(tp);
> > consumer.setMark(end);
> > for(Object msg : consumer.iteratorToMark()) {
> >   process(msg);
> > }
> >
> > I actually have 4 suggestions, then:
> >
> >  * pull: stream marking
> >  * pull: finite streams, bound by time range (up-to-now, yesterday) or
> >    offset
> >  * pull: async api
> >  * push: KafkaMessageSource, for a push model, with msg and OOB events.
> >    Build one in either individual or chunk mode and have a listener for
> >    each msg or a listener for a chunk of msgs. Make it composable and
> >    policy driven (chunked, range, commitOffsets policy, retry policy,
> >    transactional)
> >
> > Thank you,
> > Robert
> >
> > On Feb 22, 2014, at 11:21 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >
> > I think what Robert is saying is that we need to think through the
> > offset API to enable "batch processing" of topic data. Think of a
> > process that periodically kicks off to compute a data summary or do a
> > data load or something like that. I think what we need to support this
> > is an api to fetch the last offset from the server for a partition.
> > Something like
> >
> > long lastOffset(TopicPartition tp)
> >
> > and for symmetry
> >
> > long firstOffset(TopicPartition tp)
> >
> > Likely this would have to be batched. Essentially we should add this
> > use case to our set of code examples to write and think through.
> >
> > The usage would be something like
> >
> > long end = consumer.lastOffset(tp);
> > while(consumer.position < end)
> >   process(consumer.poll());
> >
> > -Jay
> >
> > On Sat, Feb 22, 2014 at 1:52 AM, Withers, Robert
> > <robert.with...@dish.com> wrote:
> >
> > Jun,
> >
> > I was originally thinking a non-blocking read from a distributed
> > stream should distinguish between "no local messages, but a fetch is
> > occurring" versus "you have drained the stream". The reason this may be
> > valuable to me is so I can write consumers that read all known traffic,
> > then terminate. You caused me to reconsider and I think I am conflating
> > 2 things. One is a sync/async api, while the other is whether to have
> > an infinite or finite stream. Is it possible to build a finite
> > KafkaStream on a range of messages?
> >
> > Perhaps a Simple Consumer would do just fine, and then I could start
> > off getting the writeOffset from zookeeper and tell it to read a
> > specified range per partition. I've done this and forked a simple
> > consumer runnable for each partition, for one of our analyzers. The
> > great thing about the high-level consumer is the rebalance, so I can
> > fork however many stream readers I want and you just figure it out for
> > me. In that way you offer us control over the resource consumption
> > within a pull model. This is best to regulate message pressure, they
> > say.
> >
> > Combining that high-level rebalance ability with a ranged partition
> > drain could be really nice...build the stream with an ending position
> > and it is a finite stream, but retain the high-level rebalance. With
> > a finite stream, you would know the difference between the 2 async
> > scenarios: fetch-in-progress versus end-of-stream. With an infinite
> > stream, you never get end-of-stream.
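[Editor's note: the finite-stream behavior discussed above — capture an end offset up front, then drain only up to it — can be sketched against a toy in-memory consumer. `lastOffset`, `position`, and `poll` are the hypothetical method names proposed in this thread, not a shipped Kafka interface; the mock below only illustrates the shape.]

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy in-memory stand-in for the proposed consumer API. lastOffset(),
// position(), and poll() are the names under discussion in this thread,
// not a real Kafka interface.
class MockPartitionConsumer {
    private final Queue<String> log = new ArrayDeque<>();
    private long position = 0;

    void append(String msg) { log.add(msg); }

    // Offset just past the final message currently in the log.
    long lastOffset() { return position + log.size(); }
    long position()   { return position; }

    String poll() {
        position++;
        return log.remove();
    }
}

public class BoundedDrainDemo {
    // Jay's "batch processing" loop: capture the end offset once, then
    // consume only up to it, so messages appended later are not read
    // and the consumer terminates instead of blocking forever.
    static int drainToEnd(MockPartitionConsumer c) {
        int processed = 0;
        long end = c.lastOffset();
        while (c.position() < end) {
            c.poll();
            processed++;
        }
        return processed;
    }

    public static void main(String[] args) {
        MockPartitionConsumer c = new MockPartitionConsumer();
        c.append("a"); c.append("b"); c.append("c");
        System.out.println(drainToEnd(c)); // drains exactly the 3 messages present at start
    }
}
```

This is the "finite stream" Robert asks for: reaching `end` is an unambiguous end-of-stream, distinct from a fetch still being in flight.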
> >
> > Aside from a high-level consumer over a finite range within each
> > partition, the other feature I can think of is more complicated. A
> > high-level consumer has state machine changes that the client cannot
> > access, to my knowledge. Our use of kafka has us invoke a message
> > handler with each message we consume from the KafkaStream, so we
> > convert a pull model to a push model. Including the idea of receiving
> > notifications from state machine changes, what would be really nice is
> > to have a KafkaMessageSource that is an eventful push model. If it
> > were thread-safe, then we could register listeners for various events:
> >
> >  * opening-stream
> >  * closing-stream
> >  * message-arrived
> >  * end-of-stream/no-more-messages-in-partition (for finite streams)
> >  * rebalance started
> >  * partition assigned
> >  * partition unassigned
> >  * rebalance finished
> >  * partition-offset-committed
> >
> > Perhaps that is just our use, but instead of a pull-oriented
> > KafkaStream, is there any sense in your providing a push-oriented
> > KafkaMessageSource publishing OOB messages?
> >
> > thank you,
> > Robert
> >
> > On Feb 21, 2014, at 5:59 PM, Jun Rao <jun...@gmail.com> wrote:
> >
> > Robert,
> >
> > Could you explain why you want to distinguish btw
> > FetchingInProgressException and NoMessagePendingException? The
> > nextMsgs() method that you want is exactly what poll() does.
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Feb 19, 2014 at 8:45 AM, Withers, Robert
> > <robert.with...@dish.com> wrote:
> >
> > I am not clear on why the consumer stream should be positionable,
> > especially if it is limited to the in-memory fetched messages. Could
> > someone explain to me, please?
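[Editor's note: the event list Robert enumerates above could be sketched as a listener interface plus a small push source wrapping a pull loop. All of these type and method names are hypothetical, drawn from his proposal; none exist in Kafka.]

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener mirroring the OOB events Robert lists above;
// this is only a shape sketch, not a real Kafka API.
interface KafkaSourceListener {
    default void onStreamOpened() {}
    default void onStreamClosed() {}
    default void onMessageArrived(String message) {}
    default void onEndOfStream() {}                    // finite streams only
    default void onRebalanceStarted() {}
    default void onPartitionAssigned(int partition) {}
    default void onPartitionUnassigned(int partition) {}
    default void onRebalanceFinished() {}
    default void onOffsetCommitted(int partition, long offset) {}
}

// Minimal push source: fans a batch of messages out to registered
// listeners, which is the pull-to-push conversion Robert describes.
class KafkaMessageSource {
    private final List<KafkaSourceListener> listeners = new ArrayList<>();

    void addListener(KafkaSourceListener l) { listeners.add(l); }

    void deliver(List<String> batch) {
        for (KafkaSourceListener l : listeners) l.onStreamOpened();
        for (String m : batch)
            for (KafkaSourceListener l : listeners) l.onMessageArrived(m);
        for (KafkaSourceListener l : listeners) {
            l.onEndOfStream();
            l.onStreamClosed();
        }
    }
}
```

The default methods let a client subscribe to only the events it cares about, e.g. just `onMessageArrived` and `onRebalanceStarted`.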
> > I really like the idea of committing the offset specifically on those
> > partitions with changed read offsets only.
> >
> > 2 items I would like to see added to the KafkaStream are:
> >
> >  * a non-blocking next(), which throws several exceptions
> >    (FetchingInProgressException and a NoMessagePendingException or
> >    something) to differentiate between fetching or no messages left.
> >  * a nextMsgs() method which returns all locally available messages
> >    and kicks off a fetch for the next chunk.
> >
> > If you are trying to add transactional features, then formally define
> > a DTP capability and pull in other server frameworks to share the
> > implementation. Should it be XA/Open? How about a new peer2peer DTP
> > protocol?
> >
> > Thank you,
> >
> > Robert
> >
> > Robert Withers
> > Staff Analyst/Developer
> > o: (720) 514-8963
> > c: (571) 262-1873
> >
> > -----Original Message-----
> > From: Jay Kreps [mailto:jay.kr...@gmail.com]
> > Sent: Sunday, February 16, 2014 10:13 AM
> > To: users@kafka.apache.org
> > Subject: Re: New Consumer API discussion
> >
> > +1 I think those are good. It is a little weird that changing the
> > fetch point is not batched but changing the commit point is, but I
> > suppose there is no helping that.
> >
> > -Jay
> >
> > On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede
> > <neha.narkh...@gmail.com> wrote:
> >
> > Jay,
> >
> > That makes sense. position/seek deal with changing the consumer's
> > in-memory data, so there is no remote rpc there.
> > For some reason, I got committed and seek mixed up in my head at that
> > time :)
> >
> > So we still end up with
> >
> > long position(TopicPartition tp)
> > void seek(TopicPartitionOffset p)
> > Map<TopicPartition, Long> committed(TopicPartition tp);
> > void commit(TopicPartitionOffset...);
> >
> > Thanks,
> > Neha
> >
> > On Friday, February 14, 2014, Jay Kreps <jay.kr...@gmail.com> wrote:
> >
> > Oh, interesting. So I am assuming the following implementation:
> >
> > 1. We have an in-memory fetch position which controls the next fetch
> >    offset.
> > 2. Changing this has no effect until you poll again, at which point
> >    your fetch request will be from the newly specified offset.
> > 3. We then have an in-memory but also remotely stored committed
> >    offset.
> > 4. Calling commit has the effect of saving the fetch position as
> >    both the in-memory committed position and in the remote store.
> > 5. Auto-commit is the same as periodically calling commit on all
> >    positions.
> >
> > So batching on commit as well as getting the committed position
> > makes sense, but batching the fetch position wouldn't, right? I
> > think you are actually thinking of a different approach.
> >
> > -Jay
> >
> > On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede
> > <neha.narkh...@gmail.com> wrote:
> >
> > I think you are saying both, i.e. if you have committed on a
> > partition it returns you that value, but if you haven't it does a
> > remote lookup?
> >
> > Correct.
> >
> > The other argument for making committed batched is that commit()
> > is batched, so there is symmetry.
> >
> > position() and seek() are always in-memory changes (I assume), so
> > there is no need to batch them.
> >
> > I'm not as sure as you are about that assumption being true.
> > Basically, in my example above, the batching argument for committed()
> > also applies to position(), since one purpose of fetching a
> > partition's offset is to use it to set the position of the consumer
> > to that offset. Since that might lead to a remote OffsetRequest call,
> > I think we probably would be better off batching it.
> >
> > Another option for naming would be position/reposition instead of
> > position/seek.
> >
> > I think position/seek is better since it aligns with Java file APIs.
> >
> > I also think your suggestion about ConsumerPosition makes sense.
> >
> > Thanks,
> > Neha
> >
> > On Feb 13, 2014 9:22 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
> >
> > Hey Neha,
> >
> > I actually wasn't proposing the name TopicOffsetPosition, that was
> > just a typo. I meant TopicPartitionOffset, and I was just referencing
> > what was in the javadoc. So to restate my proposal without the typo,
> > using just the existing classes (that naming is a separate question):
> >
> > long position(TopicPartition tp)
> > void seek(TopicPartitionOffset p)
> > long committed(TopicPartition tp)
> > void commit(TopicPartitionOffset...);
> >
> > So I may be unclear on committed() (AKA lastCommittedOffset).
> > Is it returning the in-memory value from the last commit by this
> > consumer, or is it doing a remote fetch, or both? I think you are
> > saying both, i.e. if you have committed on a partition it returns you
> > that value, but if you haven't it does a remote lookup?
> >
> > The other argument for making committed batched is that commit()
> > is batched, so there is symmetry.
> >
> > position() and seek() are always in-memory changes (I assume), so
> > there is no need to batch them.
> >
> > So taking all that into account, what if we revise it to
> >
> > long position(TopicPartition tp)
> > void seek(TopicPartitionOffset p)
> > Map<TopicPartition, Long> committed(TopicPartition tp);
> > void commit(TopicPartitionOffset...);
> >
> > This is not symmetric between position/seek and commit/committed, but
> > it is convenient. Another option for naming would be
> > position/reposition instead of position/seek.
> >
> > With respect to the name TopicPartitionOffset, what I was trying to
> > say is that I recommend we change that to something shorter. I think
> > TopicPosition or ConsumerPosition might be better. Position does not
> > refer to the variables in the object, it refers to the meaning of the
> > object--it represents a position within a topic. The offset field in
> > that object is still called the offset. TopicOffset, PartitionOffset,
> > or ConsumerOffset would all be workable too. Basically I am just
> > objecting to concatenating three nouns together.
> > :-)
> >
> > -Jay
> >
> > On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede
> > <neha.narkh...@gmail.com> wrote:
> >
> > 2. It returns a list of results. But how can you use the list? The
> > only way to use the list is to make a map of tp=>offset and then look
> > up results in this map (or do a for loop over the list for the
> > partition you want). I recommend that if this is an in-memory check,
> > we just do one at a time. E.g.
> >
> > long committedPosition(TopicPosition).
> >
> > This was discussed in the previous emails. There is a choice
> >
> > --
> > Robert Withers
> > robert.with...@dish.com
> > c: 303.919.5856

--
-- Guozhang
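[Editor's note: the API shape Jay and Neha converge on above can be modeled in a few lines. The types and the single-vs-batched split are taken from the thread; the in-memory "remote store" and varargs `committed()` are illustrative assumptions, not the final Kafka API.]

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical types from the discussion (Jay later suggests shorter
// names like TopicPosition or ConsumerPosition).
record TopicPartition(String topic, int partition) {}
record TopicPartitionOffset(TopicPartition tp, long offset) {}

// In-memory model of the semantics Jay enumerates: position/seek touch
// only local state, while commit/committed go through a (here simulated)
// remote offset store, which is why only the latter pair is batched.
class InMemoryPositions {
    private final Map<TopicPartition, Long> fetchPosition = new HashMap<>();
    private final Map<TopicPartition, Long> committedStore = new HashMap<>(); // stands in for the remote store

    long position(TopicPartition tp) { return fetchPosition.getOrDefault(tp, 0L); }
    void seek(TopicPartitionOffset p) { fetchPosition.put(p.tp(), p.offset()); }

    void commit(TopicPartitionOffset... offsets) {               // batched write
        for (TopicPartitionOffset o : offsets) committedStore.put(o.tp(), o.offset());
    }

    Map<TopicPartition, Long> committed(TopicPartition... tps) { // batched lookup
        Map<TopicPartition, Long> out = new HashMap<>();
        for (TopicPartition tp : tps)
            if (committedStore.containsKey(tp)) out.put(tp, committedStore.get(tp));
        return out;
    }
}

public class PositionApiDemo {
    public static void main(String[] args) {
        InMemoryPositions c = new InMemoryPositions();
        TopicPartition tp = new TopicPartition("events", 0);
        c.seek(new TopicPartitionOffset(tp, 42L));   // in-memory only
        c.commit(new TopicPartitionOffset(tp, 42L)); // "remote" write
        System.out.println(c.position(tp));          // 42
        System.out.println(c.committed(tp).get(tp)); // 42
    }
}
```

Note the asymmetry the thread accepts: `seek` moves one position locally and cheaply, while `committed`/`commit` return and take batches because each call may cost an OffsetRequest round trip.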