Jun,

I was originally thinking that a non-blocking read from a distributed stream
should distinguish between "no local messages, but a fetch is in progress" and
"you have drained the stream".  This would be valuable to me because I could
write consumers that read all known traffic and then terminate.  You caused me
to reconsider, and I think I was conflating 2 things: one is a sync versus
async API, the other is whether the stream is infinite or finite.  Is it
possible to build a finite KafkaStream on a range of messages?

Perhaps a Simple Consumer would do just fine: I could start by getting the
writeOffset from ZooKeeper and tell the consumer to read a specified range per
partition.  I have done this for one of our analyzers, forking a simple
consumer runnable for each partition.  The great thing about the high-level
consumer is the rebalance: I can fork however many stream readers I want and
you just figure it out for me.  In that way you give us control over resource
consumption within a pull model, which is said to be the best way to regulate
message pressure.

Combining that high-level rebalance ability with a ranged partition drain could
be really nice: build the stream with an ending position and it is a finite
stream, but retain the high-level rebalance.  With a finite stream, you would
know the difference between the 2 async scenarios: fetch-in-progress versus
end-of-stream.  With an infinite stream, you never get end-of-stream.
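
To make the finite-stream idea concrete, here is a minimal sketch of a
non-blocking read that reports which of the two cases applies.  All names in it
(FiniteStream, ReadState, ReadResult, tryNext, deliver) are hypothetical and not
part of any Kafka API; messages are plain strings, the fetcher is simulated by
calling deliver() directly, and the sketch is single-threaded.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch: none of these types exist in Kafka.
enum ReadState { MESSAGE, FETCH_IN_PROGRESS, END_OF_STREAM }

final class ReadResult {
    final ReadState state;
    final String message; // non-null only when state == MESSAGE

    ReadResult(ReadState state, String message) {
        this.state = state;
        this.message = message;
    }
}

final class FiniteStream {
    private final Queue<String> local = new ArrayDeque<>(); // locally fetched messages
    private final long endOffset; // exclusive end of the range
    private long nextOffset;      // offset of the next message to hand out

    FiniteStream(long startOffset, long endOffset) {
        this.nextOffset = startOffset;
        this.endOffset = endOffset;
    }

    // Stand-in for the background fetcher handing over one message.
    void deliver(String message) {
        local.add(message);
    }

    // Non-blocking read: never waits, always reports which case applies.
    ReadResult tryNext() {
        if (nextOffset >= endOffset) {
            return new ReadResult(ReadState.END_OF_STREAM, null);
        }
        String message = local.poll();
        if (message == null) {
            // Range not exhausted yet, so more data is still expected.
            return new ReadResult(ReadState.FETCH_IN_PROGRESS, null);
        }
        nextOffset++;
        return new ReadResult(ReadState.MESSAGE, message);
    }
}
```

Because the stream knows its ending position, an empty local buffer can only
mean fetch-in-progress until the range is exhausted; without an ending position
the END_OF_STREAM case could never be reported.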

Aside from a high-level consumer over a finite range within each partition, the
other feature I can think of is more complicated.  A high-level consumer goes
through state machine changes that the client cannot access, to my knowledge.
Our use of Kafka has us invoke a message handler with each message we consume
from the KafkaStream, so we convert a pull model to a push model.  Adding the
idea of receiving notifications of state machine changes, what would be really
nice is a KafkaMessageSource that is an eventful push model.  If it were
thread-safe, then we could register listeners for various events:

 *   opening-stream
 *   closing-stream
 *   message-arrived
 *   end-of-stream/no-more-messages-in-partition (for finite streams)
 *   rebalance-started
 *   partition-assigned
 *   partition-unassigned
 *   rebalance-finished
 *   partition-offset-committed

Perhaps that is just our use, but instead of a pull-oriented KafkaStream, is 
there any sense in your providing a push-oriented KafkaMessageSource publishing 
OOB messages?
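
As a rough illustration of the push model, the sketch below shows a listener
interface covering a few of the events listed above, plus a thread-safe
registry.  MessageSourceListener, the register() method, and the fire*()
methods are all hypothetical; nothing like this exists in Kafka today, and the
fire*() calls stand in for whatever internal consumer thread would raise the
events.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical listener; default methods let a client override only what it needs.
interface MessageSourceListener {
    default void onMessage(int partition, long offset, String message) {}
    default void onEndOfStream(int partition) {}
    default void onPartitionAssigned(int partition) {}
    default void onPartitionUnassigned(int partition) {}
    default void onOffsetCommitted(int partition, long offset) {}
}

// Hypothetical push-oriented source; only the listener plumbing is sketched.
final class KafkaMessageSource {
    // CopyOnWriteArrayList allows registration while events are being dispatched.
    private final List<MessageSourceListener> listeners = new CopyOnWriteArrayList<>();

    void register(MessageSourceListener listener) {
        listeners.add(listener);
    }

    // Would be invoked by the consumer's fetch loop for every message read.
    void fireMessage(int partition, long offset, String message) {
        for (MessageSourceListener l : listeners) {
            l.onMessage(partition, offset, message);
        }
    }

    // Would be invoked when a finite range in a partition has been drained.
    void fireEndOfStream(int partition) {
        for (MessageSourceListener l : listeners) {
            l.onEndOfStream(partition);
        }
    }
}
```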

thank you,
Robert

On Feb 21, 2014, at 5:59 PM, Jun Rao <jun...@gmail.com> wrote:

Robert,

Could you explain why you want to distinguish between
FetchingInProgressException and NoMessagePendingException? The nextMsgs()
method that you want is exactly what poll() does.

Thanks,

Jun


On Wed, Feb 19, 2014 at 8:45 AM, Withers, Robert <robert.with...@dish.com> wrote:

I am not clear on why the consumer stream should be positionable,
especially if it is limited to the in-memory fetched messages.  Could
someone explain to me, please?  I really like the idea of committing the
offset only on those partitions whose read offsets have changed.



2 items I would like to see added to the KafkaStream are:

*   a non-blocking next() that throws distinct exceptions
(FetchingInProgressException and NoMessagePendingException, or something
similar) to differentiate between a fetch in progress and no messages left.

*   a nextMsgs() method which returns all locally available messages
and kicks off a fetch for the next chunk.
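
A minimal sketch of the nextMsgs() behavior described above, under stated
assumptions: BufferedStream, deliver(), and fetchesRequested() are hypothetical
names, messages are plain strings, and the "fetch" is only a counter standing in
for an asynchronous request to the broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch: not an existing Kafka class.
final class BufferedStream {
    private final ConcurrentLinkedQueue<String> local = new ConcurrentLinkedQueue<>();
    private int fetchesRequested = 0;

    // Stand-in for the fetcher thread handing over one message.
    void deliver(String message) {
        local.add(message);
    }

    // Returns everything available locally without blocking,
    // then asks for the next chunk.
    List<String> nextMsgs() {
        List<String> batch = new ArrayList<>();
        String message;
        while ((message = local.poll()) != null) {
            batch.add(message);
        }
        requestFetch();
        return batch;
    }

    // Placeholder: a real implementation would issue an asynchronous fetch here.
    private void requestFetch() {
        fetchesRequested++;
    }

    int fetchesRequested() {
        return fetchesRequested;
    }
}
```

An empty list from nextMsgs() says only that nothing is buffered locally; it
cannot by itself distinguish a fetch in progress from a drained stream.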



If you are trying to add transactional features, then formally define a
DTP capability and pull in other server frameworks to share the
implementation.  Should it be X/Open XA?  How about a new peer-to-peer DTP
protocol?



Thank you,

Robert



Robert Withers

Staff Analyst/Developer

o: (720) 514-8963

c:  (571) 262-1873



-----Original Message-----
From: Jay Kreps [mailto:jay.kr...@gmail.com]
Sent: Sunday, February 16, 2014 10:13 AM
To: users@kafka.apache.org
Subject: Re: New Consumer API discussion



+1 I think those are good. It is a little weird that changing the fetch
point is not batched but changing the commit point is, but I suppose there
is no helping that.



-Jay





On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:



Jay,



That makes sense. position/seek deal with changing the consumer's
in-memory data, so there is no remote RPC there. For some reason, I
got committed and seek mixed up in my head at that time :)



So we still end up with



  long position(TopicPartition tp)

  void seek(TopicPartitionOffset p)

  Map<TopicPartition, Long> committed(TopicPartition tp);

  void commit(TopicPartitionOffset...);



Thanks,

Neha



On Friday, February 14, 2014, Jay Kreps <jay.kr...@gmail.com> wrote:



Oh, interesting. So I am assuming the following implementation:

1. We have an in-memory fetch position which controls the next fetch
   offset.
2. Changing this has no effect until you poll again, at which point
   your fetch request will be from the newly specified offset.
3. We then have an in-memory but also remotely stored committed offset.
4. Calling commit has the effect of saving the fetch position as
   both the in-memory committed position and in the remote store.
5. Auto-commit is the same as periodically calling commit on all
   positions.

So batching on commit as well as getting the committed position
makes sense, but batching the fetch position wouldn't, right? I
think you are actually thinking of a different approach.
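
The implementation assumed in the five points above can be sketched roughly as
follows.  This is an illustration under stated assumptions, not the actual
consumer: OffsetModel is a hypothetical class, partitions are identified by
plain strings instead of TopicPartition, the remote store is just a second
in-memory map, and returning -1 for a partition with no committed offset is an
arbitrary choice.  The committed() fallback to a remote lookup follows the
behavior confirmed earlier in this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the offset bookkeeping; not a Kafka class.
final class OffsetModel {
    private final Map<String, Long> fetchPosition = new HashMap<>();  // point 1: in-memory only
    private final Map<String, Long> committedLocal = new HashMap<>(); // point 3: in-memory copy
    private final Map<String, Long> remoteStore = new HashMap<>();    // point 3: stand-in for the remote store

    // Pure in-memory read, no remote call.
    long position(String tp) {
        return fetchPosition.getOrDefault(tp, 0L);
    }

    // Point 2: only changes memory; the next poll would fetch from here.
    void seek(String tp, long offset) {
        fetchPosition.put(tp, offset);
    }

    // Point 4: batched; saves the fetch position locally and remotely.
    void commit(String... tps) {
        for (String tp : tps) {
            long offset = position(tp);
            committedLocal.put(tp, offset);
            remoteStore.put(tp, offset);
        }
    }

    // Local value if this consumer has committed, otherwise a "remote" lookup.
    // Assumption: -1 means no committed offset is known anywhere.
    long committed(String tp) {
        Long local = committedLocal.get(tp);
        if (local != null) {
            return local;
        }
        return remoteStore.getOrDefault(tp, -1L);
    }
}
```

In this model only commit() and the fallback path of committed() would touch
the network, which is why batching helps there and not for position() or seek().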



-Jay





On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:



I think you are saying both, i.e. if you have committed on a
partition it returns you that value but if you haven't
it does a remote lookup?



Correct.



The other argument for making committed batched is that commit()
is batched, so there is symmetry.

position() and seek() are always in memory changes (I assume) so
there is no need to batch them.



I'm not as sure as you are about that assumption being true.
Basically in my example above, the batching argument for committed() also
applies to position() since one purpose of fetching a partition's offset is
to use it to set the position of the consumer to that offset. Since that
might lead to a remote OffsetRequest call, I think we probably would be
better off batching it.



Another option for naming would be position/reposition instead of
position/seek.



I think position/seek is better since it aligns with Java file APIs.



I also think your suggestion about ConsumerPosition makes sense.



Thanks,

Neha

On Feb 13, 2014 9:22 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:



Hey Neha,



I actually wasn't proposing the name TopicOffsetPosition, that was just a
typo. I meant TopicPartitionOffset, and I was just referencing what was in
the javadoc. So to restate my proposal without the typo, using just the
existing classes (that naming is a separate question):

  long position(TopicPartition tp)

  void seek(TopicPartitionOffset p)

  long committed(TopicPartition tp)

  void commit(TopicPartitionOffset...);



So I may be unclear on committed() (AKA lastCommittedOffset). Is
it returning the in-memory value from the last commit by this
consumer, or is it doing a remote fetch, or both? I think you are saying
both, i.e. if you have committed on a partition it returns you that value
but if you haven't it does a remote lookup?



The other argument for making committed batched is that commit()
is batched, so there is symmetry.

position() and seek() are always in memory changes (I assume) so
there is no need to batch them.



So taking all that into account what if we revise it to

  long position(TopicPartition tp)

  void seek(TopicPartitionOffset p)

  Map<TopicPartition, Long> committed(TopicPartition tp);

  void commit(TopicPartitionOffset...);



This is not symmetric between position/seek and commit/committed
but it is convenient. Another option for naming would be
position/reposition instead of position/seek.



With respect to the name TopicPartitionOffset, what I was trying to say is
that I recommend we change that to something shorter. I think TopicPosition
or ConsumerPosition might be better. Position does not refer to
the variables in the object, it refers to the meaning of the
object--it represents a position within a topic. The offset
field in that object is still called the offset. TopicOffset,
PartitionOffset, or ConsumerOffset would all be workable too. Basically I am
just objecting to concatenating three nouns together. :-)



-Jay











On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:



2. It returns a list of results. But how can you use the list? The only way
to use the list is to make a map of tp=>offset and then look up results in
this map (or do a for loop over the list for the partition you want). I
recommend that if this is an in-memory check we just do one at a time.
E.g. long committedPosition(TopicPosition).



This was discussed in the previous emails. There is a choic




--
Robert Withers
robert.with...@dish.com
c: 303.919.5856
