> On Dec. 3, 2014, 6:57 p.m., Guozhang Wang wrote: > > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 613 > > <https://reviews.apache.org/r/25995/diff/7/?file=774325#file774325line613> > > > > I think there may be a race condition here, for example consider this > > sequence: > > > > 1. data channel only contain one message. > > 2. producer take the message from channel. > > 3. dataChannel.clear() called. > > 4. numMessageUnacked.get() == 0, offsets committed. > > 5. producer.send() called, increment numMessageUnacked. > > 6. data duplicate happens when the rebalance finished. > > > > I think on line 599 we should use "while" instead of "if", but this > > alone does not fix this.
Yes, I actually have comment on this race condition in line 581. The reason I'm not handling it here is: 1. The chance of this situation is very slight. 2. A single duplicate message does not really hurt. 3. The fix increase the complexity of the code (looking into the producer thread status) and I'm not sure if it worth doing. 4. Even if we fix this, from the producer side, duplicates could still happen. > On Dec. 3, 2014, 6:57 p.m., Guozhang Wang wrote: > > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 96 > > <https://reviews.apache.org/r/25995/diff/7/?file=774325#file774325line96> > > > > Could you add a comment here about the in-flight-request config and its > > effects? The comments was put at the very beginning with a note. Maybe we can put a comment referring to that note. > On Dec. 3, 2014, 6:57 p.m., Guozhang Wang wrote: > > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 186 > > <https://reviews.apache.org/r/25995/diff/7/?file=774325#file774325line186> > > > > I am wondering are there some scenarios we want to allow customized > > rebalance listener? Also if we decide to make this customizable we need to > > make it clear that the customized listener would expect the datachannel as > > its constructor since this is not checked at compile time. Yes, we do foresee some usecases for this customized rebalance listener. I'll add the following comments: "Customized consumer rebalance listener should extends MirrorMakerConsumerRebalanceListener and take datachannel as argument." - Jiangjie ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review63703 ----------------------------------------------------------- On Nov. 24, 2014, 4:15 p.m., Jiangjie Qin wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/25995/ > ----------------------------------------------------------- > > (Updated Nov. 24, 2014, 4:15 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1650 and KAKFA-1650 > https://issues.apache.org/jira/browse/KAFKA-1650 > https://issues.apache.org/jira/browse/KAKFA-1650 > > > Repository: kafka > > > Description > ------- > > Addressed Guozhang's comments. > > > Addressed Guozhang's comments > > > commit before switch to trunk > > > commit before rebase > > > Rebased on trunk, Addressed Guozhang's comments. > > > Addressed Guozhang's comments on MaxInFlightRequests > > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > mirrormaker-redesign > > > Incorporated Guozhang's comments > > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > mirrormaker-redesign > > > Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments. > > > Added consumer rebalance listener to mirror maker, will test it later. > > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > mirrormaker-redesign > > Conflicts: > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala > > core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala > > added custom config for consumer rebalance listener > > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > mirrormaker-redesign > > > Add configurable consumer rebalance listener > > > Diffs > ----- > > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala > 3e1718bc7ca6c835a59ca7c6879f558ec06866ee > core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java > PRE-CREATION > core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala > 9d5a47fb8e04d0055cce820afde7f73affc0a984 > core/src/main/scala/kafka/tools/MirrorMaker.scala > f399105087588946987bbc84e3759935d9498b6a > core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala > 6a85d7e494f6c88798133a17f6180b61029dff58 > > core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala > 8c4687b2c96fddab7c8454a5a8011c3bab0897a0 > > Diff: https://reviews.apache.org/r/25995/diff/ > > > Testing > ------- > > > Thanks, > > Jiangjie Qin > >