----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review65874 -----------------------------------------------------------
Looks great. Just a few more minor comments + a question that you may have missed from the previous round. core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala <https://reviews.apache.org/r/25995/#comment109127> I now remember what the reasoning behind this was. We originally decided that during rebalances, offset commits _need to_ succeed to reduce duplicates. i.e., retry indefinitely if there are failures in offset commits while rebalancing. We did not want it to hold up shutdown though. This is why we reduced retriesRemaining only if not shutting down. However, in retrospect I think this change is better. i.e., retry always up to retry count. If a user wishes to reduce the probability of duplicates just bump up offset commit retry count. Do you agree? core/src/main/scala/kafka/tools/MirrorMaker.scala <https://reviews.apache.org/r/25995/#comment109156> Use valuesIterator instead of values - the reason is that values materializes, but the iterator will not; map over the iterator will give another iterator. So I'm pretty sure with that sum is computed without materializing an entire collection of sizes. core/src/main/scala/kafka/tools/MirrorMaker.scala <https://reviews.apache.org/r/25995/#comment109129> As mentioned in the previous RB: do we need this given that it should be almost equivalent to the producer's dropped messages metric? core/src/main/scala/kafka/tools/MirrorMaker.scala <https://reviews.apache.org/r/25995/#comment109130> Configure core/src/main/scala/kafka/tools/MirrorMaker.scala <https://reviews.apache.org/r/25995/#comment109133> Repeating question from last round: If block on buffer exhaustion is not turned on, do we still want to shut down the mirror maker? i.e., if the user really wants zero data loss they would set that to true right? If it is set to false and the MM exits what purpose does it serve? core/src/main/scala/kafka/tools/MirrorMaker.scala <https://reviews.apache.org/r/25995/#comment109136> offsets core/src/main/scala/kafka/tools/MirrorMaker.scala <https://reviews.apache.org/r/25995/#comment109135> Sorry I wasn't clear on this. I meant we should probably catch all throwables and only exit if OOME. core/src/main/scala/kafka/tools/MirrorMaker.scala <https://reviews.apache.org/r/25995/#comment109137> Capital U for %s to %d (generally limit to alphanumeric symbols comma colon and dash for easier greps) core/src/main/scala/kafka/tools/MirrorMaker.scala <https://reviews.apache.org/r/25995/#comment109138> info("Ignoring interrupt while waiting.") core/src/main/scala/kafka/utils/DoublyLinkedList.scala <https://reviews.apache.org/r/25995/#comment109143> This should ideally be nested static class of DoublyLinkedList and named Node core/src/main/scala/kafka/utils/DoublyLinkedList.scala <https://reviews.apache.org/r/25995/#comment109165> Add @threadsafe annotation above core/src/main/scala/kafka/utils/DoublyLinkedList.scala <https://reviews.apache.org/r/25995/#comment109162> private core/src/main/scala/kafka/utils/DoublyLinkedList.scala <https://reviews.apache.org/r/25995/#comment109163> private core/src/main/scala/kafka/utils/DoublyLinkedList.scala <https://reviews.apache.org/r/25995/#comment109164> private core/src/test/scala/unit/kafka/utils/DoublyLinkedListTest.scala <https://reviews.apache.org/r/25995/#comment109139> Can you move this to UtilsTest? core/src/test/scala/unit/kafka/utils/DoublyLinkedListTest.scala <https://reviews.apache.org/r/25995/#comment109147> Should probably also do a removal from a single-entry list. - Joel Koshy On Dec. 23, 2014, 3:07 a.m., Jiangjie Qin wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/25995/ > ----------------------------------------------------------- > > (Updated Dec. 23, 2014, 3:07 a.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1650 and KAKFA-1650 > https://issues.apache.org/jira/browse/KAFKA-1650 > https://issues.apache.org/jira/browse/KAKFA-1650 > > > Repository: kafka > > > Description > ------- > > Addressed Guozhang's comments. > > > Addressed Guozhang's comments > > > commit before switch to trunk > > > commit before rebase > > > Rebased on trunk, Addressed Guozhang's comments. > > > Addressed Guozhang's comments on MaxInFlightRequests > > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > mirrormaker-redesign > > > Incorporated Guozhang's comments > > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > mirrormaker-redesign > > > Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments. > > > Added consumer rebalance listener to mirror maker, will test it later. > > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > mirrormaker-redesign > > Conflicts: > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala > > core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala > > added custom config for consumer rebalance listener > > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > mirrormaker-redesign > > > Add configurable consumer rebalance listener > > > Incorporated Guozhang's comments > > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > mirrormaker-redesign > > > Incorporated Guozhang's comments. > > > Addressed Guozhang's comment. > > > numMessageUnacked should be decremented no matter the send was successful or > not. > > > Addressed Jun's comments. > > > Incorporated Jun's comments > > > Incorporated Jun's comments and rebased on trunk > > > Rebased on current trunk > > > Addressed Joel's comments. > > > Addressed Joel's comments. > > > Incorporated Joel's comments > > > Incorporated Joel's comments > > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > mirrormaker-redesign > > > Incorporated Joel's comments > > > Fix a bug in metric. > > > Missed some change in the prvevious patch submission, submit patch again. > > > change offset commit thread to use scheduler. > > > Addressed Joel's comments. > > > Diffs > ----- > > core/src/main/scala/kafka/consumer/ConsumerConnector.scala > 62c0686e816d2888772d5a911becf625eedee397 > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala > e991d2187d03241f639eeaf6769fb59c8c99664c > core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala > 9baad34a9793e5067d11289ece2154ba87b388af > core/src/main/scala/kafka/tools/MirrorMaker.scala > 53cb16c2949e0ac36a0e943564fc9fc9b4c84caa > core/src/main/scala/kafka/utils/DoublyLinkedList.scala PRE-CREATION > core/src/test/scala/unit/kafka/utils/DoublyLinkedListTest.scala > PRE-CREATION > > Diff: https://reviews.apache.org/r/25995/diff/ > > > Testing > ------- > > > Thanks, > > Jiangjie Qin > >