-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review55612
-----------------------------------------------------------
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
<https://reviews.apache.org/r/25995/#comment95974>

    Do we need to add "=" here?

core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/25995/#comment95975>

    We should keep the changes for KAFKA-1647 in its own RB and not merge
    them here.

core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment95978>

    Could we add an introductory comment here covering:
    1. The architecture of the MM: producer / consumer threads, the data
       channel per producer thread, the offset commit thread, and how the
       different modules interact with each other.
    2. Why we need a separate offset commit thread, and how it works.
    3. The startup / shutdown process, i.e. which modules to start / shut
       down first (this could also be moved to the head of the corresponding
       functions).

core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment95979>

    "Embedded consumer config for consuming from the source cluster."

core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment95980>

    "Embedded producer config for producing to the target cluster."

core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment95981>

    "The offset commit thread periodically commits consumed offsets to the
    source cluster. With the new producer, the offsets are updated upon the
    returned future metadata of the send() call; with the old producer, the
    offsets are updated when the consumer's iterator advances. This
    guarantees no data loss even when the mirror maker is shut down
    uncleanly with the new producer, whereas with the old producer messages
    inside the data channel could be lost upon an unclean shutdown."

core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment96019>

    "numMessageCapacity" and "byteCapacity"? "numGetters" and "numPutters"
    (since the producer is the consumer of this buffer and vice versa)?

core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment96021>

    How about "MirrorMaker-DataChannel-queue%d-NumMessages" and
    "MirrorMaker-DataChannel-queue%d-Bytes"? And the variable names
    "channelNumMessageHists" and "channelByteHists"?

core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment96020>

    Can we define put(record, queueId) and put(record), where the latter
    includes the logic of determining the queueId and then calls the former?

core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment96022>

    Comment on why we use the hashCode of the source topic / partition here.

core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment96026>

    Instead of letting the consumer check the global shutdown flag, could we
    just add a shutdown function which sets its own flag, like the producer
    thread and the commit thread? Then the shutdown process becomes:

        consumers.shutdown
        consumers.awaitShutdown
        producers.shutdown
        producers.awaitShutdown
        committer.shutdown
        committer.awaitShutdown
        connector.shutdown
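    To make the suggested sequence concrete, a rough sketch of the pattern
    (all names here are placeholders, not the actual patch code) could be:

        import java.util.concurrent.CountDownLatch

        // Sketch only: each worker thread owns its shutdown flag and latch.
        class MirrorMakerThread(name: String) extends Thread(name) {
          @volatile private var isRunning = true
          private val shutdownLatch = new CountDownLatch(1)

          override def run() {
            try {
              while (isRunning) {
                // consume / produce / commit work goes here
              }
            } finally {
              // signal completion so awaitShutdown() can return
              shutdownLatch.countDown()
            }
          }

          def shutdown() { isRunning = false }

          def awaitShutdown() { shutdownLatch.await() }
        }

    With that in place, the overall shutdown reads exactly as listed above:
    shut down and await all consumers first so no new data enters the
    channel, then the producers, then the committer, and finally close the
    connector.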
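    Similarly, for the put(record, queueId) / put(record) split and the
    source topic / partition hashing raised a few comments above, a rough
    self-contained sketch (the record type and field names are placeholders)
    could be:

        import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}

        // Placeholder record type for illustration only.
        case class MirrorMakerRecord(sourceTopic: String,
                                     sourcePartition: Int,
                                     value: Array[Byte])

        class DataChannel(numQueues: Int, queueCapacity: Int) {
          private val queues: Array[BlockingQueue[MirrorMakerRecord]] =
            Array.fill(numQueues)(
              new ArrayBlockingQueue[MirrorMakerRecord](queueCapacity))

          // put(record) determines the queueId, then delegates to
          // put(record, queueId)
          def put(record: MirrorMakerRecord) {
            // hash on the source topic / partition so that all messages
            // from the same source partition land in the same queue
            val hash = record.sourceTopic.hashCode * 31 + record.sourcePartition
            put(record, (hash % numQueues + numQueues) % numQueues)
          }

          def put(record: MirrorMakerRecord, queueId: Int) {
            queues(queueId).put(record)
          }

          def take(queueId: Int): MirrorMakerRecord = queues(queueId).take()
        }

    Hashing on the source topic / partition (rather than, say, round-robin)
    keeps each source partition's messages in a single queue, so their
    relative order survives the trip through the data channel; that is
    worth stating in the comment.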
core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment96023>

    Maybe just "// if it exits accidentally, stop the entire mirror maker"
    as we did below?

core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment96024>

    // if it exits accidentally, stop the entire mirror maker

core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment96025>

    // the committed offset will be the first offset of the un-consumed
    // message, hence we need to increment it by one.

core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
<https://reviews.apache.org/r/25995/#comment96027>

    "queueNumItemCapacity" and "queueByteCapacity"?


- Guozhang Wang


On Oct. 6, 2014, 5:20 p.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> -----------------------------------------------------------
> 
> (Updated Oct. 6, 2014, 5:20 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1650
>     https://issues.apache.org/jira/browse/KAFKA-1650
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Guozhang's comments.
> 
> Talked with Joel and decided to remove multi-connector support, as people
> can always create multiple mirror maker instances if they want to consume
> from multiple clusters.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala fbc680fde21b02f11285a4f4b442987356abd17b 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala b8698ee1469c8fbc92ccc176d916eb3e28b87867 
>   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/25995/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>