Pradeep -

Thanks for your detailed comments.

1.

   subscribe(String topic, int... paritions) and unsubscribe(String topic,
   int... partitions) should be subscribe(TopicPartition...
topicPartitions)and unsubscribe(TopicPartition...
   topicPartitons)

I think that is reasonable. Overall, I'm in favor of exposing
TopicPartition and TopicPartitionOffset as public APIs. They make the APIs
more readable especially given that the consumer aims to provide a small
set of APIs to support a wide range of functionalities. I will make that
change if there are no objections.

    2.

   Does it make sense to provide a convenience method to subscribe to
   topics at a particular offset directly? E.g.
subscribe(
TopicPartitionOffset...
   offsets)

 I view subscriptions a little differently. One subscribes to resources. In
this case, either topics (when you use group management) or specific
partitions. Offsets are specific to the consumption protocol and unrelated
to subscription which just expresses the user's interest in certain
resources. Also, if we have one way to specify fetch offsets (positions()),
I'd like to avoid creating *n* APIs to do the same thing, since that just
makes the consumer APIs more bulky and eventually confusing.

    3.

   The javadoc makes no mention of what would happen if positions() is
   called with a TopicPartitionOffset to which the Consumer is not
   subscribed to.

 Good point. Fixed the
javadoc<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html#positions%28kafka.common.TopicPartitionOffset...%29>

    4.

   The javadoc makes no mention of what would happen if positions() is
   called with two different offsets for a single TopicPartition

positions() can be called multiple times and hence with different offsets.
I think I mentioned in the latest javadoc that positions() will change the
offset on the next fetch request (poll()). Improved the javadoc to
explicitly mention this case.

    5. The javadoc shows lastCommittedOffsets() return type as
   List<TopicPartitionOffset>. This should either be Map<TopicPartition,
   Long> or Map<TopicPartition, TopicPartitionOffset>

 This depends on how the user would use the committed offsets. One example
I could think off and is mentioned in the javadoc for
lastCommittedOffsets() is to rewind consumption. In this case, you may or
may not require random access to a particular partition's offset, depending
on whether you want to selectively rewind consumption or not. So it may be
fine to return a map. I'm not sure if people can think of other uses of
this API though. In any case, if we
wanted to change this to a map, I'd prefer Map<TopicPartition, Long>.

   6. It seems like #4 can be avoided by using Map<TopicPartition,
Long> or Map<TopicPartition,
   TopicPartitionOffset> as the argument type.

How? lastCommittedOffsets() is independent of positions(). I'm not sure I
understood your suggestion.

   7. To address #3, maybe we can return List<TopicPartitionOffset> that
   are invalid.

I don't particularly see the advantage of returning a list of invalid
partitions from position(). It seems a bit awkward to return a list to
indicate what is obviously a bug. Prefer throwing an error since the user
should just fix that logic.

Thanks,
Neha



On Wed, Feb 12, 2014 at 3:59 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

