Hi Robert,

Yes, you can check out the callback functions in the new API:

onPartitionDeassigned
onPartitionAssigned

and see if they meet your needs.
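For a sense of how such callbacks could be wired up, here is a minimal sketch; the interface name and method signatures below are stand-ins for illustration, not the actual API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the proposed rebalance callback interface;
// names and signatures here are illustrative assumptions, not the real API.
interface RebalanceCallback {
    void onPartitionAssigned(List<String> partitions);
    void onPartitionDeassigned(List<String> partitions);
}

// A listener that records the OOB (side-band) events Robert asked about.
class LoggingCallback implements RebalanceCallback {
    final List<String> events = new ArrayList<>();

    public void onPartitionAssigned(List<String> partitions) {
        events.add("assigned:" + partitions);
    }

    public void onPartitionDeassigned(List<String> partitions) {
        events.add("deassigned:" + partitions);
    }
}
```

A consumer would register one callback instance at subscribe time and be notified of assignment changes without having to poll for them.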

Guozhang


On Mon, Feb 24, 2014 at 8:18 AM, Withers, Robert <robert.with...@dish.com> wrote:

> Jun,
>
> Are you saying it is possible to get events from the high-level consumer
> regarding various state machine changes?  For instance, can we get a
> notification when a rebalance starts and ends, when a partition is
> assigned/unassigned, when an offset is committed on a partition, when a
> leader changes and so on?  I call this OOB traffic, since they are not the
> core messages streaming, but side-band events, yet they are still
> potentially useful to consumers.
>
> Thank you,
> Robert
>
>
> Robert Withers
> Staff Analyst/Developer
> o: (720) 514-8963
> c:  (571) 262-1873
>
>
>
> -----Original Message-----
> From: Jun Rao [mailto:jun...@gmail.com]
> Sent: Sunday, February 23, 2014 4:19 PM
> To: users@kafka.apache.org
> Subject: Re: New Consumer API discussion
>
> Robert,
>
> For the push-oriented api, you can potentially implement your own
> MessageHandler with those methods. In the main loop of our new consumer
> api, you can just call those methods based on the events you get.
>
> Also, we already have an api to get the first and the last offset of a
> partition (getOffsetsBefore).
>
> Thanks,
>
> Jun
>
>
> On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert
> <robert.with...@dish.com> wrote:
>
> > This is a good idea, too.  I would modify it to include stream
> > marking, then you can have:
> >
> > long end = consumer.lastOffset(tp);
> > consumer.setMark(end);
> > while(consumer.beforeMark()) {
> >    process(consumer.pollToMark());
> > }
> >
> > or
> >
> > long end = consumer.lastOffset(tp);
> > consumer.setMark(end);
> > for(Object msg : consumer.iteratorToMark()) {
> >    process(msg);
> > }
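To make the mark semantics concrete, here is a toy in-memory stand-in; lastOffset, setMark, beforeMark, and pollToMark are the hypothetical methods proposed above, faked over a local buffer rather than a real broker:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy stand-in illustrating the proposed "stream marking" pattern.
// All four methods are hypothetical API from the suggestion above,
// simulated over an in-memory queue instead of fetched partitions.
class MarkedConsumer {
    private final Queue<String> buffered = new ArrayDeque<>();
    private long position = 0;
    private long mark = Long.MAX_VALUE;

    void feed(String msg) { buffered.add(msg); }          // test hook: "arriving" messages
    long lastOffset() { return position + buffered.size(); }
    void setMark(long offset) { mark = offset; }
    boolean beforeMark() { return position < mark && !buffered.isEmpty(); }
    String pollToMark() { position++; return buffered.poll(); }
}
```

The point of the mark is that messages arriving after setMark() are left for a later run, so the drain loop always terminates.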
> >
> > I actually have 4 suggestions, then:
> >
> >  *   pull: stream marking
> >  *   pull: finite streams, bound by time range (up-to-now, yesterday) or
> > offset
> >  *   pull: async api
> >  *   push: KafkaMessageSource, for a push model, with msg and OOB events.
> >  Build one in either individual or chunk mode and have a listener for
> > each msg or a listener for a chunk of msgs.  Make it composable and
> > policy driven (chunked, range, commitOffsets policy, retry policy,
> > transactional)
> >
> > Thank you,
> > Robert
> >
> > On Feb 22, 2014, at 11:21 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >
> > I think what Robert is saying is that we need to think through the
> > offset API to enable "batch processing" of topic data. Think of a
> > process that periodically kicks off to compute a data summary or do a
> > data load or something like that. I think what we need to support this
> > is an api to fetch the last offset from the server for a partition.
> Something like
> >   long lastOffset(TopicPartition tp)
> > and for symmetry
> >   long firstOffset(TopicPartition tp)
> >
> > Likely this would have to be batched. Essentially we should add this
> > use case to our set of code examples to write and think through.
> >
> > The usage would be something like
> >
> > long end = consumer.lastOffset(tp);
> > while(consumer.position < end)
> >    process(consumer.poll());
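As a sketch of that bounded loop against a stub (lastOffset() and position are hypothetical members of the proposed API, faked here over a fixed list, with poll() simplified to return one record at a time):

```java
import java.util.Arrays;
import java.util.List;

// Stub consumer illustrating the batch-processing pattern above.
// lastOffset() and position are assumed API from the proposal, backed
// by an in-memory list rather than a broker fetch.
class BatchConsumer {
    private final List<String> records;
    long position = 0;

    BatchConsumer(List<String> records) { this.records = records; }
    long lastOffset() { return records.size(); }
    String poll() { return records.get((int) position++); }
}
```

Capturing `end` once before the loop is what makes the job finite: records appended after the snapshot are left for the next periodic run.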
> >
> > -Jay
> >
> >
> > On Sat, Feb 22, 2014 at 1:52 AM, Withers, Robert
> > <robert.with...@dish.com> wrote:
> >
> > Jun,
> >
> > I was originally thinking a non-blocking read from a distributed
> > stream should distinguish between "no local messages, but a fetch is
> occurring"
> > versus "you have drained the stream".  The reason this may be valuable
> > to me is so I can write consumers that read all known traffic then
> terminate.
> > You caused me to reconsider and I think I am conflating 2 things.  One
> > is a sync/async api while the other is whether to have an infinite or
> > finite stream.  Is it possible to build a finite KafkaStream on a
> > range of messages?
> >
> > Perhaps a Simple Consumer would do just fine and then I could start
> > off getting the writeOffset from zookeeper and tell it to read a
> > specified range per partition.  I've done this and forked a simple
> > consumer runnable for each partition, for one of our analyzers.  The
> > great thing about the high-level consumer is the rebalance, so I can
> > fork however many stream readers I want and you just figure it out for
> > me.  In that way you offer us the control over the resource
> > consumption within a pull model.  This is best to regulate message
> pressure, they say.
> >
> > Combining that high-level rebalance ability with a ranged partition
> > drain could be really nice...build the stream with an ending position
> > and it is a finite stream, but retain the high-level rebalance.  With
> > a finite stream, you would know the difference of the 2 async
> > scenarios: fetch-in-progress versus end-of-stream.  With an infinite
> > stream, you never get end-of-stream.
> >
> > Aside from a high-level consumer over a finite range within each
> > partition, the other feature I can think of is more complicated.  A
> > high-level consumer has state machine changes that the client cannot
> > access, to my knowledge.  Our use of kafka has us invoke a message
> > handler with each message we consume from the KafkaStream, so we
> > convert a pull-model to a push-model.  Including the idea of receiving
> > notifications from state machine changes, what would be really nice is
> > to have a KafkaMessageSource, that is an eventful push model.  If it
> > were thread-safe, then we could register listeners for various events:
> >
> > *   opening-stream
> > *   closing-stream
> > *   message-arrived
> > *   end-of-stream/no-more-messages-in-partition (for finite streams)
> > *   rebalance started
> > *   partition assigned
> > *   partition unassigned
> > *   rebalance finished
> > *   partition-offset-committed
> >
> > Perhaps that is just our use, but instead of a pull-oriented
> > KafkaStream, is there any sense in your providing a push-oriented
> > KafkaMessageSource publishing OOB messages?
> >
> > thank you,
> > Robert
> >
> > On Feb 21, 2014, at 5:59 PM, Jun Rao <jun...@gmail.com> wrote:
> >
> > Robert,
> >
> > Could you explain why you want to distinguish btw
> > FetchingInProgressException and NoMessagePendingException? The
> > nextMsgs() method that you want is exactly what poll() does.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Wed, Feb 19, 2014 at 8:45 AM, Withers, Robert
> > <robert.with...@dish.com> wrote:
> >
> > I am not clear on why the consumer stream should be positionable,
> > especially if it is limited to the in-memory fetched messages.  Could
> > someone explain to me, please?  I really like the idea of committing
> > the offset specifically on those partitions with changed read offsets,
> only.
> >
> >
> >
> > 2 items I would like to see added to the KafkaStream are:
> >
> > *         a non-blocking next(), throws several exceptions
> > (FetchingInProgressException and a NoMessagePendingException or
> > something) to differentiate between fetching or no messages left.
> >
> > *         A nextMsgs() method which returns all locally available
> messages
> > and kicks off a fetch for the next chunk.
> >
> >
> >
> > If you are trying to add transactional features, then formally define
> > a DTP capability and pull in other server frameworks to share the
> > implementation.  Should it be XA/Open?  How about a new peer2peer DTP
> > protocol?
> >
> >
> >
> > Thank you,
> >
> > Robert
> >
> >
> >
> > Robert Withers
> >
> > Staff Analyst/Developer
> >
> > o: (720) 514-8963
> >
> > c:  (571) 262-1873
> >
> >
> >
> > -----Original Message-----
> > From: Jay Kreps [mailto:jay.kr...@gmail.com]
> > Sent: Sunday, February 16, 2014 10:13 AM
> > To: users@kafka.apache.org
> > Subject: Re: New Consumer API discussion
> >
> >
> >
> > +1 I think those are good. It is a little weird that changing the
> > fetch point is not batched but changing the commit point is, but I
> > suppose there is no helping that.
> >
> >
> >
> > -Jay
> >
> >
> >
> >
> >
> > On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede
> > <neha.narkh...@gmail.com> wrote:
> >
> >
> >
> > Jay,
> >
> > That makes sense. position/seek deal with changing the consumer's
> > in-memory data, so there is no remote rpc there. For some reason, I
> > got committed and seek mixed up in my head at that time :)
> >
> > So we still end up with
> >
> >  long position(TopicPartition tp)
> >  void seek(TopicPartitionOffset p)
> >  Map<TopicPartition, Long> committed(TopicPartition tp);
> >  void commit(TopicPartitionOffset...);
> >
> > Thanks,
> > Neha
> >
> >
> >
> > On Friday, February 14, 2014, Jay Kreps <jay.kr...@gmail.com> wrote:
> >
> >
> >
> > Oh, interesting. So I am assuming the following implementation:
> >
> > 1. We have an in-memory fetch position which controls the next fetch
> > offset.
> > 2. Changing this has no effect until you poll again at which point
> > your fetch request will be from the newly specified offset.
> > 3. We then have an in-memory but also remotely stored committed offset.
> > 4. Calling commit has the effect of saving the fetch position as both
> > the in memory committed position and in the remote store.
> > 5. Auto-commit is the same as periodically calling commit on all
> > positions.
> >
> > So batching on commit as well as getting the committed position makes
> > sense, but batching the fetch position wouldn't, right? I think you
> > are actually thinking of a different approach.
> >
> > -Jay
> >
> >
> >
> >
> >
> > On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede
> > <neha.narkh...@gmail.com> wrote:
> >
> >
> >
> > I think you are saying both, i.e. if you have committed on a
> > partition it returns you that value but if you haven't it does a
> > remote lookup?
> >
> > Correct.
> >
> > The other argument for making committed batched is that commit() is
> > batched, so there is symmetry.
> >
> > position() and seek() are always in memory changes (I assume) so
> > there is no need to batch them.
> >
> > I'm not as sure as you are about that assumption being true.
> > Basically in my example above, the batching argument for committed()
> > also applies to position() since one purpose of fetching a
> > partition's offset is to use it to set the position of the consumer
> > to that offset. Since that might lead to a remote OffsetRequest call,
> > I think we probably would be better off batching it.
> >
> > Another option for naming would be position/reposition instead of
> > position/seek.
> >
> > I think position/seek is better since it aligns with Java file APIs.
> >
> > I also think your suggestion about ConsumerPosition makes sense.
> >
> > Thanks,
> > Neha
> >
> > On Feb 13, 2014 9:22 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
> >
> >
> >
> > Hey Neha,
> >
> > I actually wasn't proposing the name TopicOffsetPosition, that was
> > just a typo. I meant TopicPartitionOffset, and I was just referencing
> > what was in the javadoc. So to restate my proposal without the typo,
> > using just the existing classes (that naming is a separate question):
> >
> >  long position(TopicPartition tp)
> >  void seek(TopicPartitionOffset p)
> >  long committed(TopicPartition tp)
> >  void commit(TopicPartitionOffset...);
> >
> > So I may be unclear on committed() (AKA lastCommittedOffset). Is it
> > returning the in-memory value from the last commit by this consumer,
> > or is it doing a remote fetch, or both? I think you are saying both,
> > i.e. if you have committed on a partition it returns you that value
> > but if you haven't it does a remote lookup?
> >
> > The other argument for making committed batched is that commit() is
> > batched, so there is symmetry.
> >
> > position() and seek() are always in memory changes (I assume) so
> > there is no need to batch them.
> >
> > So taking all that into account what if we revise it to
> >
> >  long position(TopicPartition tp)
> >  void seek(TopicPartitionOffset p)
> >  Map<TopicPartition, Long> committed(TopicPartition tp);
> >  void commit(TopicPartitionOffset...);
> >
> > This is not symmetric between position/seek and commit/committed but
> > it is convenient. Another option for naming would be
> > position/reposition instead of position/seek.
> >
> > With respect to the name TopicPartitionOffset, what I was trying to
> > say is that I recommend we change that to something shorter. I think
> > TopicPosition or ConsumerPosition might be better. Position does not
> > refer to the variables in the object, it refers to the meaning of the
> > object--it represents a position within a topic. The offset field in
> > that object is still called the offset. TopicOffset, PartitionOffset,
> > or ConsumerOffset would all be workable too. Basically I am just
> > objecting to concatenating three nouns together. :-)
> >
> > -Jay
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede
> > <neha.narkh...@gmail.com> wrote:
> >
> >
> >
> > 2. It returns a list of results. But how can you use the list? The
> > only way to use the list is to make a map of tp=>offset and then
> > look up results in this map (or do a for loop over the list for the
> > partition you want). I recommend that if this is an in-memory check
> > we just do one at a time. E.g.
> >
> > long committedPosition(TopicPosition).
> >
> > This was discussed in the previous emails. There is a choic
> >
> >
> > --
> > Robert Withers
> > robert.with...@dish.com
> > c: 303.919.5856
> >
> >
> >
> > --
> > Robert Withers
> > robert.with...@dish.com<mailto:robert.with...@dish.com>
> > c: 303.919.5856
> >
> >
>



-- 
-- Guozhang
