> On Dec. 3, 2014, 6:57 p.m., Guozhang Wang wrote: > > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 613 > > <https://reviews.apache.org/r/25995/diff/7/?file=774325#file774325line613> > > > > I think there may be a race condition here, for example consider this > > sequence: > > > > 1. data channel only contain one message. > > 2. producer take the message from channel. > > 3. dataChannel.clear() called. > > 4. numMessageUnacked.get() == 0, offsets committed. > > 5. producer.send() called, increment numMessageUnacked. > > 6. data duplicate happens when the rebalance finished. > > > > I think on line 599 we should use "while" instead of "if", but this > > alone does not fix this. > > Jiangjie Qin wrote: > Yes, I actually have comment on this race condition in line 581. The > reason I'm not handling it here is: > 1. The chance of this situation is very slight. > 2. A single duplicate message does not really hurt. > 3. The fix increase the complexity of the code (looking into the producer > thread status) and I'm not sure if it worth doing. > 4. Even if we fix this, from the producer side, duplicates could still > happen. > > Guozhang Wang wrote: > Shall we change line 691 to "while (numMessageUnacked.get() > 0)" at > least?
Yes, it should be a while loop, forgot to change it... - Jiangjie ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review63703 ----------------------------------------------------------- On Dec. 4, 2014, 3:02 a.m., Jiangjie Qin wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/25995/ > ----------------------------------------------------------- > > (Updated Dec. 4, 2014, 3:02 a.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1650 and KAKFA-1650 > https://issues.apache.org/jira/browse/KAFKA-1650 > https://issues.apache.org/jira/browse/KAKFA-1650 > > > Repository: kafka > > > Description > ------- > > Addressed Guozhang's comments. > > > Addressed Guozhang's comments > > > commit before switch to trunk > > > commit before rebase > > > Rebased on trunk, Addressed Guozhang's comments. > > > Addressed Guozhang's comments on MaxInFlightRequests > > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > mirrormaker-redesign > > > Incorporated Guozhang's comments > > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > mirrormaker-redesign > > > Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments. > > > Added consumer rebalance listener to mirror maker, will test it later. > > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > mirrormaker-redesign > > Conflicts: > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala > > core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala > > added custom config for consumer rebalance listener > > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > mirrormaker-redesign > > > Add configurable consumer rebalance listener > > > Incorporated Guozhang's comments > > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > mirrormaker-redesign > > > Incorporated Guozhang's comments. > > > Addressed Guozhang's comment. > > > Diffs > ----- > > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala > da29a8cb461099eb675161db2f11a9937424a5c6 > core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java > PRE-CREATION > core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala > 9d5a47fb8e04d0055cce820afde7f73affc0a984 > core/src/main/scala/kafka/tools/MirrorMaker.scala > f399105087588946987bbc84e3759935d9498b6a > core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala > 6a85d7e494f6c88798133a17f6180b61029dff58 > > core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala > 8c4687b2c96fddab7c8454a5a8011c3bab0897a0 > > Diff: https://reviews.apache.org/r/25995/diff/ > > > Testing > ------- > > > Thanks, > > Jiangjie Qin > >