Hi Pradeep:

1. I think TopicPartition was designed as an internal class, and the plan was
not to expose it to users, just for simplicity. We will probably change the
commit APIs so that they do not expose it.

2. We have thought about that before, and finally decided to make it

subscribe(topic, partition)
positions(partition, offset)

Does this look good to you?

3. We will update the javadoc accordingly: an exception should be thrown.

4. I think this is related to how we are going to deal with 1).

5. Agree.
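Points 2 and 3 together imply a simple contract: positions() only accepts partitions the consumer has subscribed to, and anything else is rejected with an exception. The sketch below models that contract in plain Java. `TopicPartition` here is a minimal stand-in written for illustration, and `PositionSketch` and the choice of `IllegalStateException` are hypothetical; the thread does not name the exception type.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Minimal stand-in for a (topic, partition) pair, written for this sketch only.
final class TopicPartition {
    final String topic;
    final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartition)) return false;
        TopicPartition other = (TopicPartition) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}

// Hypothetical model of subscribe(topic, partition) and positions(partition, offset).
final class PositionSketch {
    private final Set<TopicPartition> subscribed = new HashSet<>();
    // Next fetch offset per partition, as set by positions().
    private final Map<TopicPartition, Long> fetchPositions = new HashMap<>();

    void subscribe(String topic, int partition) {
        subscribed.add(new TopicPartition(topic, partition));
    }

    void positions(TopicPartition partition, long offset) {
        if (!subscribed.contains(partition)) {
            // Assumed exception type; the thread only says an exception should be thrown.
            throw new IllegalStateException("Not subscribed to " + partition);
        }
        fetchPositions.put(partition, offset);
    }

    Long position(TopicPartition partition) {
        return fetchPositions.get(partition);
    }
}
```

Failing fast here means a typo in a topic name surfaces immediately instead of silently doing nothing.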


On Tue, Feb 11, 2014 at 12:58 PM, Pradeep Gollakota <pradeep...@gmail.com> wrote:

> Updated thoughts.
>
>    1. subscribe(String topic, int... partitions) and unsubscribe(String
>    topic, int... partitions) should be subscribe(TopicPartition...
>    topicPartitions) and unsubscribe(TopicPartition... topicPartitions).
>    2. Does it make sense to provide a convenience method to subscribe to
>    topics at a particular offset directly? E.g.
>    subscribe(TopicPartitionOffset... offsets)
>    3. The javadoc makes no mention of what would happen if positions() is
>    called with a TopicPartitionOffset to which the Consumer is not
>    subscribed.
>    4. The javadoc makes no mention of what would happen if positions() is
>    called with two different offsets for a single TopicPartition.
>    5. The javadoc shows the lastCommittedOffsets() return type as
>    List<TopicPartitionOffset>. This should either be Map<TopicPartition,
>    Long> or Map<TopicPartition, TopicPartitionOffset>.
>    6. It seems like #4 can be avoided by using Map<TopicPartition, Long>
>    or Map<TopicPartition, TopicPartitionOffset> as the argument type.
>    7. To address #3, maybe we can return the List<TopicPartitionOffset>
>    entries that are invalid.
>
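Points 4, 6 and 7 can be illustrated together. The sketch assumes a hypothetical Map-based positions() that applies offsets for subscribed partitions and returns the rejected entries instead of throwing. It returns a `Map` rather than the `List<TopicPartitionOffset>` suggested in point 7, only to keep it short. `TopicPartition` and `MapPositionSketch` are illustrative stand-ins, not the proposed Kafka classes.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Minimal stand-in for a (topic, partition) pair, written for this sketch only.
final class TopicPartition {
    final String topic;
    final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartition)) return false;
        TopicPartition other = (TopicPartition) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

// Hypothetical Map-based positions(): at most one offset per partition by construction.
final class MapPositionSketch {
    private final Set<TopicPartition> subscribed = new HashSet<>();
    private final Map<TopicPartition, Long> fetchPositions = new LinkedHashMap<>();

    void subscribe(TopicPartition... partitions) {
        for (TopicPartition tp : partitions) {
            subscribed.add(tp);
        }
    }

    // Applies offsets for subscribed partitions; returns the entries it rejected.
    Map<TopicPartition, Long> positions(Map<TopicPartition, Long> offsets) {
        Map<TopicPartition, Long> invalid = new LinkedHashMap<>();
        for (Map.Entry<TopicPartition, Long> e : offsets.entrySet()) {
            if (subscribed.contains(e.getKey())) {
                fetchPositions.put(e.getKey(), e.getValue());
            } else {
                invalid.put(e.getKey(), e.getValue());
            }
        }
        return invalid;
    }

    Long position(TopicPartition tp) {
        return fetchPositions.get(tp);
    }
}
```

Because a Map holds one value per key, a caller who puts two offsets for the same partition simply overwrites the first one while building the argument, so the ambiguity in point 4 never reaches the consumer.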
>
>
> On Tue, Feb 11, 2014 at 12:04 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>
> > Pradeep,
> >
> > To be clear, we want to get feedback on the APIs from the javadoc
> > <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>,
> > since the wiki will be slightly behind on the APIs.
> >
> > 1. Regarding consistency, do you have specific feedback on which APIs
> > should have different arguments/return types?
> > 2. lastCommittedOffsets() does what you said in the javadoc.
> >
> > Thanks,
> > Neha
> >
> >
> > On Tue, Feb 11, 2014 at 11:45 AM, Pradeep Gollakota <pradeep...@gmail.com> wrote:
> >
> > > Hi Jay,
> > >
> > > I apologize for derailing the conversation about the consumer API. We
> > > should start a new discussion about hierarchical topics if we want to
> > > keep talking about it. My final thought on the matter is that support
> > > for hierarchical topics is still an important feature to have in Kafka,
> > > because it gives us the flexibility to do namespace-level access
> > > controls.
> > >
> > > Getting back to the topic of the Consumer API:
> > >
> > >    1. Any thoughts on consistency for method arguments and return types?
> > >    2. The lastCommittedOffsets() method returns a
> > >    List<TopicPartitionOffset>, whereas the Confluence page suggested a
> > >    Map<TopicPartition, Long>. I would think that a Map is the more
> > >    appropriate return type.
> > >
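To see why a Map is the more natural return type for lastCommittedOffsets(), here is a small sketch that converts a list of (partition, offset) pairs into a `Map<TopicPartition, Long>`. `TopicPartition` and `TopicPartitionOffset` are minimal stand-ins written for this example, and `CommittedOffsets.toMap` is a hypothetical helper. It relies on the documented behaviour of `Collectors.toMap(keyMapper, valueMapper)`, which throws `IllegalStateException` when two elements map to the same key.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

// Minimal stand-in for a (topic, partition) pair, written for this sketch only.
final class TopicPartition {
    final String topic;
    final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartition)) return false;
        TopicPartition other = (TopicPartition) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

// Minimal stand-in for a (partition, offset) pair.
final class TopicPartitionOffset {
    final TopicPartition partition;
    final long offset;

    TopicPartitionOffset(TopicPartition partition, long offset) {
        this.partition = partition;
        this.offset = offset;
    }
}

final class CommittedOffsets {
    // A list can name the same partition twice; toMap rejects that with
    // IllegalStateException instead of silently picking one offset.
    static Map<TopicPartition, Long> toMap(List<TopicPartitionOffset> offsets) {
        return offsets.stream().collect(Collectors.toMap(o -> o.partition, o -> o.offset));
    }
}
```

With the Map form, a caller looks up one partition with `committed.get(tp)` instead of scanning a list.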
> > >
> > >
> > > On Tue, Feb 11, 2014 at 8:04 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > >
> > > > Hey Pradeep,
> > > >
> > > > That wiki is fairly old and it predated more flexible subscription
> > > > mechanisms. In the high-level consumer you currently have wildcard
> > > > subscription, and in the new proposed interface you can actually
> > > > subscribe based on any logic you want to create a "union" of streams.
> > > > Personally I think this gives you everything you would want with a
> > > > hierarchy and more actual flexibility (since you can define groupings
> > > > however you want). What do you think?
> > > >
> > > > -Jay
> > > >
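Jay's point that arbitrary subscription logic can stand in for a hierarchy can be made concrete. Assuming the application has the list of known topic names from somewhere, a "namespace" becomes just a naming convention plus a predicate, and predicates compose into a union. `TopicSelection`, the "." separator and the example topic names are all hypothetical.

```java
import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

final class TopicSelection {
    // Returns the known topics matching an arbitrary rule, in their original order.
    static List<String> select(Collection<String> knownTopics, Predicate<String> rule) {
        return knownTopics.stream().filter(rule).collect(Collectors.toList());
    }

    // A "namespace" here is only a naming convention: "<prefix>.<name>".
    static Predicate<String> namespace(String prefix) {
        return topic -> topic.startsWith(prefix + ".");
    }

    // Wildcard subscription expressed as a full-match regular expression.
    static Predicate<String> wildcard(String regex) {
        Pattern p = Pattern.compile(regex);
        return topic -> p.matcher(topic).matches();
    }
}
```

For example, `select(topics, namespace("billing").or(wildcard("audit-.*")))` picks every "billing." topic plus every audit topic, a grouping that no single fixed hierarchy would express.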
> > > >
> > > > On Mon, Feb 10, 2014 at 3:37 PM, Pradeep Gollakota <pradeep...@gmail.com> wrote:
> > > >
> > > > > With regard to hierarchical topics, I'm referring to KAFKA-1175
> > > > > <https://issues.apache.org/jira/browse/KAFKA-1175>. I would just like
> > > > > to think through the implications for the Consumer API if and when we
> > > > > do implement hierarchical topics. For example, in the proposal
> > > > > <https://cwiki.apache.org/confluence/display/KAFKA/Hierarchical+Topics#>
> > > > > written by Jay, he says that initially wildcard subscriptions are not
> > > > > going to be supported. But does that mean that they will be supported
> > > > > in v2? If that's the case, that would change the semantics of the
> > > > > Consumer API.
> > > > >
> > > > > As to having classes for Topic, PartitionId, etc., it looks like I was
> > > > > referring to the TopicPartition and TopicPartitionOffset classes (I
> > > > > didn't realize these were already there). I was only looking at the
> > > > > Confluence page, which shows List[(String, Int, Long)] instead of
> > > > > List[TopicPartitionOffset] (as is shown in the javadoc). However, I did
> > > > > notice that we're not being consistent in the Java version. E.g. we
> > > > > have commit(TopicPartitionOffset... offsets) and
> > > > > lastCommittedOffsets(TopicPartition... partitions) on the one hand. On
> > > > > the other hand we have subscribe(String topic, int... partitions). I
> > > > > agree that creating a class for TopicId would probably not make much
> > > > > sense today. But with hierarchical topics, I may change my mind. This
> > > > > is exactly what was done in the HBase API in 0.96 when namespaces were
> > > > > added: the 0.96 HBase API introduced a class called 'TableName' to
> > > > > represent the namespace and table name.
> > > > >
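A TopicName class in the spirit of HBase's TableName might look like the sketch below. Everything in it is hypothetical: Kafka has no such class in this proposal, and the ":" separator and the "default" namespace are assumptions chosen for illustration.

```java
import java.util.Objects;

// Hypothetical value class pairing a namespace with a topic name.
final class TopicName {
    static final String DEFAULT_NAMESPACE = "default"; // assumed default namespace
    static final char DELIMITER = ':';                 // assumed separator

    final String namespace;
    final String name;

    private TopicName(String namespace, String name) {
        this.namespace = namespace;
        this.name = name;
    }

    // Parses "namespace:name"; a bare name falls into the default namespace.
    static TopicName valueOf(String qualified) {
        int i = qualified.indexOf(DELIMITER);
        if (i < 0) {
            return new TopicName(DEFAULT_NAMESPACE, qualified);
        }
        return new TopicName(qualified.substring(0, i), qualified.substring(i + 1));
    }

    // Renders the name back, omitting the namespace when it is the default one.
    String asString() {
        return DEFAULT_NAMESPACE.equals(namespace) ? name : namespace + DELIMITER + name;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicName)) return false;
        TopicName other = (TopicName) o;
        return namespace.equals(other.namespace) && name.equals(other.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(namespace, name);
    }
}
```

The benefit Pradeep describes is that method signatures could take a `TopicName` instead of a `String`, so a namespace change would not require changing every API that mentions a topic.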
> > > > >
> > > > > On Mon, Feb 10, 2014 at 3:08 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> > > > >
> > > > > > Thanks for the feedback.
> > > > > >
> > > > > > Mattijs -
> > > > > >
> > > > > > - Constructors link to
> > > > > > http://kafka.apache.org/documentation.html#consumerconfigs for valid
> > > > > > configurations, which lists zookeeper.connect rather than
> > > > > > metadata.broker.list, the value for BROKER_LIST_CONFIG in
> > > > > > ConsumerConfig.
> > > > > > Fixed it to just point to ConsumerConfig for now until we finalize
> > > > > > the new configs.
> > > > > > - Docs for poll(long) mention consumer.commit(true), which I can't
> > > > > > find in the Consumer docs. For a simple consumer setup, that call is
> > > > > > something that would make a lot of sense.
> > > > > > Missed changing the examples to use consumer.commit(true, offsets).
> > > > > > The suggestions by Jay would change it to commit(offsets) and
> > > > > > commitAsync(offsets), which will hopefully make those commit APIs
> > > > > > easier to understand.
> > > > > > - Love the addition of MockConsumer, awesome for unit testing :)
> > > > > > I'm not quite satisfied with what it does as of right now, but we
> > > > > > will surely improve it as we start writing the consumer.
> > > > > >
> > > > > > Jay -
> > > > > >
> > > > > > 1. ConsumerRebalanceCallback
> > > > > >     a. Makes sense. Renamed to onPartitionsRevoked.
> > > > > >     b. Ya, it will be good to make it forward compatible with Java 8
> > > > > > capabilities. We can change it to PartitionsAssignedCallback and
> > > > > > PartitionsRevokedCallback, or RebalanceBeginCallback and
> > > > > > RebalanceEndCallback?
> > > > > >     c. Ya, I thought about that but then didn't name it just
> > > > > > RebalanceCallback, since there could be a conflict with a
> > > > > > controller-side rebalance callback if/when we have one. However, you
> > > > > > can argue that at that time we can name it ControllerRebalanceCallback
> > > > > > instead of polluting a user-facing API. So I agree with you here.
> > > > > > 2. Ya, that is a good idea. Changed to subscribe(String topic,
> > > > > > int... partitions).
> > > > > > 3. lastCommittedOffset() is not necessarily a local access, since the
> > > > > > consumer can potentially ask for the last committed offsets of
> > > > > > partitions that it does not consume and maintain the offsets for.
> > > > > > That's the reason it is batched right now.
> > > > > > 4. Yes, look at
> > > > > > http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html#AUTO_OFFSET_RESET_CONFIG
> > > > > > 5. Sure, but that is not part of the consumer API, right? I think
> > > > > > you're suggesting looking at OffsetRequest to see if it would do that
> > > > > > properly?
> > > > > > 6. Good point. Changed to poll(long timeout, TimeUnit), and a poll
> > > > > > with a negative timeout will poll indefinitely?
> > > > > > 7. Good point. Changed to commit(...) and commitAsync(...).
> > > > > > 8. To commit the current position for all partitions owned by the
> > > > > > consumer, you can use commit(). If you don't use group management,
> > > > > > then use commit(customListOfPartitions).
> > > > > > 9. Forgot to include unsubscribe. Done now.
> > > > > > 10. positions() can be called at any time and affects the next fetch
> > > > > > on the next poll(). Fixed the places that said "starting fetch
> > > > > > offsets".
> > > > > > 11. Can we not look that up by going through the messages returned
> > > > > > and getting the offset from the ConsumerRecord?
> > > > > >
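The suggestion in item 11 (derive the current position from the returned records) can be sketched as follows, taking "current position" to mean, as in Jay's question, the offset of the next message to be returned. The tracker keeps the last seen offset + 1 per partition. `SketchRecord` is a hypothetical stand-in for ConsumerRecord, with partitions written as "topic-partition" strings to keep it self-contained.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a ConsumerRecord: only the fields needed here.
final class SketchRecord {
    final String partition; // "topic-partition", simplified
    final long offset;

    SketchRecord(String partition, long offset) {
        this.partition = partition;
        this.offset = offset;
    }
}

// Derives "the offset of the next message that will be given to me"
// from the records the application has already seen.
final class PositionFromRecords {
    private final Map<String, Long> next = new HashMap<>();

    void observe(List<SketchRecord> batch) {
        for (SketchRecord r : batch) {
            // Keep the highest "last offset + 1" seen for each partition.
            next.merge(r.partition, r.offset + 1, Long::max);
        }
    }

    // Returns null for a partition that has not returned any records yet.
    Long nextOffset(String partition) {
        return next.get(partition);
    }
}
```

The null case is the gap in this approach: a partition that has not yet returned any records, or whose position was just moved with positions(), has no record to read the offset from. That is presumably why Jay asked for a direct accessor.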
> > > > > > One thing that I really found helpful for the API design was writing
> > > > > > out actual code for different scenarios against the API. I think it
> > > > > > might be good to do that for this too--i.e. enumerate the various use
> > > > > > cases and code that use case up to see how it looks
> > > > > > The javadocs include examples for almost all possible scenarios of
> > > > > > consumer usage that I could come up with. Will add more to the
> > > > > > javadocs as I get more feedback from our users. The advantage of
> > > > > > having the examples in the javadoc itself is that the usage is
> > > > > > self-explanatory to new users.
> > > > > >
> > > > > > Pradeep -
> > > > > >
> > > > > > 2. Changed to poll(long, TimeUnit), and a negative value for the
> > > > > > timeout would block in the poll forever until there is new data.
> > > > > > 3. We don't have hierarchical topics support. Would you mind
> > > > > > explaining what you meant?
> > > > > > 4. I'm not so sure that we need a class to express a topic, which is
> > > > > > a string, and a separate class for just the partition id. We do have
> > > > > > a TopicPartition class, which uniquely identifies a partition of a
> > > > > > topic.
> > > > > >
> > > > > > Thanks,
> > > > > > Neha
> > > > > >
> > > > > >
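The poll(long, TimeUnit) semantics described above (wait at most the timeout, or block until data arrives when the timeout is negative) map directly onto `java.util.concurrent.BlockingQueue`. In this sketch a local queue stands in for fetched data. `PollSketch` and `deliver` are hypothetical names, and this is not how the consumer actually fetches from brokers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in: records arrive on a local queue instead of from brokers.
final class PollSketch<R> {
    private final BlockingQueue<R> fetched = new LinkedBlockingQueue<>();

    void deliver(R item) {
        fetched.add(item);
    }

    // Negative timeout: block until at least one record is available.
    // Otherwise wait at most `timeout`; an empty list means nothing arrived in time.
    List<R> poll(long timeout, TimeUnit unit) throws InterruptedException {
        List<R> batch = new ArrayList<>();
        R first = timeout < 0 ? fetched.take() : fetched.poll(timeout, unit);
        if (first == null) {
            return batch;
        }
        batch.add(first);
        fetched.drainTo(batch); // also return whatever else is already buffered
        return batch;
    }
}
```

Returning an empty list on timeout, rather than null, keeps the usual `for (R r : consumer.poll(...))` loop safe.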
> > > > > > On Mon, Feb 10, 2014 at 12:36 PM, Pradeep Gollakota <pradeep...@gmail.com> wrote:
> > > > > >
> > > > > > > Couple of very quick thoughts.
> > > > > > >
> > > > > > > 1. +1 about renaming commit(...) and commitAsync(...)
> > > > > > > 2. I'd also like to extend the above to the poll() method as
> > > > > > > well: poll() and pollWithTimeout(long, TimeUnit)?
> > > > > > > 3. Have you guys given any thought to how this API would be used
> > > > > > > with hierarchical topics?
> > > > > > > 4. Would it make sense to add classes such as TopicId, PartitionId,
> > > > > > > etc.? Seems like it would be easier to read code with these classes
> > > > > > > as opposed to strings and longs.
> > > > > > >
> > > > > > > - Pradeep
> > > > > > >
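Pradeep's +1 for commit(...) and commitAsync(...) is about call-site readability: `consumer.commit(true)` says nothing about what `true` means, while `consumer.commit()` and `consumer.commitAsync()` do. The sketch is hypothetical. Partitions are plain strings, and having commitAsync() return a `CompletableFuture` is an assumption made for illustration, not something the thread specifies.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical consumer fragment; offsets keyed by "topic-partition" strings.
final class CommitSketch {
    private final Map<String, Long> positions = new HashMap<>();
    private final Map<String, Long> committedOffsets = new HashMap<>();

    void setPosition(String partition, long offset) {
        positions.put(partition, offset);
    }

    // Synchronous: returns only after the current positions are recorded as committed.
    void commit() {
        synchronized (committedOffsets) {
            committedOffsets.putAll(positions);
        }
    }

    // Asynchronous: snapshots the positions now and commits them on another thread.
    CompletableFuture<Void> commitAsync() {
        Map<String, Long> snapshot = new HashMap<>(positions);
        return CompletableFuture.runAsync(() -> {
            synchronized (committedOffsets) {
                committedOffsets.putAll(snapshot);
            }
        });
    }

    Long committed(String partition) {
        synchronized (committedOffsets) {
            return committedOffsets.get(partition);
        }
    }
}
```

The async variant snapshots positions at call time, so later position changes cannot leak into a commit that is still in flight.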
> > > > > > >
> > > > > > > On Mon, Feb 10, 2014 at 12:20 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > > > >
> > > > > > > > A few items:
> > > > > > > > 1. ConsumerRebalanceCallback
> > > > > > > >    a. onPartitionsRevoked would be a better name.
> > > > > > > >    b. We should discuss the possibility of splitting this into two
> > > > > > > > interfaces. The motivation would be that in Java 8 single-method
> > > > > > > > interfaces can directly take methods, which might be more
> > > > > > > > intuitive.
> > > > > > > >    c. If we stick with a single interface I would prefer the name
> > > > > > > > RebalanceCallback as it's more concise.
> > > > > > > > 2. Should subscribe(String topic, int partition) be
> > > > > > > > subscribe(String topic, int... partition)?
> > > > > > > > 3. Is the lastCommittedOffset call just a local access? If so it
> > > > > > > > would be more convenient not to batch it.
> > > > > > > > 4. How are we going to handle the earliest/latest starting position
> > > > > > > > functionality we currently have? Does that remain a config?
> > > > > > > > 5. Do we need to expose the general ability to get known positions
> > > > > > > > from the log? E.g. the functionality in the OffsetRequest...? That
> > > > > > > > would make the ability to change position a little easier.
> > > > > > > > 6. Should poll(java.lang.Long timeout) be poll(long timeout,
> > > > > > > > TimeUnit unit)? Is it Long because it allows null? If so, should we
> > > > > > > > just add a poll() that polls indefinitely?
> > > > > > > > 7. I recommend we remove the boolean parameter from commit, as it is
> > > > > > > > really hard to read code that has boolean parameters without named
> > > > > > > > arguments. Can we make it something like commit(...) and
> > > > > > > > commitAsync(...)?
> > > > > > > > 8. What about the common case where you just want to commit the
> > > > > > > > current position for all partitions?
> > > > > > > > 9. How do you unsubscribe?
> > > > > > > > 10. You say in a few places that positions() only impacts the
> > > > > > > > starting position, but surely that isn't the case, right? Surely it
> > > > > > > > controls the fetch position for that partition and can be called at
> > > > > > > > any time? Otherwise it is a pretty weird API, right?
> > > > > > > > 11. How do I get my current position? Not the committed position but
> > > > > > > > the offset of the next message that will be given to me?
> > > > > > > >
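Item 1b argues for splitting ConsumerRebalanceCallback so that Java 8 lambdas can be passed directly. Below is a sketch of that split, using the PartitionsAssignedCallback and PartitionsRevokedCallback names from Neha's reply. Several details are assumptions for illustration: the method name onPartitionsAssigned, the revoke-then-assign order, and the use of "topic-partition" strings in place of TopicPartition. `RebalanceSketch` is a hypothetical stand-in for the consumer.

```java
import java.util.Collection;

// Hypothetical split of ConsumerRebalanceCallback into two single-method interfaces.
@FunctionalInterface
interface PartitionsAssignedCallback {
    void onPartitionsAssigned(Collection<String> partitions);
}

@FunctionalInterface
interface PartitionsRevokedCallback {
    void onPartitionsRevoked(Collection<String> partitions);
}

// Stand-in for the consumer's rebalance handling; it only dispatches callbacks.
final class RebalanceSketch {
    private final PartitionsAssignedCallback onAssigned;
    private final PartitionsRevokedCallback onRevoked;

    RebalanceSketch(PartitionsAssignedCallback onAssigned, PartitionsRevokedCallback onRevoked) {
        this.onAssigned = onAssigned;
        this.onRevoked = onRevoked;
    }

    // Revoke the old assignment before handing out the new one.
    void rebalance(Collection<String> revoked, Collection<String> assigned) {
        onRevoked.onPartitionsRevoked(revoked);
        onAssigned.onPartitionsAssigned(assigned);
    }
}
```

With two single-method interfaces a caller can write `new RebalanceSketch(parts -> startWork(parts), parts -> flush(parts))`, where `startWork` and `flush` are hypothetical application methods. A single two-method interface would require an anonymous class instead.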
> > > > > > > > One thing that I really found helpful for the API design was
> > > > > > > > writing out actual code for different scenarios against the API. I
> > > > > > > > think it might be good to do that for this too--i.e. enumerate the
> > > > > > > > various use cases and code that use case up to see how it looks.
> > > > > > > > I'm not sure if it would be useful to collect these kinds of
> > > > > > > > scenarios from people. I know they have sporadically popped up on
> > > > > > > > the mailing list.
> > > > > > > >
> > > > > > > > -Jay
> > > > > > > >
> > > > > > > >
> > > > > > > > On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > As mentioned in previous emails, we are also working on a
> > > > > > > > > re-implementation of the consumer. I would like to use this
> > > > > > > > > email thread to discuss the details of the public API. I would
> > > > > > > > > also like us to be picky about this public API now so it is as
> > > > > > > > > good as possible and we don't need to break it in the future.
> > > > > > > > >
> > > > > > > > > The best way to get a feel for the API is actually to take a look
> > > > > > > > > at the javadoc
> > > > > > > > > <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>;
> > > > > > > > > the hope is to get the API docs good enough that they are
> > > > > > > > > self-explanatory. You can also take a look at the configs here
> > > > > > > > > <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html>.
> > > > > > > > >
> > > > > > > > > Some background info on implementation:
> > > > > > > > >
> > > > > > > > > At a high level, the primary difference in this consumer is that
> > > > > > > > > it removes the distinction between the "high-level" and
> > > > > > > > > "low-level" consumer. The new consumer API is non-blocking:
> > > > > > > > > instead of returning a blocking iterator, the consumer provides a
> > > > > > > > > poll() API that returns a list of records. We think this is better
> > > > > > > > > than the blocking iterators since it effectively decouples the
> > > > > > > > > threading strategy used for processing messages from the
> > > > > > > > > consumer. It is worth noting that the consumer is entirely
> > > > > > > > > single-threaded and runs in the user thread. The advantage is
> > > > > > > > > that it can be easily rewritten in less multi-threading-friendly
> > > > > > > > > languages. The consumer batches data and multiplexes I/O over TCP
> > > > > > > > > connections to each of the brokers it communicates with, for high
> > > > > > > > > throughput. The consumer also allows long polling to reduce the
> > > > > > > > > end-to-end message latency for low-throughput data.
> > > > > > > > >
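The claim that a list-returning poll() decouples the processing threading strategy from the consumer can be illustrated as follows. One thread calls poll, and the application alone picks how records are processed, here with a `java.util.concurrent.ExecutorService`. The `Supplier` stands in for the consumer's poll(), and `PollLoop` is a hypothetical name.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical driver: the consumer is only ever touched from the calling thread,
// while processing runs wherever the application decides.
final class PollLoop<R> {
    private final Supplier<List<R>> poll;  // stands in for consumer.poll(...)
    private final ExecutorService workers; // the application's threading choice

    PollLoop(Supplier<List<R>> poll, ExecutorService workers) {
        this.poll = poll;
        this.workers = workers;
    }

    // Runs `rounds` poll iterations and hands each record to the worker pool.
    void run(int rounds, Consumer<R> handler) {
        for (int i = 0; i < rounds; i++) {
            for (R rec : poll.get()) {
                workers.submit(() -> handler.accept(rec));
            }
        }
    }
}
```

Swapping the pool for a single-thread executor, or calling the handler inline, changes the processing model without touching the consumer. With a pool of more than one thread, records may finish out of order, which matters if per-partition ordering is required.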
> > > > > > > > > The consumer provides a group management facility that supports
> > > > > > > > > the concept of a group with multiple consumer instances (just like
> > > > > > > > > the current consumer). This is done through a custom heartbeat and
> > > > > > > > > group management protocol transparent to the user. At the same
> > > > > > > > > time, it gives users the option to subscribe to a fixed set of
> > > > > > > > > partitions and not use group management at all. The offset
> > > > > > > > > management strategy defaults to Kafka-based offset management, and
> > > > > > > > > the API provides a way for the user to use a customized offset
> > > > > > > > > store to manage the consumer's offsets.
> > > > > > > > >
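The "customized offset store" option could take roughly the following shape. The `OffsetStore` interface, its method names and the in-memory implementation are all hypothetical. They illustrate the plug-in idea only, not the API in the javadoc, which exposes offsets through methods such as commit(), lastCommittedOffsets() and positions().

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical plug-in point for an application-managed offset store.
interface OffsetStore {
    void save(String group, String partition, long offset);

    Optional<Long> load(String group, String partition);
}

// Simplest possible implementation: offsets live in process memory, per group.
final class InMemoryOffsetStore implements OffsetStore {
    private final Map<String, Map<String, Long>> offsets = new HashMap<>();

    @Override
    public void save(String group, String partition, long offset) {
        offsets.computeIfAbsent(group, g -> new HashMap<>()).put(partition, offset);
    }

    @Override
    public Optional<Long> load(String group, String partition) {
        return Optional.ofNullable(offsets.getOrDefault(group, Map.of()).get(partition));
    }
}
```

An application that subscribes to a fixed set of partitions without group management might load() its offsets at startup, pass them to positions(), and save() after processing each batch. A durable store, such as a database table written in the same transaction as the processed output, would replace the in-memory map.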
> > > > > > > > > Another key difference in this consumer is that it does not
> > > > > > > > > depend on ZooKeeper at all.
> > > > > > > > >
> > > > > > > > > More details about the new consumer design are here
> > > > > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design>.
> > > > > > > > >
> > > > > > > > > Please take a look at the new API
> > > > > > > > > <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>
> > > > > > > > > and give us any thoughts you may have.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Neha
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>



-- 
-- Guozhang
