-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review54620
-----------------------------------------------------------

core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
<https://reviews.apache.org/r/25995/#comment94830>

    Do we need to do this check every time in the loop?


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94831>

    No need for an empty line here.


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94832>

    No need for brackets.


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94833>

    No need for brackets.


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94835>

    Maximum bytes that can be buffered in the data channels


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94834>

    in terms of bytes


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94836>

    Inconsistent indentation.


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94838>

    Capitalize: Offset commit interval in ms


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94841>

    Do you need to turn off auto commit on the consumer threads here?


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94840>

    We can add some more comments here, explaining:
    1) why we add the offset commit thread for the new producer but not for the old producer;
    2) what risks the old producer has without an offset commit thread.


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94842>

    For a clean shutdown, you need to:
    1) "halt" the consumer threads first;
    2) wait for the producer to drain all the messages in the data channel;
    3) manually commit offsets on the consumer threads;
    4) shut down the consumer threads.
    Otherwise we will get duplicate data, since we commit offsets based on the minimum offset.

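    The shutdown order in comment 94842 can be illustrated with a minimal, single-threaded Scala sketch. `DataChannel`, `ConsumerStub`, `ProducerStub` and `CleanShutdown` are hypothetical stand-ins, not classes from this patch; real consumer and producer threads would also need proper synchronization and would have to wait for in-flight sends to complete.

```scala
import scala.collection.mutable

// Hypothetical, single-threaded stand-ins for the MirrorMaker pieces.
// None of these names come from the patch under review.
final class DataChannel {
  private val queue = mutable.Queue.empty[Long]
  def put(offset: Long): Unit = queue.enqueue(offset)
  def poll(): Option[Long] = if (queue.isEmpty) None else Some(queue.dequeue())
  def isEmpty: Boolean = queue.isEmpty
}

final class ConsumerStub(channel: DataChannel, events: mutable.Buffer[String]) {
  private var halted = false
  private var nextOffset = 0L
  var committed: Option[Long] = None

  // Stop pulling new messages, but stay alive so offsets can still be committed.
  def halt(): Unit = { halted = true; events += "halt" }
  // Fetch one message into the data channel unless the consumer is halted.
  def fetchOne(): Unit = if (!halted) { channel.put(nextOffset); nextOffset += 1 }
  def commit(offset: Long): Unit = { committed = Some(offset); events += s"commit $offset" }
  def shutdown(): Unit = events += "shutdown"
}

final class ProducerStub(channel: DataChannel) {
  val sent = mutable.ArrayBuffer.empty[Long]
  // Send one message from the channel; returns false once the channel is empty.
  def sendOne(): Boolean = channel.poll() match {
    case Some(offset) => sent += offset; true
    case None         => false
  }
}

object CleanShutdown {
  def run(consumer: ConsumerStub, producer: ProducerStub, channel: DataChannel,
          events: mutable.Buffer[String]): Unit = {
    consumer.halt()               // 1) stop consumers from adding data
    while (producer.sendOne()) {} // 2) let the producer drain the data channel
    events += "drained"
    // 3) everything fetched has been sent, so commit one past the last sent offset
    producer.sent.lastOption.foreach(last => consumer.commit(last + 1))
    consumer.shutdown()           // 4) only now shut the consumer down
  }
}
```

    Because the commit happens only after the channel is empty, the committed offset points past every message the producer has already sent, so a restart does not replay them.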

core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94844>

    queueId


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94846>

    How about having a histogram for each queue instead of getting the sum? The update call would be a bit less expensive, and we could monitor whether some queues are empty while others get all the data.


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94847>

    Ditto above.


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94849>

    Add comments explaining why we force an unclean shutdown with System.exit here.


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94857>

    Unfortunately this may not be the case, as we can have multiple connectors that use different consumer configs with different group ids. We need to either 1) change the config settings to enforce this, or 2) use a separate offset client that remembers which topics belong to which groups.


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94858>

    Capitalize first word


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94859>

    Capitalize first word


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94863>

    Add a comment explaining how this logic works. Also a few questions:
    1) Is the map() call synchronized with other threads putting new offsets into the map?
    2) After the sorting, the logic may be clearer as:

        var commitableOffsetIndex = 0
        while (commitableOffsetIndex + 1 < offsets.size &&
               offsets(commitableOffsetIndex + 1) - offsets.head == commitableOffsetIndex + 1)
          commitableOffsetIndex += 1
        offsetToCommit = offsets(commitableOffsetIndex) + 1


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94855>

    The send().get() call is missing.

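    For comment 94863 (synchronizing the offset map and computing the commit point), here is a minimal sketch assuming a single partition and a hypothetical `PartitionOffsetTracker` class that is not part of the patch. Producer callbacks and the commit thread lock the same monitor, and the commit point advances only through a contiguous run of acknowledged offsets. As a variation on the loop suggested in that comment, the run is anchored at the last committed offset rather than at `offsets.head`, so a message still in flight right after the last commit holds the commit point back.

```scala
import scala.collection.mutable

// Hypothetical per-partition tracker for illustration only; it is not the
// offset map used in the patch.
final class PartitionOffsetTracker(initialCommitted: Long) {
  // Offsets acknowledged by the producer but not yet covered by a commit.
  private val acked = mutable.HashSet.empty[Long]
  // Next offset to commit, i.e. the first offset not yet known to be mirrored.
  private var committed = initialCommitted

  // Called from producer callbacks; shares the monitor with advance().
  def ack(offset: Long): Unit = synchronized { acked += offset }

  // Called from the offset commit thread: walk forward through the contiguous
  // run of acked offsets and return the new offset to commit.
  def advance(): Long = synchronized {
    while (acked.contains(committed)) {
      acked -= committed
      committed += 1
    }
    committed
  }
}
```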

core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
<https://reviews.apache.org/r/25995/#comment94853>

    Apache header missing.


- Guozhang Wang


On Sept. 24, 2014, 4:26 p.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> -----------------------------------------------------------
> 
> (Updated Sept. 24, 2014, 4:26 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1650
>     https://issues.apache.org/jira/browse/KAFKA-1650
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> mirror maker redesign; adding byte bounded blocking queue.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
>   fbc680fde21b02f11285a4f4b442987356abd17b
>   core/src/main/scala/kafka/tools/MirrorMaker.scala
>   b8698ee1469c8fbc92ccc176d916eb3e28b87867
>   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION
> 
> Diff: https://reviews.apache.org/r/25995/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>