> Ah, gotcha.
>
> -Jay
>
>
> On Wed, Feb 12, 2014 at 8:40 AM, Neha Narkhede <neha.narkh...@gmail.com
> >wrote:
>
> > Jay
> >
> > Well none kind of address the common case which is to commit all
> > partitions. For these I was thinking just
> >    commit();
> > The advantage of this simpler method is that you don't need to bother
> about
> > partitions you just consume the messages given to you and then commit
> them
> >
> > This is already what the commit() API is supposed to do. Here is the
> > javadoc -
> >
> >     * Synchronously commits the specified offsets for the specified list
> of
> > topics and partitions to Kafka. If no partitions are specified,
> >      * commits offsets for the subscribed list of topics and partitions
> to
> > Kafka.
> >
> >     public void commit(TopicPartitionOffset... offsets);
> >
> > Could you take another look at the
> > javadoc<
> >
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
> > >?
> > I've uploaded changes from the previous discussions and included some of
> > your review suggestions.
> >
> >
> >
> > On Wed, Feb 12, 2014 at 8:32 AM, Neha Narkhede <neha.narkh...@gmail.com
> > >wrote:
> >
> > > Imran,
> > >
> > >
> > > Sorry I am probably missing
> > > something basic, but I'm not sure how a multi-threaded consumer would
> > > work.  I can imagine its either:
> > >
> > > a) I just have one thread poll kafka.  If I want to process msgs in
> > > multiple threads, than I deal w/ that after polling, eg. stick them
> into
> > a
> > > blocking queue or something, and have more threads that read from the
> > > queue.
> > >
> > > b) each thread creates its own KafkaConsumer.  They are all registered
> > the
> > > same way, and I leave it to kafka to figure out what data to give to
> each
> > > one.
> > >
> > > We designed the new consumer API to not require multi threading on
> > > purpose.
> > > The reason this is better than the existing ZookeeperConsumerConnector
> is
> > > that
> > > it effectively allows the user to use whatever threading and load
> balance
> > > message
> > > processing amongst those threads. For example, you might want more
> > threads
> > > dedicated
> > > to a certain high throughput partition compared to other partitions. In
> > > option a) above, you can
> > > create your own thread pool and hand over the messages returned by poll
> > > using a blocking
> > > queue or any other approach. Option b) would work as well and the user
> > > has to figure out which
> > > topics each KafkaConsumer subscribes to.
> > >
> > >
> > > (a) certainly makes things simple, but I worry about throughput -- is
> > that
> > > just as good as having one thread trying to consumer each partition?
> > >
> > > (b) makes it a bit of a pain to figure out how many threads to use.  I
> > > assume there is no point in using more threads than there are
> partitions,
> > > so first you've got to figure out how many partitions there are in each
> > > topic.  Might be nice if there were some util functions to simplify
> this.
> > >
> > > The user can pick the number of threads. That is still better as only
> the
> > > user knows how
> > > slow/fast the message processing of her application is.
> > >
> > > Also, since the initial call to subscribe doesn't give the partition
> > > assignment, does that mean the first call to poll() will always call
> the
> > > ConsumerRebalanceCallback?
> > >
> > > Assuming you choose to use group management (by using
> subscribe(topics)),
> > > poll() will invoke
> > > the ConsumerRebalanceCallback on every single rebalance attempt.
> Improved
> > > the javadoc<
> >
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html
> > >to
> > > explain that. Could you give that another look?
> > >
> > > If I'm on the right track, I'd like to expand this example, showing how
> > > each "MyConsumer" can keep track of its partitions & offsets, even in
> the
> > > face of rebalances.  As Jay said, I think a minimal code example could
> > > really help us see the utility & faults of the api.
> > >
> > > Sure, please look at the javadoc<
> >
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
> > >.
> > > I've tried to include code examples there. Please help in
> > > improving those or adding more. Looks like we should add some multi
> > > threading examples. I avoided
> > > adding those since there are many ways to handling the message
> processing
> > > and it will not be feasible
> > > to list all of those. If we list one, people might think that is the
> only
> > > recommended approach.
> > >
> > > With that said, here is an example of using Option b) above -
> > >
> > >
> > > List<MyConsumer> consumers = new ArrayList<MyConsumer>();
> > > List<String> topics = new ArrayList<String();
> > > // populate topics
> > > assert(consumers.size == topics.size);
> > >
> > > for (int i = 0; i < numThreads; i++) {
> > >   MyConsumer c = new MyConsumer();
> > >   c.subscribe(topics(i));
> > >   consumers.add(c);
> > > }
> > > // poll each consumer in a separate thread.
> > > for (int i = 0; i < numThreads; i++) {
> > >    executorService.submit(new Runnable() {
> > >         @Override
> > >          public void run() {
> > >              new ProcessMessagesTask(consumers(i));
> > >          }
> > >    });
> > > }
> > >
> > > Let me know what you think.
> > >
> > > Thanks,
> > > Neha
> > >
> > > On Tue, Feb 11, 2014 at 3:54 PM, Jay Kreps <jay.kr...@gmail.com>
> wrote:
> > >
> > >> Comments inline:
> > >>
> > >>
> > >> On Mon, Feb 10, 2014 at 2:31 PM, Guozhang Wang <wangg...@gmail.com>
> > >> wrote:
> > >>
> > >> > Hello Jay,
> > >> >
> > >> > Thanks for the detailed comments.
> > >> >
> > >> > 1. Yeah we could discuss a bit more on that.
> > >> >
> > >> > 2. Since subscribe() is incremental, adding one topic-partition is
> OK,
> > >> and
> > >> > personally I think it is cleaner than subscribe(String topic,
> > >> > int...partition)?
> > >> >
> > >> I am not too particular. Have you actually tried this? I think writing
> > >> actual sample code is important.
> > >>
> > >>
> > >> > 3. Originally I was thinking about two interfaces:
> > >> >
> > >> > getOffsets() // offsets for all partitions that I am consuming now
> > >> >
> > >> > getOffset(topc-partition) // offset of the specified
> topic-partition,
> > >> will
> > >> > throw exception if it is not currently consumed.
> > >> >
> > >> > What do you think about these?
> > >> >
> > >>
> > >> The naming needs to distinguish committed offset position versus fetch
> > >> offset position. Also we aren't using the getX convention.
> > >>
> > >>
> > >> > 4. Yes, that remains a config.
> > >> >
> > >>
> > >> Does that make sense given that you change your position via an api
> now?
> > >>
> > >>
> > >> > 5. Agree.
> > >> >
> > >> > 6. If the time out value is null then it will "logically" return
> > >> > immediately with whatever data is available. I think an indefinitely
> > >> poll()
> > >> > function could be replaced with just
> > >> >
> > >> > while (true) poll(some-time)?
> > >> >
> > >>
> > >> That is fine but we should provide a no arg poll for that, poll(null)
> > >> isn't
> > >> clear. We should add the timeunit as per the post java 5 convention as
> > >> that
> > >> makes the call more readable. E.g.
> > >>    poll(5) vs poll(5, TimeUnit.MILLISECONDS)
> > >>
> > >>
> > >> > 7. I am open with either approach.
> > >> >
> > >>
> > >> Cool.
> > >>
> > >> 8. I was thinking about two interfaces for the commit functionality:
> > >> >
> > >> >
> > >> >
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
> > >> >
> > >> > Do those sound better?
> > >> >
> > >>
> > >> Well none kind of address the common case which is to commit all
> > >> partitions. For these I was thinking just
> > >>    commit();
> > >> The advantage of this simpler method is that you don't need to bother
> > >> about
> > >> partitions you just consume the messages given to you and then commit
> > >> them.
> > >>
> > >> 9. Currently I think about un-subscribe as "close and re-subscribe",
> and
> > >> > would like to hear people's opinion about it.
> > >> >
> > >>
> > >> Hmm, I think it is a little weird if there is a subscribe which can be
> > >> called at any time but no unsubscribe. Would this be hard to do.
> > >>
> > >>
> > >> > 10. Yes. Position() is an API function, and as and API it means "be
> > >> called
> > >> > at any time" and will change the next fetching starting offset.
> > >> >
> > >>
> > >> Cool.
> > >>
> > >>
> > >> > 11. The ConsumerRecord would have the offset info of the message. Is
> > >> that
> > >> > what you want?
> > >> >
> > >>
> > >> But that is only after I have gotten a message. I'm not sure if that
> > >> covers
> > >> all cases or not.
> > >>
> > >>
> > >> > About use cases: great point. I will add some more examples of using
> > the
> > >> > API functions in the wiki pages.
> > >> >
> > >> > Guozhang
> > >> >
> > >> >
> > >> >
> > >> >
> > >> > On Mon, Feb 10, 2014 at 12:20 PM, Jay Kreps <jay.kr...@gmail.com>
> > >> wrote:
> > >> >
> > >> > > A few items:
> > >> > > 1. ConsumerRebalanceCallback
> > >> > >    a. onPartitionsRevoked would be a better name.
> > >> > >    b. We should discuss the possibility of splitting this into two
> > >> > > interfaces. The motivation would be that in Java 8 single method
> > >> > interfaces
> > >> > > can directly take methods which might be more intuitive.
> > >> > >    c. If we stick with a single interface I would prefer the name
> > >> > > RebalanceCallback as its more concise
> > >> > > 2. Should subscribe(String topic, int partition) should be
> > >> > subscribe(String
> > >> > > topic, int...partition)?
> > >> > > 3. Is lastCommittedOffset call just a local access? If so it would
> > be
> > >> > more
> > >> > > convenient not to batch it.
> > >> > > 4. How are we going to handle the earliest/latest starting
> position
> > >> > > functionality we currently have. Does that remain a config?
> > >> > > 5. Do we need to expose the general ability to get known positions
> > >> from
> > >> > the
> > >> > > log? E.g. the functionality in the OffsetRequest...? That would
> make
> > >> the
> > >> > > ability to change position a little easier.
> > >> > > 6. Should poll(java.lang.Long timeout) be poll(long timeout,
> > TimeUnit
> > >> > > unit)? Is it Long because it allows null? If so should we just
> add a
> > >> > poll()
> > >> > > that polls indefinitely?
> > >> > > 7. I recommend we remove the boolean parameter from commit as it
> is
> > >> > really
> > >> > > hard to read code that has boolean parameters without named
> > arguments.
> > >> > Can
> > >> > > we make it something like commit(...) and commitAsync(...)?
> > >> > > 8. What about the common case where you just want to commit the
> > >> current
> > >> > > position for all partitions?
> > >> > > 9. How do you unsubscribe?
> > >> > > 10. You say in a few places that positions() only impacts the
> > starting
> > >> > > position, but surely that isn't the case, right? Surely it
> controls
> > >> the
> > >> > > fetch position for that partition and can be called at any time?
> > >> > Otherwise
> > >> > > it is a pretty weird api, right?
> > >> > > 11. How do I get my current position? Not the committed position
> but
> > >> the
> > >> > > offset of the next message that will be given to me?
> > >> > >
> > >> > > One thing that I really found helpful for the API design was
> writing
> > >> out
> > >> > > actual code for different scenarios against the API. I think it
> > might
> > >> be
> > >> > > good to do that for this too--i.e. enumerate the various use cases
> > and
> > >> > code
> > >> > > that use case up to see how it looks. I'm not sure if it would be
> > >> useful
> > >> > to
> > >> > > collect these kinds of scenarios from people. I know they have
> > >> > sporadically
> > >> > > popped up on the mailing list.
> > >> > >
> > >> > > -Jay
> > >> > >
> > >> > >
> > >> > > On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede <
> > >> neha.narkh...@gmail.com
> > >> > > >wrote:
> > >> > >
> > >> > > > As mentioned in previous emails, we are also working on a
> > >> > > re-implementation
> > >> > > > of the consumer. I would like to use this email thread to
> discuss
> > >> the
> > >> > > > details of the public API. I would also like us to be picky
> about
> > >> this
> > >> > > > public api now so it is as good as possible and we don't need to
> > >> break
> > >> > it
> > >> > > > in the future.
> > >> > > >
> > >> > > > The best way to get a feel for the API is actually to take a
> look
> > at
> > >> > the
> > >> > > > javadoc<
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
> > >> > > > >,
> > >> > > > the hope is to get the api docs good enough so that it is
> > >> > > self-explanatory.
> > >> > > > You can also take a look at the configs
> > >> > > > here<
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html
> > >> > > > >
> > >> > > >
> > >> > > > Some background info on implementation:
> > >> > > >
> > >> > > > At a high level the primary difference in this consumer is that
> it
> > >> > > removes
> > >> > > > the distinction between the "high-level" and "low-level"
> consumer.
> > >> The
> > >> > > new
> > >> > > > consumer API is non blocking and instead of returning a blocking
> > >> > > iterator,
> > >> > > > the consumer provides a poll() API that returns a list of
> records.
> > >> We
> > >> > > think
> > >> > > > this is better compared to the blocking iterators since it
> > >> effectively
> > >> > > > decouples the threading strategy used for processing messages
> from
> > >> the
> > >> > > > consumer. It is worth noting that the consumer is entirely
> single
> > >> > > threaded
> > >> > > > and runs in the user thread. The advantage is that it can be
> > easily
> > >> > > > rewritten in less multi-threading-friendly languages. The
> consumer
> > >> > > batches
> > >> > > > data and multiplexes I/O over TCP connections to each of the
> > >> brokers it
> > >> > > > communicates with, for high throughput. The consumer also allows
> > >> long
> > >> > > poll
> > >> > > > to reduce the end-to-end message latency for low throughput
> data.
> > >> > > >
> > >> > > > The consumer provides a group management facility that supports
> > the
> > >> > > concept
> > >> > > > of a group with multiple consumer instances (just like the
> current
> > >> > > > consumer). This is done through a custom heartbeat and group
> > >> management
> > >> > > > protocol transparent to the user. At the same time, it allows
> > users
> > >> the
> > >> > > > option to subscribe to a fixed set of partitions and not use
> group
> > >> > > > management at all. The offset management strategy defaults to
> > Kafka
> > >> > based
> > >> > > > offset management and the API provides a way for the user to
> use a
> > >> > > > customized offset store to manage the consumer's offsets.
> > >> > > >
> > >> > > > A key difference in this consumer also is the fact that it does
> > not
> > >> > > depend
> > >> > > > on zookeeper at all.
> > >> > > >
> > >> > > > More details about the new consumer design are
> > >> > > > here<
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
> > >> > > > >
> > >> > > >
> > >> > > > Please take a look at the new
> > >> > > > API<
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
> > >> > > > >and
> > >> > > > give us any thoughts you may have.
> > >> > > >
> > >> > > > Thanks,
> > >> > > > Neha
> > >> > > >
> > >> > >
> > >> >
> > >> >
> > >> >
> > >> > --
> > >> > -- Guozhang
> > >> >
> > >>
> > >
> > >
> >
>

Reply via email to