-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review65306
-----------------------------------------------------------

core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
<https://reviews.apache.org/r/25995/#comment108396>

    Can we make this a "reliable" commit, i.e., with retries up to the configured
    retry count? The policy is: retry on commit errors during rebalance or shutdown;
    there is no need to retry on commit errors during auto-commits. So, for example,
    if a mirror maker rebalances while the offset manager is simultaneously moving,
    we would need to retry the commit. This is the motivation for the isAutoCommit
    flag; however, there seems to be a bug right now which maybe you can fix: if
    this is not an auto-commit, set retries to the configured retry count; otherwise,
    do not retry.


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment108400>

    Let us inline the valueFactory, i.e.,
    `new Pool[TopicAndPartition, OffsetList](valueFactory = Some((k: TopicAndPartition) => new OffsetList))`


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment108417>

    numUnackedMessages


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment108421>

    Can you update the comment? I don't see any "rebalance latch".


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment108410>

    This is a slightly misleading comment, i.e., we do allow a single message that
    may exceed this, right? It is hard to document succinctly though.


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment108403>

    Why do you need a dummy param?


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment108404>

    This is a matter of taste, but personally I prefer using Options in these
    scenarios, i.e., instead of null use None. It is less error-prone when writing
    code since you cannot invoke the value directly without doing a get first,
    which forces you to remember to check whether it is defined.


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment108405>

    We should probably just say "periodically commits consumed offsets." since it
    could commit to zookeeper as well.


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment108406>

    We should probably just say "periodically commits consumed offsets." since it
    could commit to zookeeper as well.


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment108409>

    You _must_ also override auto.commit.enable to false in the consumer config.
    The default is true.


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment108407>

    isCleanShutdown is not defined


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment108408>

    should only be...


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment108411>

    private


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment108414>

    Do we need this counter? i.e., there is already a commit meter in the consumer
    connector.


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment108412>

    Why not use KafkaScheduler for the offset commit task?
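    As a rough illustration only (the interval, the commit routine, and the thread
    name prefix below are placeholders, not names from the patch), a commit task
    driven by KafkaScheduler could look roughly like this, assuming the
    KafkaScheduler API currently on trunk:

```scala
import java.util.concurrent.TimeUnit
import kafka.utils.KafkaScheduler

object OffsetCommitTaskSketch {
  // Placeholder interval and commit routine; the real ones live in MirrorMaker.
  val offsetCommitIntervalMs: Long = 60 * 1000L
  def commitOffsets(): Unit = { /* build the offset map from the unacked lists and commit */ }

  def main(args: Array[String]): Unit = {
    // A single dedicated thread is enough for a periodic commit task.
    val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "mirrormaker-offset-commit-")
    scheduler.startup()
    scheduler.schedule("mirrormaker-offset-commit",
                       commitOffsets _,
                       delay = offsetCommitIntervalMs,
                       period = offsetCommitIntervalMs,
                       unit = TimeUnit.MILLISECONDS)
    // On shutdown: scheduler.shutdown() followed by one final commitOffsets() call.
  }
}
```

    That would also give the task a proper thread name and avoid hand-rolling a
    sleep/commit loop.
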
core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment108413>

    commitOffsets


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment108415>

    For clarity and consistency (with MMRecord) can we call it sourceTopicPartition;
    sourceOffset?


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment108423>

    sourceTopicPartition...


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment108416>

    // synchronize to ensure that addOffset precedes removeOffset


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment108419>

    There might be a small memory leak here: if there was an error (in the branch
    above), it seems no one removes the offset from the list.


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment108420>

    one message (not 1 message). Rule in writing: zero through nine should be
    spelled out; 10 and above can be numeric.


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment108422>

    Shouldn't this be further up? Specifically, in the synchronized block after the
    while loop. Also, this is not named very suitably since the rebalance is still
    in progress even at this point. Something like rebalanceBlocked (or some better
    name) maybe?


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment108418>

    How about naming this something like PendingOffset or even UnackedOffset?
    OffsetListNode looks a bit odd when it is actually used above.


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment108426>

    Note earlier comment on null vs None.


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment108424>

    UnackedOffsets or UnackedOffsetList


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment108429>

    Should we expose a size() method, i.e., increment on add and decrement on
    remove? We can aggregate the sizes of all the offset lists outside and emit a
    gauge. That will give us some assurance that there are no "forgotten" offsets
    (re: the potential leak mentioned above). In fact, I'm a bit nervous about
    correctness since this is a custom implementation of a semi-non-trivial data
    structure. We should probably even assert that it is empty when
    numMessageUnacked goes to zero as part of the rebalance. Ideally, these custom
    implementations need a full-fledged unit test. (A rough sketch of what I have
    in mind is at the end of these comments.)


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment108425>

    maybeUpdateMaxUnackedOffset or ...MaxPendingOffset (depending on what you do
    above)


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment108428>

    You can call the update util method above.


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment108431>

    You need to also set tail.next = null (or None).
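    To make the size()/leak concern concrete, here is a generic, illustrative sketch
    of the kind of doubly linked list being discussed. The names follow the naming
    suggestions above, but none of the fields or methods are taken from the patch,
    and nulls are kept for brevity despite the null-vs-None comment earlier:

```scala
// Illustrative sketch only -- not the structure in the patch.
class UnackedOffset(val offset: Long) {
  var prev: UnackedOffset = null
  var next: UnackedOffset = null
}

class UnackedOffsetList {
  private var head: UnackedOffset = null
  private var tail: UnackedOffset = null
  private var count = 0

  // Append at the tail in O(1), keeping both end pointers consistent.
  def add(node: UnackedOffset): Unit = this.synchronized {
    if (tail == null) head = node
    else { tail.next = node; node.prev = tail }
    tail = node
    count += 1
  }

  // Unlink the node from both neighbours and clear its own links so that removed
  // nodes do not stay reachable; the head/tail pointers are updated here as well.
  def remove(node: UnackedOffset): Unit = this.synchronized {
    if (node.prev != null) node.prev.next = node.next else head = node.next
    if (node.next != null) node.next.prev = node.prev else tail = node.prev
    node.prev = null
    node.next = null
    count -= 1
  }

  // Expose the size so the caller can aggregate across partitions into a gauge and
  // assert that every list is empty once the unacked-message count reaches zero.
  def size: Int = this.synchronized { count }
}
```

    A unit test over interleaved add/remove calls could then simply check size
    together with the head/tail invariants.
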

- Joel Koshy


On Dec. 16, 2014, 4:03 p.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> -----------------------------------------------------------
> 
> (Updated Dec. 16, 2014, 4:03 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1650 and KAKFA-1650
>     https://issues.apache.org/jira/browse/KAFKA-1650
>     https://issues.apache.org/jira/browse/KAKFA-1650
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Guozhang's comments.
> 
> Addressed Guozhang's comments
> 
> commit before switch to trunk
> 
> commit before rebase
> 
> Rebased on trunk, Addressed Guozhang's comments.
> 
> Addressed Guozhang's comments on MaxInFlightRequests
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> Incorporated Guozhang's comments
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
> 
> Added consumer rebalance listener to mirror maker, will test it later.
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> Conflicts:
>     core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
>     core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
> 
> added custom config for consumer rebalance listener
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> Add configurable consumer rebalance listener
> 
> Incorporated Guozhang's comments
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> Incorporated Guozhang's comments.
> 
> Addressed Guozhang's comment.
> 
> numMessageUnacked should be decremented no matter the send was successful or not.
> 
> Addressed Jun's comments.
> 
> Incorporated Jun's comments
> 
> Incorporated Jun's comments and rebased on trunk
> 
> Rebased on current trunk
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/consumer/ConsumerConnector.scala 62c0686e816d2888772d5a911becf625eedee397 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala e991d2187d03241f639eeaf6769fb59c8c99664c 
>   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9baad34a9793e5067d11289ece2154ba87b388af 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 77d951d13b8d8ad00af40257fe51623cc2caa61a 
> 
> Diff: https://reviews.apache.org/r/25995/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 