That’s wonderful. Thanks for Kafka.

Rob
On Feb 24, 2014, at 9:58 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Hi Robert,

Yes, you can check out the callback functions in the new API, onPartitionDeassigned and onPartitionAssigned, and see if they meet your needs.

Guozhang

On Mon, Feb 24, 2014 at 8:18 AM, Withers, Robert <robert.with...@dish.com> wrote:

Jun,

Are you saying it is possible to get events from the high-level consumer regarding various state machine changes? For instance, can we get a notification when a rebalance starts and ends, when a partition is assigned/unassigned, when an offset is committed on a partition, when a leader changes, and so on? I call this OOB traffic, since these are not the core streaming messages but side-band events, yet they are still potentially useful to consumers.

Thank you,
Robert

Robert Withers
Staff Analyst/Developer
o: (720) 514-8963
c: (571) 262-1873

-----Original Message-----
From: Jun Rao [mailto:jun...@gmail.com]
Sent: Sunday, February 23, 2014 4:19 PM
To: users@kafka.apache.org
Subject: Re: New Consumer API discussion

Robert,

For the push-oriented api, you can potentially implement your own MessageHandler with those methods. In the main loop of our new consumer api, you can just call those methods based on the events you get. Also, we already have an api to get the first and the last offset of a partition (getOffsetBefore).

Thanks,
Jun

On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert <robert.with...@dish.com> wrote:

This is a good idea, too.
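Jun's suggestion above — supply your own MessageHandler and invoke it from the main poll loop — amounts to a pull-to-push adapter. A minimal sketch follows; MessageHandler, PushAdapter, and dispatch are all invented names for illustration, not the actual consumer API under discussion.

```java
import java.util.List;

// Hypothetical handler interface, per Jun's suggestion: the client supplies
// callbacks, and the poll loop invokes them as events arrive.
interface MessageHandler {
    void onMessage(String message);
    void onEndOfBatch();
}

// Pull-to-push adapter: each batch returned by a poll()-like call is pushed
// into the handler, converting the pull model into a push model.
class PushAdapter {
    private final MessageHandler handler;

    PushAdapter(MessageHandler handler) { this.handler = handler; }

    void dispatch(List<String> polledBatch) {
        for (String msg : polledBatch) {
            handler.onMessage(msg);   // per-message push
        }
        handler.onEndOfBatch();       // side-band ("OOB") notification
    }
}
```

The same dispatch point is where OOB events (rebalance started/finished, partition assigned, and so on) could be surfaced, if the underlying consumer exposed them.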
I would modify it to include stream marking, then you can have:

    long end = consumer.lastOffset(tp);
    consumer.setMark(end);
    while (consumer.beforeMark()) {
        process(consumer.pollToMark());
    }

or

    long end = consumer.lastOffset(tp);
    consumer.setMark(end);
    for (Object msg : consumer.iteratorToMark()) {
        process(msg);
    }

I actually have 4 suggestions, then:

* pull: stream marking
* pull: finite streams, bound by time range (up-to-now, yesterday) or offset
* pull: async api
* push: KafkaMessageSource, for a push model, with msg and OOB events. Build one in either individual or chunk mode and have a listener for each msg or a listener for a chunk of msgs. Make it composable and policy driven (chunked, range, commitOffsets policy, retry policy, transactional)

Thank you,
Robert

On Feb 22, 2014, at 11:21 AM, Jay Kreps <jay.kr...@gmail.com> wrote:

I think what Robert is saying is that we need to think through the offset API to enable "batch processing" of topic data. Think of a process that periodically kicks off to compute a data summary or do a data load or something like that. I think what we need to support this is an api to fetch the last offset from the server for a partition. Something like

    long lastOffset(TopicPartition tp)

and for symmetry

    long firstOffset(TopicPartition tp)

Likely this would have to be batched. Essentially we should add this use case to our set of code examples to write and think through. The usage would be something like

    long end = consumer.lastOffset(tp);
    while (consumer.position < end)
        process(consumer.poll());

-Jay

On Sat, Feb 22, 2014 at 1:52 AM, Withers, Robert <robert.with...@dish.com> wrote:

Jun,

I was originally thinking a non-blocking read from a distributed stream should distinguish between "no local messages, but a fetch is occurring" versus "you have drained the stream".
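The "stream marking" idiom above can be modeled in miniature. This is a toy in-memory sketch: the real consumer would back lastOffset() with a server request, and setMark/beforeMark/pollToMark are the hypothetical additions named in the email, not existing API.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the proposed mark-bounded read: snapshot the end of the
// stream as a mark, then drain up to it and stop, even while new
// messages keep arriving behind the mark.
class MarkableConsumer {
    private final List<String> log = new ArrayList<>();
    private long position = 0;
    private long mark = Long.MAX_VALUE;

    void append(String msg) { log.add(msg); }      // stands in for the broker
    long lastOffset() { return log.size(); }       // offset just past the end
    void setMark(long offset) { this.mark = offset; }
    boolean beforeMark() { return position < mark && position < log.size(); }
    String pollToMark() { return log.get((int) position++); }

    // Drain everything up to the mark, then stop - a finite read over an
    // otherwise infinite stream.
    List<String> drainToMark() {
        List<String> out = new ArrayList<>();
        while (beforeMark()) out.add(pollToMark());
        return out;
    }
}
```

This gives exactly the "read all known traffic, then terminate" consumer the thread is after: messages appended after the mark is set are left for a later run.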
The reason this may be valuable to me is so I can write consumers that read all known traffic and then terminate. You caused me to reconsider, and I think I am conflating two things: one is a sync/async api, while the other is whether to have an infinite or finite stream. Is it possible to build a finite KafkaStream on a range of messages?

Perhaps a SimpleConsumer would do just fine, and then I could start off getting the writeOffset from zookeeper and tell it to read a specified range per partition. I've done this, forking a simple consumer runnable for each partition, for one of our analyzers. The great thing about the high-level consumer is the rebalance: I can fork however many stream readers I want and it just figures it out for me. In that way you offer us control over resource consumption within a pull model, which is best for regulating message pressure, they say. Combining that high-level rebalance ability with a ranged partition drain could be really nice: build the stream with an ending position and it is a finite stream, but retain the high-level rebalance. With a finite stream, you would know the difference between the two async scenarios: fetch-in-progress versus end-of-stream. With an infinite stream, you never get end-of-stream.

Aside from a high-level consumer over a finite range within each partition, the other feature I can think of is more complicated. A high-level consumer has state machine changes that the client cannot access, to my knowledge. Our use of kafka has us invoke a message handler with each message we consume from the KafkaStream, so we convert a pull model to a push model. Including the idea of receiving notifications from state machine changes, what would be really nice is a KafkaMessageSource that is an eventful push model.
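The distinction drawn above — a finite stream makes "fetch-in-progress" and "end-of-stream" distinguishable — can be sketched with a non-blocking poll that reports a status. All names (FiniteStream, Status, deliver) are invented for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of a range-bounded stream. A non-blocking poll can report
// END_OF_STREAM once the configured range is exhausted, versus
// FETCH_IN_PROGRESS while nothing is local but the range is not done -
// the two async scenarios an infinite stream cannot tell apart.
class FiniteStream {
    enum Status { MESSAGE, FETCH_IN_PROGRESS, END_OF_STREAM }

    private final long endOffset;                       // exclusive end of range
    private long position = 0;
    private final Deque<String> local = new ArrayDeque<>();

    FiniteStream(long endOffset) { this.endOffset = endOffset; }

    void deliver(String msg) { local.add(msg); }        // stands in for a completed fetch

    // Non-blocking poll: appends a message to out when one is available.
    Status poll(StringBuilder out) {
        if (position >= endOffset) return Status.END_OF_STREAM;
        if (local.isEmpty()) return Status.FETCH_IN_PROGRESS;
        out.append(local.poll());
        position++;
        return Status.MESSAGE;
    }
}
```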
If it were thread-safe, then we could register listeners for various events:

* opening-stream
* closing-stream
* message-arrived
* end-of-stream/no-more-messages-in-partition (for finite streams)
* rebalance started
* partition assigned
* partition unassigned
* rebalance finished
* partition-offset-committed

Perhaps that is just our use, but instead of a pull-oriented KafkaStream, is there any sense in providing a push-oriented KafkaMessageSource publishing OOB messages?

thank you,
Robert

On Feb 21, 2014, at 5:59 PM, Jun Rao <jun...@gmail.com> wrote:

Robert,

Could you explain why you want to distinguish between FetchingInProgressException and NoMessagePendingException? The nextMsgs() method that you want is exactly what poll() does.

Thanks,
Jun

On Wed, Feb 19, 2014 at 8:45 AM, Withers, Robert <robert.with...@dish.com> wrote:

I am not clear on why the consumer stream should be positionable, especially if it is limited to the in-memory fetched messages. Could someone explain to me, please? I really like the idea of committing the offset only on those partitions with changed read offsets.

Two items I would like to see added to the KafkaStream are:

* a non-blocking next(), which throws exceptions (FetchingInProgressException and a NoMessagePendingException, or something) to differentiate between fetching and no messages left.
* a nextMsgs() method which returns all locally available messages and kicks off a fetch for the next chunk.

If you are trying to add transactional features, then formally define a DTP capability and pull in other server frameworks to share the implementation. Should it be X/Open XA? How about a new peer-to-peer DTP protocol?
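The event list above suggests a listener-registration surface. A minimal sketch of such a push-oriented KafkaMessageSource follows; every name here is hypothetical, and thread safety is handled with a copy-on-write listener list so registration can race with publication.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical eventful push source: the OOB lifecycle events from the
// email are modeled as an enum, and clients register listeners for them.
class KafkaMessageSource {
    enum Event {
        OPENING_STREAM, CLOSING_STREAM, MESSAGE_ARRIVED, END_OF_STREAM,
        REBALANCE_STARTED, PARTITION_ASSIGNED, PARTITION_UNASSIGNED,
        REBALANCE_FINISHED, PARTITION_OFFSET_COMMITTED
    }

    interface Listener {
        void on(Event event, Object payload);
    }

    // CopyOnWriteArrayList keeps iteration safe while listeners are added
    // from other threads.
    private final List<Listener> listeners = new CopyOnWriteArrayList<>();

    void addListener(Listener l) { listeners.add(l); }

    // Called from the internal consumer loop for both messages and OOB events.
    void publish(Event event, Object payload) {
        for (Listener l : listeners) l.on(event, payload);
    }
}
```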
Thank you,
Robert

Robert Withers
Staff Analyst/Developer
o: (720) 514-8963
c: (571) 262-1873

-----Original Message-----
From: Jay Kreps [mailto:jay.kr...@gmail.com]
Sent: Sunday, February 16, 2014 10:13 AM
To: users@kafka.apache.org
Subject: Re: New Consumer API discussion

+1 I think those are good. It is a little weird that changing the fetch point is not batched but changing the commit point is, but I suppose there is no helping that.

-Jay

On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

Jay,

That makes sense. position/seek deal with changing the consumer's in-memory data, so there is no remote rpc there. For some reason, I got committed and seek mixed up in my head at that time :) So we still end up with:

    long position(TopicPartition tp)
    void seek(TopicPartitionOffset p)
    Map<TopicPartition, Long> committed(TopicPartition tp);
    void commit(TopicPartitionOffset...);

Thanks,
Neha

On Friday, February 14, 2014, Jay Kreps <jay.kr...@gmail.com> wrote:

Oh, interesting. So I am assuming the following implementation:

1. We have an in-memory fetch position which controls the next fetch offset.
2. Changing this has no effect until you poll again, at which point your fetch request will be from the newly specified offset.
3. We then have an in-memory but also remotely stored committed offset.
4. Calling commit has the effect of saving the fetch position as both the in-memory committed position and in the remote store.
5.
Auto-commit is the same as periodically calling commit on all positions.

So batching on commit as well as getting the committed position makes sense, but batching the fetch position wouldn't, right? I think you are actually thinking of a different approach.

-Jay

On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> I think you are saying both, i.e. if you have committed on a partition it returns you that value, but if you haven't it does a remote lookup?

Correct.

> The other argument for making committed batched is that commit() is batched, so there is symmetry. position() and seek() are always in-memory changes (I assume) so there is no need to batch them.

I'm not as sure as you are about that assumption being true. Basically, in my example above, the batching argument for committed() also applies to position(), since one purpose of fetching a partition's offset is to use it to set the position of the consumer to that offset. Since that might lead to a remote OffsetRequest call, I think we probably would be better off batching it.

> Another option for naming would be position/reposition instead of position/seek.

I think position/seek is better since it aligns with Java file APIs. I also think your suggestion about ConsumerPosition makes sense.

Thanks,
Neha

On Feb 13, 2014 9:22 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:

Hey Neha,

I actually wasn't proposing the name TopicOffsetPosition; that was just a typo. I meant TopicPartitionOffset, and I was just referencing what was in the javadoc.
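The semantics being debated — in-memory position/seek, batched commit, batched committed() lookup, and auto-commit as "periodically calling commit on all positions" — can be captured in a toy model. This is a sketch only: TopicPartition is collapsed to a plain String key, and all class and method names are stand-ins, not the real proposal.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Toy model of the discussed offset API semantics.
class OffsetApiSketch {
    private final Map<String, Long> positions = new HashMap<>();  // in-memory fetch positions
    private final Map<String, Long> committed = new HashMap<>();  // stands in for remote store

    long position(String tp) { return positions.getOrDefault(tp, 0L); }

    // seek is an in-memory change only; it takes effect on the next fetch.
    void seek(String tp, long offset) { positions.put(tp, offset); }

    // commit is batched, like commit(TopicPartitionOffset...): it saves the
    // current fetch position of each named partition as committed.
    void commit(String... tps) {
        for (String tp : tps) committed.put(tp, position(tp));
    }

    // An auto-commit tick = commit on all positions (Jay's point 5).
    void commitAll() { committed.putAll(positions); }

    // committed() is batched for symmetry with commit(), returning a map so
    // one call (and, in the real system, one remote request) covers many
    // partitions.
    Map<String, Long> committed(String... tps) {
        Map<String, Long> out = new HashMap<>();
        for (String tp : tps)
            if (committed.containsKey(tp)) out.put(tp, committed.get(tp));
        return Collections.unmodifiableMap(out);
    }
}
```

In the real consumer, committed() for a partition with no cached value would trigger the remote OffsetRequest lookup Neha mentions, which is precisely the argument for batching it.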
So to restate my proposal without the typo, using just the existing classes (the naming is a separate question):

    long position(TopicPartition tp)
    void seek(TopicPartitionOffset p)
    long committed(TopicPartition tp)
    void commit(TopicPartitionOffset...);

So I may be unclear on committed() (AKA lastCommittedOffset). Is it returning the in-memory value from the last commit by this consumer, or is it doing a remote fetch, or both? I think you are saying both, i.e. if you have committed on a partition it returns you that value, but if you haven't it does a remote lookup?

The other argument for making committed batched is that commit() is batched, so there is symmetry. position() and seek() are always in-memory changes (I assume) so there is no need to batch them.

So taking all that into account, what if we revise it to:

    long position(TopicPartition tp)
    void seek(TopicPartitionOffset p)
    Map<TopicPartition, Long> committed(TopicPartition tp);
    void commit(TopicPartitionOffset...);

This is not symmetric between position/seek and commit/committed, but it is convenient.

Another option for naming would be position/reposition instead of position/seek.

With respect to the name TopicPartitionOffset, what I was trying to say is that I recommend we change it to something shorter. I think TopicPosition or ConsumerPosition might be better. Position does not refer to the variables in the object; it refers to the meaning of the object--it represents a position within a topic. The offset field in that object is still called the offset. TopicOffset, PartitionOffset, or ConsumerOffset would all be workable too. Basically I am just objecting to concatenating three nouns together. :-)

-Jay

On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

2.
It returns a list of results. But how can you use the list? The only way to use the list is to make a map of tp=>offset and then look up results in this map (or do a for loop over the list for the partition you want). I recommend that if this is an in-memory check we just do one at a time. E.g.

    long committedPosition(TopicPosition)

This was discussed in the previous emails. There is a choic

--
Robert Withers
robert.with...@dish.com
c: 303.919.5856

--
Guozhang