I took some time to write some example code using the new consumer APIs to
cover a range of use cases. This exercise was very useful (thanks for the
suggestion, Jay!) since I found several improvements to the APIs to make
them more usable. Here are some of the changes
<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/>
I made -

1. Added usage examples to the KafkaConsumer javadoc
<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>.
I find it useful for the examples to be in the javadoc vs some wiki. Please
go through these examples and suggest improvements. The goal would be to
document a limited set of examples that cover every major use case.
2. All APIs that either accept or return offsets are changed to
Map<TopicPartition,Long> instead of TopicPartitionOffset... In all the
examples that I wrote, it was much easier to deal with offsets and pass
them around in the consumer APIs if they were maps instead of lists (see
the sketch after this list).
3. Due to the above change, I had to introduce commit()
<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html#commit%28%29>
and commitAsync() APIs explicitly, in addition to
commit(Map<TopicPartition,Long> offsets) and
commitAsync(Map<TopicPartition,Long> offsets), since the no-argument case
would not be covered automatically with Map as the input parameter to the
commit APIs.
4. Offset rewind logic is funky with group management. I took a stab at it
and wrote examples to cover the various offset rewind use cases I could
think of. I'm not so sure I like it, so I encourage people to take a look
at the examples and provide feedback. This feedback is very critical in
finalizing the consumer APIs as we might have to add/change APIs to make
offset rewind intuitive and easy to use. (Please see the 3rd and 4th
examples here
<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>.)
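
To make the Map-based offset APIs concrete, here is a minimal sketch of
how I expect the commit calls to be used (a sketch only, assuming an
existing KafkaConsumer instance named consumer; the offset value is made
up and the signatures are still up for discussion):

  Map<TopicPartition, Long> processedOffsets =
      new HashMap<TopicPartition, Long>();
  // Record the offset of the next message to consume from partition 0
  // of "my-topic" after processing its records.
  processedOffsets.put(new TopicPartition("my-topic", 0), 12345L);
  consumer.commit(processedOffsets);      // blocking commit of these offsets
  consumer.commitAsync(processedOffsets); // non-blocking variant
  consumer.commit(); // no-argument case: commit offsets from the last poll()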

Once I have feedback on the above, I will go ahead and submit a review
board for the new APIs and javadoc.

Thanks
Neha


On Mon, Mar 24, 2014 at 5:29 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> Hey Chris,
>
> Really sorry for the late reply, wonder how this fell through the cracks.
> Anyhow, thanks for the great feedback! Here are my comments -
>
>
> 1. Why is the config String->Object instead of String->String?
>
> This is probably more of a comment about the new config management that
> we adopted in the new clients. I think it is more convenient to write
> configs.put("a", 42);
> instead of
> configs.put("a", Integer.toString(42));
>
> 2. Are these Java docs correct?
>
>   KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
>   A consumer is instantiated by providing a set of key-value pairs as
> configuration and a ConsumerRebalanceCallback implementation
>
> There is no ConsumerRebalanceCallback parameter.
>
> Fixed.
>
>
> 3. Would like to have a method:
>
>   poll(long timeout, java.util.concurrent.TimeUnit timeUnit,
> TopicPartition... topicAndPartitionsToPoll)
>
> I see I can effectively do this by just fiddling with subscribe and
> unsubscribe before each poll. Is this a low-overhead operation? Can I just
> unsubscribe from everything after each poll, then re-subscribe to a topic
> the next iteration? I would probably be doing this in a fairly tight loop.
>
> The subscribe and unsubscribe will be very lightweight in-memory
> operations,
> so it shouldn't be a problem to just use those APIs directly.
> Let me know if you think otherwise.
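>
> For example, something like this should be cheap to run in a loop (a
> sketch only; process() is a hypothetical helper and poll()'s exact
> signature is not final):
>
> TopicPartition p0 = new TopicPartition("my-topic", 0);
> TopicPartition p1 = new TopicPartition("my-topic", 1);
> while (running) {
>     consumer.subscribe(p0);      // in-memory bookkeeping only
>     process(consumer.poll(100));
>     consumer.unsubscribe(p0);
>     consumer.subscribe(p1);      // poll a different partition next
>     process(consumer.poll(100));
>     consumer.unsubscribe(p1);
> }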
>
> 4. The behavior of AUTO_OFFSET_RESET_CONFIG is overloaded. I think there
> are use cases for decoupling "what to do when no offset exists" from "what
> to do when I'm out of range". I might want to start from smallest the
> first time I run, but fail if I ever get offset out of range.
>
> How about adding a third option "disable" to "auto.offset.reset"?
> What this says is: never automatically reset the offset, whether one is
> not found or the offset falls out of range. Presumably, you would want
> this when you control the offsets yourself and use custom rewind/replay
> logic to reset the consumer's offset; disabling the automatic reset
> ensures Kafka does not accidentally move the offset to something else.
>
> I'm not so sure when you would want to distinguish between startup and
> the offset falling out of range. Presumably, if you don't trust Kafka to
> reset the offset, then you can always turn this off and use
> commit/commitAsync and seek() to set the consumer to the right offset on
> startup and every time your consumer falls out of range.
>
> Does that make sense?
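>
> In code, the intent would be roughly this (a sketch only; the "disable"
> value is just the proposal above, and OffsetOutOfRangeException and
> myOffsetStore are hypothetical placeholders):
>
> configs.put("auto.offset.reset", "disable");
> try {
>     process(consumer.poll(100));
> } catch (OffsetOutOfRangeException e) {
>     // Rewind from your own offset store instead of letting Kafka
>     // reset to smallest/largest automatically.
>     consumer.seek(myOffsetStore.loadOffsets()); // Map<TopicPartition,Long>
> }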
>
> 5. ENABLE_JMX could use Java docs, even though it's fairly
> self-explanatory.
>
> Fixed.
>
> 6. Clarity about whether FETCH_BUFFER_CONFIG is per-topic/partition, or
> across all topic/partitions is useful. I believe it's per-topic/partition,
> right? That is, setting to 2 megs with two TopicAndPartitions would result
> in 4 megs worth of data coming in per fetch, right?
>
> Good point, clarified that. Take a look again to see if it makes sense now.
>
> 7. What does the consumer do if METADATA_FETCH_TIMEOUT_CONFIG times out?
> Retry, or throw exception?
>
> Throw a TimeoutException. Clarified that in the docs
> <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html>.
>
>
> 8. Does RECONNECT_BACKOFF_MS_CONFIG apply to both metadata requests and
> fetch requests?
>
> Applies to all requests. Clarified that in the docs.
>
> 9. What does SESSION_TIMEOUT_MS default to?
>
> Defaults are largely TODO, but session.timeout.ms currently defaults to
> 1000.
>
> 10. Is this consumer thread-safe?
>
> It should be. Updated the docs
> <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>
> to clarify that.
>
> 11. How do you use a different offset management strategy? Your email
> implies that it's pluggable, but I don't see how. "The offset management
> strategy defaults to Kafka based offset management and the API provides a
> way for the user to use a customized offset store to manage the consumer's
> offsets."
>
> 12. If I wish to decouple the consumer from the offset checkpointing, is
> it OK to use Joel's offset management stuff directly, rather than through
> the consumer's commit API?
>
> For #11 and #12, I updated the docs
> <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>
> to include actual usage examples.
> Could you take a look and see if it answers your questions?
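>
> In outline, the decoupled pattern would look like this (a sketch only;
> myOffsetStore and extractOffsets() stand in for your own checkpointing
> code, e.g. Joel's offset management APIs):
>
> Map<TopicPartition, Long> checkpoint = myOffsetStore.loadOffsets();
> consumer.subscribe(p0, p1);
> consumer.seek(checkpoint); // start from your checkpoint, not Kafka's
> while (running) {
>     records = consumer.poll(100);
>     process(records);
>     myOffsetStore.saveOffsets(extractOffsets(records));
> }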
>
> Thanks,
> Neha
>
>
>
> On Mon, Mar 3, 2014 at 10:28 AM, Chris Riccomini
> <criccom...@linkedin.com> wrote:
>
>> Hey Guys,
>>
>> Also, for reference, we'll be looking to implement new Samza consumers
>> which have these APIs:
>>
>>
>> http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/SystemConsumer.html
>>
>>
>> http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/checkpoint/CheckpointManager.html
>>
>>
>> Question (3) below is a result of Samza's SystemConsumer poll allowing
>> specific topic/partitions to be specified.
>>
>> The split between consumer and checkpoint manager is the reason for
>> question (12) below.
>>
>> Cheers,
>> Chris
>>
>> On 3/3/14 10:19 AM, "Chris Riccomini" <criccom...@linkedin.com> wrote:
>>
>> >Hey Guys,
>> >
>> >Sorry for the late follow up. Here are my questions/thoughts on the API:
>> >
>> >1. Why is the config String->Object instead of String->String?
>> >
>> >2. Are these Java docs correct?
>> >
>> >  KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
>> >  A consumer is instantiated by providing a set of key-value pairs as
>> >configuration and a ConsumerRebalanceCallback implementation
>> >
>> >There is no ConsumerRebalanceCallback parameter.
>> >
>> >3. Would like to have a method:
>> >
>> >  poll(long timeout, java.util.concurrent.TimeUnit timeUnit,
>> >TopicPartition... topicAndPartitionsToPoll)
>> >
>> >I see I can effectively do this by just fiddling with subscribe and
>> >unsubscribe before each poll. Is this a low-overhead operation? Can I
>> >just unsubscribe from everything after each poll, then re-subscribe to a
>> >topic the next iteration? I would probably be doing this in a fairly
>> >tight loop.
>> >
>> >4. The behavior of AUTO_OFFSET_RESET_CONFIG is overloaded. I think there
>> >are use cases for decoupling "what to do when no offset exists" from
>> >"what to do when I'm out of range". I might want to start from smallest
>> >the first time I run, but fail if I ever get offset out of range.
>> >
>> >5. ENABLE_JMX could use Java docs, even though it's fairly
>> >self-explanatory.
>> >
>> >6. Clarity about whether FETCH_BUFFER_CONFIG is per-topic/partition, or
>> >across all topic/partitions is useful. I believe it's
>> >per-topic/partition, right? That is, setting to 2 megs with two
>> >TopicAndPartitions would result in 4 megs worth of data coming in per
>> >fetch, right?
>> >
>> >7. What does the consumer do if METADATA_FETCH_TIMEOUT_CONFIG times out?
>> >Retry, or throw exception?
>> >
>> >8. Does RECONNECT_BACKOFF_MS_CONFIG apply to both metadata requests and
>> >fetch requests?
>> >
>> >9. What does SESSION_TIMEOUT_MS default to?
>> >
>> >10. Is this consumer thread-safe?
>> >
>> >11. How do you use a different offset management strategy? Your email
>> >implies that it's pluggable, but I don't see how. "The offset management
>> >strategy defaults to Kafka based offset management and the API provides a
>> >way for the user to use a customized offset store to manage the
>> >consumer's offsets."
>> >
>> >12. If I wish to decouple the consumer from the offset checkpointing, is
>> >it OK to use Joel's offset management stuff directly, rather than through
>> >the consumer's commit API?
>> >
>> >
>> >Cheers,
>> >Chris
>> >
>> >On 2/10/14 10:54 AM, "Neha Narkhede" <neha.narkh...@gmail.com> wrote:
>> >
>> >>As mentioned in previous emails, we are also working on a
>> >>re-implementation
>> >>of the consumer. I would like to use this email thread to discuss the
>> >>details of the public API. I would also like us to be picky about this
>> >>public api now so it is as good as possible and we don't need to break
>> >>it in the future.
>> >>
>> >>The best way to get a feel for the API is actually to take a look at the
>> >>javadoc
>> >><http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>,
>> >>the hope is to get the api docs good enough so that it is
>> >>self-explanatory.
>> >>You can also take a look at the configs here
>> >><http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html>.
>> >>
>> >>Some background info on implementation:
>> >>
>> >>At a high level, the primary difference in this consumer is that it
>> >>removes the distinction between the "high-level" and "low-level"
>> >>consumer. The new consumer API is non-blocking: instead of returning a
>> >>blocking iterator, the consumer provides a poll() API that returns a
>> >>list of records. We think this is better than the blocking iterators
>> >>since it effectively decouples the threading strategy used for
>> >>processing messages from the consumer. It is worth noting that the
>> >>consumer is entirely single-threaded and runs in the user thread. The
>> >>advantage is that it can be easily rewritten in less
>> >>multi-threading-friendly languages. The consumer batches data and
>> >>multiplexes I/O over TCP connections to each of the brokers it
>> >>communicates with, for high throughput. The consumer also allows long
>> >>polling to reduce the end-to-end message latency for low-throughput
>> >>data.
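>> >>
>> >>For example, the intended usage is roughly the following (a sketch
>> >>only; the exact poll() signature and return type are not final):
>> >>
>> >>consumer.subscribe("my-topic");
>> >>while (running) {
>> >>    records = consumer.poll(100); // whatever arrived within 100 ms
>> >>    process(records); // process on whatever threads you like
>> >>}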
>> >>
>> >>The consumer provides a group management facility that supports the
>> >>concept
>> >>of a group with multiple consumer instances (just like the current
>> >>consumer). This is done through a custom heartbeat and group management
>> >>protocol transparent to the user. At the same time, it allows users the
>> >>option to subscribe to a fixed set of partitions and not use group
>> >>management at all. The offset management strategy defaults to
>> >>Kafka-based offset management, and the API provides a way for the user
>> >>to use a customized offset store to manage the consumer's offsets.
>> >>
>> >>Another key difference is that this consumer does not depend on
>> >>zookeeper at all.
>> >>
>> >>More details about the new consumer design are here
>> >><https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design>
>> >>
>> >>Please take a look at the new API
>> >><http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>
>> >>and give us any thoughts you may have.
>> >>
>> >>Thanks,
>> >>Neha
>> >
>>
>>
>
