----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25944/#review54329 -----------------------------------------------------------
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala <https://reviews.apache.org/r/25944/#comment94448> Revert this whitespace change. core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala <https://reviews.apache.org/r/25944/#comment94568> revert core/src/main/scala/kafka/tools/ConfigConstants.scala <https://reviews.apache.org/r/25944/#comment94456> I think we can remove this file and just pick up the defaults from object ConsumerConfig core/src/main/scala/kafka/tools/OffsetClient.scala <https://reviews.apache.org/r/25944/#comment94453> I think kafka.client would be a better place for this than kafka.tools In fact, we should consider rewriting this in Java and putting it in the clients package, but since this is largely based on the existing code we can defer on that. core/src/main/scala/kafka/tools/OffsetClient.scala <https://reviews.apache.org/r/25944/#comment94567> License header core/src/main/scala/kafka/tools/OffsetClient.scala <https://reviews.apache.org/r/25944/#comment94449> Can you remove the auto-generated comments and add scaladocs? core/src/main/scala/kafka/tools/OffsetClient.scala <https://reviews.apache.org/r/25944/#comment94454> val socketTimeoutMs = if (config != null) config.socketTimeoutMs else ConsumerConfig.default... core/src/main/scala/kafka/tools/OffsetClient.scala <https://reviews.apache.org/r/25944/#comment94459> Should this be allowed to loop indefinitely until a connection is established? i.e., remove i < shuffledBrokers.size and change line 38 to be i = (i + 1) % shuffledBrokers.size If no connection is established then we would hit an NPE on line 53 core/src/main/scala/kafka/tools/OffsetClient.scala <https://reviews.apache.org/r/25944/#comment94460> Connecting to broker... core/src/main/scala/kafka/tools/OffsetClient.scala <https://reviews.apache.org/r/25944/#comment94499> Can you also remove the following: ClientUtils.scala: channelToOffsetManager SimpleConsumer.scala: commitOffsets and fetchOffsets You will need to update ConsumerOffsetChecker to use the OffsetClient as well core/src/main/scala/kafka/tools/OffsetClient.scala <https://reviews.apache.org/r/25944/#comment94455> val (offsetChannelBackoffMs, socketTimeoutMs) = if (config != null) (config.offsetsChannelBackoffMs, config.socketTimeoutMs) else ... (also, the convention elsewhere in the code is to write Ms not MS) core/src/main/scala/kafka/tools/OffsetClient.scala <https://reviews.apache.org/r/25944/#comment94484> This would be a good time to rename all these references to coordinator to offsetManager core/src/main/scala/kafka/tools/OffsetClient.scala <https://reviews.apache.org/r/25944/#comment94481> Can you add a info saying "Connected to offset manager host:port for group ..." core/src/main/scala/kafka/tools/OffsetClient.scala <https://reviews.apache.org/r/25944/#comment94557> Should we just use ClientUtils.fetchTopicMetadata? core/src/main/scala/kafka/tools/OffsetClient.scala <https://reviews.apache.org/r/25944/#comment94501> Should probably back-off on retries core/src/main/scala/kafka/tools/OffsetClient.scala <https://reviews.apache.org/r/25944/#comment94554> Better to name this fetchOffsets Also, it seems annoying for a user to always have to pass in a consumer config. Should we just expose the timeouts, etc through this API and provide defaults as well? Alternatively we could provide an OffsetClient class that takes these parameters once through its constructor. One benefit of doing that is it becomes possible to maintain a persistent offset manager channel - with this for every ofset fetch/commit the consumer (or tools) have to spawn connections to query for metadata then possibly topic metadata and then the offset fetch or commit. Also, consider making a separate OffsetClientConfig class. Also, see comment below for compatibility with Java. Not sure if the response can be easily extracted from the option in Java code core/src/main/scala/kafka/tools/OffsetClient.scala <https://reviews.apache.org/r/25944/#comment94560> flatMap { case(topic, partitionsMetadata) => ... } core/src/main/scala/kafka/tools/OffsetClient.scala <https://reviews.apache.org/r/25944/#comment94559> You can remove "val topicPartitions =" core/src/main/scala/kafka/tools/OffsetClient.scala <https://reviews.apache.org/r/25944/#comment94452> Instead of taking a map of offsetsToCommit it may be better to just take an OffsetCommitRequest. That will make it easier to use this client from Java. You will also need to provide a variant of this method that takes javaapi.OffsetCommitRequest Also, better to call this commitOffsets Also, see comment above on config - Joel Koshy On Sept. 23, 2014, 5:48 p.m., Mayuresh Gharat wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/25944/ > ----------------------------------------------------------- > > (Updated Sept. 23, 2014, 5:48 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1013 > https://issues.apache.org/jira/browse/KAFKA-1013 > > > Repository: kafka > > > Description > ------- > > OffsetCLient Tool API. ImportZkOffsets and ExportZkOffsets replaced by > ImportOffsets and ExportOffsets > > > Modified the comments in the headers > > > Corrected a value > > > Diffs > ----- > > config/consumer.properties 83847de30d10b6e78bb8de28e0bb925d7c0e6ca2 > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala > fbc680fde21b02f11285a4f4b442987356abd17b > core/src/main/scala/kafka/tools/ConfigConstants.scala PRE-CREATION > core/src/main/scala/kafka/tools/ExportOffsets.scala PRE-CREATION > core/src/main/scala/kafka/tools/ImportOffsets.scala PRE-CREATION > core/src/main/scala/kafka/tools/OffsetClient.scala PRE-CREATION > > Diff: https://reviews.apache.org/r/25944/diff/ > > > Testing > ------- > > > Thanks, > > Mayuresh Gharat > >