-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/19731/#review40262
-----------------------------------------------------------
Some general comments:

1. Do we need to take parameters for position and committed? Shall we just let these two functions return the offsets of the owned partitions, and let users access the partitions of interest?

2. The usage of seek in the try-catch example seems a little awkward to me: do we really need to reset the fetch position on processing failures? If people are worried about that case, should they instead process messages one at a time, like:

    for (record in records) {
        try {
            process(record)
        } catch {
            // resume and re-process(record)...
        }
    }


clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
<https://reviews.apache.org/r/19731/#comment73213>

    I think if some TopicPartition is not currently owned by the consumer, an exception will be thrown? If so, shall we state that in the comments?

clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
<https://reviews.apache.org/r/19731/#comment73214>

    Same as above?

clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
<https://reviews.apache.org/r/19731/#comment73215>

    Same as above?

clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
<https://reviews.apache.org/r/19731/#comment73211>

    In the examples this config string is still "auto.commit.enable".

clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
<https://reviews.apache.org/r/19731/#comment73216>

    I am not sure how we can do strict memory management at the consumer, since: 1) we do not have something like a fetch.max.bytes to upper-bound the returned response for fetch requests, so we can theoretically get a response as large as socket.receive.buffer.bytes; 2) when we read a response from the socket, we have to read it as a whole and put it into a single MemoryRecords object; only then will we know the size of the response.
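For what it's worth, the one-at-a-time pattern in general comment 2 could look like the following self-contained sketch. The String records and the process() method here are illustrative stand-ins, not types from the patch; the point is only that a failed record can be retried in place without seeking the consumer's fetch position backwards.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of per-record processing with in-place retry, so no seek()
// back to an earlier fetch position is needed after a failure.
public class PerRecordProcessing {
    private static boolean failedOnce = false;

    // Simulated processing step that fails transiently on the first
    // attempt at the record named "bad".
    static void process(String record, List<String> processed) {
        if (record.equals("bad") && !failedOnce) {
            failedOnce = true;
            throw new RuntimeException("transient processing failure");
        }
        processed.add(record);
    }

    // Process each record individually; on failure, re-process the same
    // record instead of rewinding the consumer.
    static List<String> consumeAll(List<String> records) {
        failedOnce = false;
        List<String> processed = new ArrayList<>();
        for (String record : records) {
            try {
                process(record, processed);
            } catch (RuntimeException e) {
                // resume and re-process(record)...
                process(record, processed);
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        // prints "[a, bad, c]": every record is processed exactly once
        // in order, despite the transient failure on "bad"
        System.out.println(consumeAll(Arrays.asList("a", "bad", "c")));
    }
}
```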
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
<https://reviews.apache.org/r/19731/#comment73427>

    There is a discrepancy in "partition": in the constructor parameters it is the int partition id, while in the getter it refers to the TopicPartition. Perhaps we can rename the constructor parameter to "partitionId" and add a getter "public int partitionId()".

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/19731/#comment73433>

    We need to change the examples to the new signatures of poll/commit/etc. once we have finalized the API.

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/19731/#comment73434>

    Rebase?

clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java
<https://reviews.apache.org/r/19731/#comment73435>

    Ditto as above.

- Guozhang Wang


On April 13, 2014, 2:12 a.m., Neha Narkhede wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/19731/
> -----------------------------------------------------------
> 
> (Updated April 13, 2014, 2:12 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1328
>     https://issues.apache.org/jira/browse/KAFKA-1328
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Fixed the javadoc usage examples in KafkaConsumer to match the API changes
> 
> Changed the signature of poll to return Map<String,ConsumerRecordMetadata> to organize the ConsumerRecords around topic and then optionally around partition. This will serve the group management as well as custom partition subscription use cases
> 
> 1. Changed the signature of poll() to return Map<String, List<ConsumerRecord>>
> 2. Changed ConsumerRecord to throw an exception if an error is detected for the partition.
> For example, if a single large message is larger than the total memory just for that partition, we don't want poll() to throw an exception since that will affect the processing of the remaining partitions as well
> 
> Fixed MockConsumer to make subscribe(topics) and subscribe(partitions) mutually exclusive
> 
> Changed the package to org.apache.kafka.clients.consumer from kafka.clients.consumer
> 
> Changed the package to org.apache.kafka.clients.consumer from kafka.clients.consumer
> 
> 1. Removed the commitAsync() APIs
> 2. Changed the commit() APIs to return a Future
> 
> Fixed configs to match the producer side configs for metrics
> 
> Renamed AUTO_COMMIT_ENABLE_CONFIG to ENABLE_AUTO_COMMIT_CONFIG
> 
> Addressing review comments from Tim and Guozhang
> 
> Rebasing after producer side config cleanup
> 
> Added license headers
> 
> Cleaned javadoc for ConsumerConfig
> 
> Fixed minor indentation in ConsumerConfig
> 
> Improve docs on ConsumerConfig
> 
> 1. Added ClientUtils
> 2. Added basic constructor implementation for KafkaConsumer
> 
> Improved MockConsumer
> 
> Chris's feedback and also consumer rewind example code
> 
> Added commit() and commitAsync() APIs to the consumer and updated docs and examples to reflect that
> 
> 1. Added consumer usage examples to javadoc
> 2. Changed signature of APIs that accept or return offsets from list of offsets to map of offsets
> 
> Improved example for using ConsumerRebalanceCallback
> 
> Improved example for using ConsumerRebalanceCallback
> 
> Included Jun's review comments and renamed positions to seek.
> Also included position()
> 
> Changes to javadoc for positions()
> 
> Changed the javadoc for ConsumerRebalanceCallback
> 
> Changing unsubscribe to also take in var args for topic list
> 
> Incorporated first round of feedback from Jay, Pradeep and Mattijs on the mailing list
> 
> Updated configs
> 
> Javadoc for consumer complete
> 
> Completed docs for Consumer and ConsumerRebalanceCallback. Added MockConsumer
> 
> Added the initial interfaces and related documentation for the consumer. More docs required to complete the public API
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/kafka/common/TopicPartitionOffset.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/FutureOffsetMetadata.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java a6423f4b37a57f0290e2048b764de1218470f4f7
>   clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java PRE-CREATION
>   clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java PRE-CREATION
> 
> Diff: https://reviews.apache.org/r/19731/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Neha Narkhede
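P.S. To make the ConsumerRecord naming suggestion (comment 73427 above) concrete, something along these lines; the TopicPartition stand-in and the topicAndPartition() method name are illustrative, not taken from the patch:

```java
// Minimal sketch of the suggested rename: the constructor takes the raw
// int "partitionId", and separate getters expose the id and the
// TopicPartition view, removing the naming discrepancy.
public class ConsumerRecordSketch {

    // Simplified stand-in for org.apache.kafka's TopicPartition
    static class TopicPartition {
        private final String topic;
        private final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        String topic() { return topic; }
        int partition() { return partition; }
    }

    static class ConsumerRecord {
        private final String topic;
        private final int partitionId; // renamed from "partition" to avoid ambiguity
        private final long offset;

        ConsumerRecord(String topic, int partitionId, long offset) {
            this.topic = topic;
            this.partitionId = partitionId;
            this.offset = offset;
        }

        // raw integer id, matching the constructor parameter name
        int partitionId() { return partitionId; }

        // the TopicPartition view gets its own, unambiguous name
        TopicPartition topicAndPartition() {
            return new TopicPartition(topic, partitionId);
        }

        long offset() { return offset; }
    }

    public static void main(String[] args) {
        ConsumerRecord record = new ConsumerRecord("test-topic", 3, 42L);
        // prints "3 test-topic"
        System.out.println(record.partitionId() + " " + record.topicAndPartition().topic());
    }
}
```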