> On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 489
> > <https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line489>
> >
> >     Why not use KafkaScheduler for the offset commit task?
>
> Jiangjie Qin wrote:
>     Haven't thought that before... But it looks that we need to do some more
>     handling when something wrong happen in the offset commit threads. The
>     KafkaScheduler code seems not do so.
>
> Joel Koshy wrote:
>     So you can make the task itself catch throwables. So it would look
>     something like this:
>
>     scheduler.schedule("mirrorMakerOffsetsCommiter", commitTask, ...)
>
>     And in commitTask:
>     try {
>       commitOffsets()
>     }
>     catch {
>       case t: Throwable =>
>         // handle
>     }
>
>     That said, I don't think connector.commitOffsets will throw anything -
>     since we catch all throwables there.
>
>     The only additional detail is that after you shutdown the scheduler you
>     will need to call commitOffsets() manually one last time.

I changed the code to use the scheduler. It seems that the try/catch block only
handles the Kafka-based offset commit and does not include ensureOffsetsManager
connected. Also, theoretically an OOM could be thrown when creating a very large
offset map, so I kept the catch block to make mirror maker exit in that case.


- Jiangjie


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review65306
-----------------------------------------------------------


On Dec. 19, 2014, 7:41 a.m., Jiangjie Qin wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> -----------------------------------------------------------
>
> (Updated Dec. 19, 2014, 7:41 a.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1650 and KAKFA-1650
>     https://issues.apache.org/jira/browse/KAFKA-1650
>     https://issues.apache.org/jira/browse/KAKFA-1650
>
>
> Repository: kafka
>
>
> Description
> -------
>
> Addressed Guozhang's comments.
>
>
> Addressed Guozhang's comments
>
>
> commit before switch to trunk
>
>
> commit before rebase
>
>
> Rebased on trunk, Addressed Guozhang's comments.
>
>
> Addressed Guozhang's comments on MaxInFlightRequests
>
>
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into
> mirrormaker-redesign
>
>
> Incorporated Guozhang's comments
>
>
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into
> mirrormaker-redesign
>
>
> Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
>
>
> Added consumer rebalance listener to mirror maker, will test it later.
>
>
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into
> mirrormaker-redesign
>
> Conflicts:
>     core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
>     core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
>
> added custom config for consumer rebalance listener
>
>
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into
> mirrormaker-redesign
>
>
> Add configurable consumer rebalance listener
>
>
> Incorporated Guozhang's comments
>
>
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into
> mirrormaker-redesign
>
>
> Incorporated Guozhang's comments.
>
>
> Addressed Guozhang's comment.
>
>
> numMessageUnacked should be decremented no matter the send was successful or
> not.
>
>
> Addressed Jun's comments.
>
>
> Incorporated Jun's comments
>
>
> Incorporated Jun's comments and rebased on trunk
>
>
> Rebased on current trunk
>
>
> Addressed Joel's comments.
>
>
> Addressed Joel's comments.
>
>
> Incorporated Joel's comments
>
>
> Incorporated Joel's comments
>
>
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into
> mirrormaker-redesign
>
>
> Incorporated Joel's comments
>
>
> Fix a bug in metric.
>
>
> Missed some change in the previous patch submission, submit patch again.
>
>
> change offset commit thread to use scheduler.
>
>
> Diffs
> -----
>
>   core/src/main/scala/kafka/consumer/ConsumerConnector.scala 62c0686e816d2888772d5a911becf625eedee397
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala e991d2187d03241f639eeaf6769fb59c8c99664c
>   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9baad34a9793e5067d11289ece2154ba87b388af
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 53cb16c2949e0ac36a0e943564fc9fc9b4c84caa
>
> Diff: https://reviews.apache.org/r/25995/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Jiangjie Qin
>
>
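
For reference, the pattern discussed in this thread (a periodic commit task that catches throwables itself, plus one manual commit after the scheduler is shut down) might look roughly like the sketch below. This is not the code from the patch. It uses the JDK's ScheduledExecutorService as a stand-in for KafkaScheduler, and commitOffsets here is a hypothetical placeholder that only counts calls. The exit-on-failure policy is an assumption based on the reply above.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object OffsetCommitSketch {
  // Hypothetical stand-in for connector.commitOffsets: it only counts calls.
  val commits = new AtomicInteger(0)
  def commitOffsets(): Unit = commits.incrementAndGet()

  def main(args: Array[String]): Unit = {
    // JDK scheduler used here in place of KafkaScheduler (assumption).
    val scheduler = Executors.newSingleThreadScheduledExecutor()

    // The task catches Throwable itself. With the JDK scheduler, an exception
    // escaping a periodic task suppresses all later runs of that task.
    val commitTask = new Runnable {
      override def run(): Unit =
        try commitOffsets()
        catch {
          case t: Throwable =>
            // Assumed policy: treat a failed commit (e.g. an OOM) as fatal.
            System.err.println("Offset commit failed, exiting: " + t)
            sys.exit(1)
        }
    }

    // Commit immediately, then every 100 ms (interval chosen for the demo).
    scheduler.scheduleAtFixedRate(commitTask, 0L, 100L, TimeUnit.MILLISECONDS)
    Thread.sleep(350L)

    // Stop the periodic task and wait for any in-flight run to finish.
    scheduler.shutdown()
    scheduler.awaitTermination(5L, TimeUnit.SECONDS)

    // One last manual commit, so progress made since the final scheduled
    // run is not lost.
    commitOffsets()
    println("commits performed: " + commits.get)
  }
}
```

The final commitOffsets() call after awaitTermination is the "one last time" commit mentioned above. Without it, anything consumed after the last scheduled run would be re-read on restart.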