-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review65759
-----------------------------------------------------------

core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
<https://reviews.apache.org/r/25995/#comment109001>

If isAutoCommit is true then we will not retry anyway. I think this
condition can be removed altogether, i.e., if we are shutting down, then
we should probably allow committing offsets up to retryCount. I don't
recollect why this was written this way, but I think retrying up to the
configured retry count is reasonable on shutdown. Do you agree?

core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
<https://reviews.apache.org/r/25995/#comment109002>

Since this is a Java compatibility wrapper you need to use java.util.Map
and convert that to a Scala map. Alternatively, we can remove this
altogether since it is only used semi-internally (by the mirror maker).
However, I think it would be good to add this here with the fix.

core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment109003>

val scheduler = new KafkaScheduler
and later on scheduler.startup (since the scheduler does nothing until it
is started up)

core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment109004>

NumUnackedMessages

core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment109006>

NumUnackedOffsets is a "weird" metric, especially with the presence of
NumUnackedMessages. Can you think of a better name? My attempt would be
NumPendingSourceOffsets - but I don't like that either.

core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment109005>

`unackedOffsetMap.valuesIterator.map(unackedOffsets => unackedOffsets.size).sum`

core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment109015>

Add a comment on what this metric is. Actually, do we need this since this
will be covered by the producer's dropped metric? As above, this is also a
weird mbean to see.
Not sure if we can come up with a better name.

core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment109016>

Can you move this to the if (useNewProducer) block further down (line 230)?

core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment109017>

I think the final commitOffsets (on shutdown) should be a reliable commit,
i.e., retry up to the configured retries if there are commit errors.

core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment109029>

If block on buffer exhaustion is not turned on, do we still want to shut
down the mirror maker? i.e., if the user really wants zero data loss they
would set that to true, right? If it is set to false and the MM exits, what
purpose does it serve?

core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment109018>

Capital C

core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment109019>

Committing offsets:
(also, should we just do this trace message in the consumer connector
instead? Right now it is not there, but I think it should be)

core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment109020>

The commit "thread" has not exited, i.e., this should just be "Shutting
down mirror maker due to error when committing offsets." I think OOME is
the only exception we should really shut down for, right? i.e., we should
probably let everything else go, right?

core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment109021>

Line 522: Minor nit - how about naming this just unackedOffset (instead of
offsetNode)?

core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment109023>

Updating offset to commit for %s to %d.
(Note that [%s] will give you [[topic, partition]])

core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment109024>

Should probably also catch interrupted exception

core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment109025>

if (customRebalanceListener.isDefined)

core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment109027>

Should (ideally) be volatile since it is reported by the gauge.

core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment109026>

What does "node validation skipped" mean?


- Joel Koshy


On Dec. 19, 2014, 7:41 a.m., Jiangjie Qin wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> -----------------------------------------------------------
>
> (Updated Dec. 19, 2014, 7:41 a.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1650 and KAKFA-1650
>     https://issues.apache.org/jira/browse/KAFKA-1650
>     https://issues.apache.org/jira/browse/KAKFA-1650
>
>
> Repository: kafka
>
>
> Description
> -------
>
> Addressed Guozhang's comments.
>
> Addressed Guozhang's comments
>
> commit before switch to trunk
>
> commit before rebase
>
> Rebased on trunk, Addressed Guozhang's comments.
>
> Addressed Guozhang's comments on MaxInFlightRequests
>
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
>
> Incorporated Guozhang's comments
>
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
>
> Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
>
> Added consumer rebalance listener to mirror maker, will test it later.
>
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
>
> Conflicts:
>     core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
>     core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
>
> added custom config for consumer rebalance listener
>
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
>
> Add configurable consumer rebalance listener
>
> Incorporated Guozhang's comments
>
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
>
> Incorporated Guozhang's comments.
>
> Addressed Guozhang's comment.
>
> numMessageUnacked should be decremented no matter the send was successful or not.
>
> Addressed Jun's comments.
>
> Incorporated Jun's comments
>
> Incorporated Jun's comments and rebased on trunk
>
> Rebased on current trunk
>
> Addressed Joel's comments.
>
> Addressed Joel's comments.
>
> Incorporated Joel's comments
>
> Incorporated Joel's comments
>
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
>
> Incorporated Joel's comments
>
> Fix a bug in metric.
>
> Missed some change in the previous patch submission, submit patch again.
>
> change offset commit thread to use scheduler.
>
>
> Diffs
> -----
>
>   core/src/main/scala/kafka/consumer/ConsumerConnector.scala 62c0686e816d2888772d5a911becf625eedee397
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala e991d2187d03241f639eeaf6769fb59c8c99664c
>   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9baad34a9793e5067d11289ece2154ba87b388af
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 53cb16c2949e0ac36a0e943564fc9fc9b4c84caa
>
> Diff: https://reviews.apache.org/r/25995/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Jiangjie Qin
>
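
P.S. On the Java wrapper comment (kafka/javaapi/consumer/ZookeeperConsumerConnector.scala): a minimal sketch of what "use java.util.Map and convert that to a scala map" could look like. This is not the code in the patch. DemoTopicPartition, ScalaSideConnector and JavaSideConnector are hypothetical stand-ins, the offset value is assumed to be a plain Long, and the import assumes Scala 2.13 or later (older Scala versions would use scala.collection.JavaConverters for the same asScala call).

```scala
import java.util.{Map => JMap}
import scala.jdk.CollectionConverters._

// Hypothetical stand-in for the topic/partition key used by the real connector.
final case class DemoTopicPartition(topic: String, partition: Int)

// Hypothetical stand-in for the Scala-side connector; it only records what
// it was asked to commit so the conversion can be observed.
class ScalaSideConnector {
  var lastCommitted: Map[DemoTopicPartition, Long] = Map.empty

  def commitOffsets(offsets: Map[DemoTopicPartition, Long]): Unit =
    lastCommitted = offsets
}

// Java-facing wrapper: the signature exposes only java.util types, and the
// conversion to an immutable Scala Map happens once, at the boundary.
class JavaSideConnector(underlying: ScalaSideConnector) {
  def commitOffsets(offsets: JMap[DemoTopicPartition, java.lang.Long]): Unit = {
    val scalaOffsets: Map[DemoTopicPartition, Long] =
      offsets.asScala.map { case (tp, offset) => tp -> offset.longValue }.toMap
    underlying.commitOffsets(scalaOffsets)
  }
}
```

Converting at the boundary keeps Scala collection types out of the Java-visible signature, which I take to be the point of the compatibility wrapper; the real method would of course take whatever key and value types the Scala connector uses.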