> On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote: > > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 85 > > <https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line85> > > > > NumUnackedOffsets is a "weird" metric especially with the presence of > > NumUnackedMessages. Can you think of a better name? My attempt would be > > NumPendingSourceOffsets - but I don't like that either. > > Jiangjie Qin wrote: > How about UnackedOffsetListsSize?
Yes that sounds better. > On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote: > > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 198 > > <https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line198> > > > > Can you move this to the if (useNewProducer) block further down (line > > 230) > > Jiangjie Qin wrote: > This block is here because the consumer property has to be set before the > consumer instantiation. And offset commit thread could only starts after > consumer connector is instantiated. It also seems better for offset commit > thread starts after producer thread starts. That's why we have those spreaded > if statement... I thought this was before the instantiation, but maybe I misread. Either way, I think your most recent patch looks fine in this regards with the no.data.loss option being inspected when instantiating the consumer and producer separately. > On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote: > > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 295 > > <https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line295> > > > > I think the final commitOffsets (on shutdown) should be a reliable > > commit - i.e., retry up to configured retries if there are commit errors. > > Jiangjie Qin wrote: > Yes, it will, isAutoCommit is set to false. This commitOffsets is defined > in mirror maker wraps the ZooKeeperConsumerConnector.commitOffsets, where > isAutoCommit == false. Or do you mean something else? Yes you are right. I missed that it is a wrapper. > On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote: > > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 449 > > <https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line449> > > > > If block on buffer exhaustion is not turned on, do we still want to > > shut down the mirror maker? i.e., if the user really wants zero data loss > > they would set that to true right? > > > > If it is set to false and the MM exits what purpose does it serve? > > Jiangjie Qin wrote: > I kind of think that we should let user know that there is something > wrong in the producer once it occurs. For users not care about zero data > loss, they probably still want to at least have a working mirror maker. If we > just drop the message and let producer move on, potentially we can have a > running mirror maker that only drops messages. In that case, it's probably > better to let the mirror maker die to indicate something wrong happened. So > I'm thinking exits on BufferExhaustedException is more from normal operating > point of view instead of zero data loss point of view. Ok - we can leave it as is and revisit if necessary. If the MM exits on buffer exhaustion then the user will know and will bump up memory. OTOH some users may be okay with dropping messages at peak traffic. I'm not sure which is better. > On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote: > > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 90 > > <https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line90> > > > > Add comment on what this metric is. Actually do we need this since this > > will be covered by the producer's dropped metric? As above, this is also a > > weird mbean to see. Not sure if we can come up with a better name. > > Jiangjie Qin wrote: > I added comments to this metric. I'm kind of relactant from using > producer's dropped metrics. The reason is that the intention of this metric > is diffrent from dropped messages, although they have some connection. Also, > we could potentially have many producers, it would be better if we have a > single number instead of having to go through multiple metrics. What do you > think? Ok sounds good. - Joel ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review65759 ----------------------------------------------------------- On Dec. 23, 2014, 3:07 a.m., Jiangjie Qin wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/25995/ > ----------------------------------------------------------- > > (Updated Dec. 23, 2014, 3:07 a.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1650 and KAKFA-1650 > https://issues.apache.org/jira/browse/KAFKA-1650 > https://issues.apache.org/jira/browse/KAKFA-1650 > > > Repository: kafka > > > Description > ------- > > Addressed Guozhang's comments. > > > Addressed Guozhang's comments > > > commit before switch to trunk > > > commit before rebase > > > Rebased on trunk, Addressed Guozhang's comments. > > > Addressed Guozhang's comments on MaxInFlightRequests > > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > mirrormaker-redesign > > > Incorporated Guozhang's comments > > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > mirrormaker-redesign > > > Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments. > > > Added consumer rebalance listener to mirror maker, will test it later. > > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > mirrormaker-redesign > > Conflicts: > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala > > core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala > > added custom config for consumer rebalance listener > > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > mirrormaker-redesign > > > Add configurable consumer rebalance listener > > > Incorporated Guozhang's comments > > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > mirrormaker-redesign > > > Incorporated Guozhang's comments. > > > Addressed Guozhang's comment. > > > numMessageUnacked should be decremented no matter the send was successful or > not. > > > Addressed Jun's comments. > > > Incorporated Jun's comments > > > Incorporated Jun's comments and rebased on trunk > > > Rebased on current trunk > > > Addressed Joel's comments. > > > Addressed Joel's comments. > > > Incorporated Joel's comments > > > Incorporated Joel's comments > > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > mirrormaker-redesign > > > Incorporated Joel's comments > > > Fix a bug in metric. > > > Missed some change in the prvevious patch submission, submit patch again. > > > change offset commit thread to use scheduler. > > > Addressed Joel's comments. > > > Diffs > ----- > > core/src/main/scala/kafka/consumer/ConsumerConnector.scala > 62c0686e816d2888772d5a911becf625eedee397 > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala > e991d2187d03241f639eeaf6769fb59c8c99664c > core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala > 9baad34a9793e5067d11289ece2154ba87b388af > core/src/main/scala/kafka/tools/MirrorMaker.scala > 53cb16c2949e0ac36a0e943564fc9fc9b4c84caa > core/src/main/scala/kafka/utils/DoublyLinkedList.scala PRE-CREATION > core/src/test/scala/unit/kafka/utils/DoublyLinkedListTest.scala > PRE-CREATION > > Diff: https://reviews.apache.org/r/25995/diff/ > > > Testing > ------- > > > Thanks, > > Jiangjie Qin > >