-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review65759
-----------------------------------------------------------



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
<https://reviews.apache.org/r/25995/#comment109001>

    If isAutoCommit is true then we will not retry anyway, so I think this 
condition can be removed altogether: if we are shutting down, we should 
probably allow committing offsets up to retryCount. I don't recollect why this 
was written this way, but I think retrying up to the configured retry count is 
reasonable on shutdown. Do you agree?
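
    To make the suggestion concrete, here is a minimal sketch of a commit that 
retries up to a configured count. It is not the connector code: 
commitWithRetries, maxRetries and the commit callback are hypothetical names 
used only for illustration.

```scala
import scala.util.control.NonFatal

object CommitRetrySketch {
  // Hypothetical helper, not the ZookeeperConsumerConnector implementation.
  // `commit` stands in for a single offset-commit attempt and returns true on success.
  // The commit is attempted at most 1 + maxRetries times, stopping at the first success.
  def commitWithRetries(maxRetries: Int)(commit: () => Boolean): Boolean = {
    var attemptsLeft = 1 + maxRetries
    var committed = false
    while (!committed && attemptsLeft > 0) {
      attemptsLeft -= 1
      // A non-fatal exception counts as a failed attempt; fatal errors propagate.
      committed = try commit() catch { case NonFatal(_) => false }
    }
    committed
  }
}
```

    On shutdown the caller would pass the configured retry count rather than 
zero, so a transient commit error does not lose the final offsets.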



core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
<https://reviews.apache.org/r/25995/#comment109002>

    Since this is a Java compatibility wrapper you need to use java.util.Map and 
convert that to a Scala map.
    
    Alternatively we can remove this altogether since it is only used 
semi-internally (by the mirror maker). However, I think it would be good to add 
this here with the fix.
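
    A rough sketch of the conversion I have in mind, assuming string keys and 
long offsets purely for illustration (the real key and value types in the patch 
will differ, and toScalaOffsets is a hypothetical name):

```scala
import java.util.{Map => JMap}
import scala.collection.JavaConverters._

object JavaApiWrapperSketch {
  // Hypothetical helper: accept the Java collection type at the API boundary
  // and hand an immutable Scala map to the underlying Scala connector.
  def toScalaOffsets(offsets: JMap[String, java.lang.Long]): Map[String, Long] =
    offsets.asScala.map { case (key, offset) => key -> offset.longValue }.toMap
}
```

    JavaConverters with an explicit asScala keeps the conversion visible at the 
call site, which seems preferable to implicit conversions in a wrapper whose 
only job is to translate types.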



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment109003>

    val scheduler = new KafkaScheduler
    
    and later on
    
    scheduler.startup
    
    (since the scheduler does nothing until it is started up)



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment109004>

    NumUnackedMessages



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment109006>

    NumUnackedOffsets is a "weird" metric especially with the presence of 
NumUnackedMessages. Can you think of a better name? My attempt would be 
NumPendingSourceOffsets - but I don't like that either.



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment109005>

    `unackedOffsetMap.valuesIterator.map(unackedOffsets => unackedOffsets.size).sum`
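
    For illustration, the same expression on a small stand-in map. The string 
keys and list values below are assumptions for the example, not the types used 
in MirrorMaker.

```scala
object UnackedCountSketch {
  // Hypothetical stand-in for unackedOffsetMap: partition key -> pending offsets.
  val unackedOffsetMap: Map[String, List[Long]] =
    Map("topicA-0" -> List(5L, 6L, 7L), "topicA-1" -> List(12L), "topicB-0" -> Nil)

  // Sums the per-partition sizes lazily, without building an intermediate collection.
  def numUnackedOffsets: Int =
    unackedOffsetMap.valuesIterator.map(unackedOffsets => unackedOffsets.size).sum
}
```

    With the data above, numUnackedOffsets evaluates to 4 (3 + 1 + 0).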



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment109015>

    Add comment on what this metric is. Actually do we need this since this 
will be covered by the producer's dropped metric? As above, this is also a 
weird mbean to see. Not sure if we can come up with a better name.



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment109016>

    Can you move this to the if (useNewProducer) block further down (line 230)?



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment109017>

    I think the final commitOffsets (on shutdown) should be a reliable commit - 
i.e., retry up to configured retries if there are commit errors.



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment109029>

    If block on buffer exhaustion is not turned on, do we still want to shut 
down the mirror maker? i.e., if the user really wants zero data loss they would 
set that to true, right?
    
    If it is set to false and the MM exits what purpose does it serve?



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment109018>

    Capital C



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment109019>

    Committing offsets: (also, should we just do this trace message in consumer 
connector instead? Right now it is not there, but I think it should)



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment109020>

    The commit "thread" has not exited, i.e., this should just be "Shutting 
down mirror maker due to error when committing offsets." I think OOME is the 
only exception we should really shut down on, right? i.e., we should probably 
let everything else go, right?



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment109021>

    Line 522: Minor nit - how about naming this just unackedOffset (instead of 
offsetNode)?



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment109023>

    Updating offset to commit for %s to %d. (Note that [%s] will give you 
[[topic, partition]])
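
    A small sketch of why the extra brackets show up. The case class below is 
a hypothetical stand-in whose toString already wraps itself in brackets; it is 
not the Kafka class.

```scala
object FormatSketch {
  // Hypothetical stand-in: toString already includes the surrounding brackets.
  case class TopicAndPartition(topic: String, partition: Int) {
    override def toString: String = "[%s,%d]".format(topic, partition)
  }

  val tp = TopicAndPartition("events", 3)

  // Wrapping %s in brackets doubles them up.
  val doubled: String = "Updating offset to commit for [%s] to %d.".format(tp, 100L)
  // Plain %s relies on the brackets that toString provides.
  val single: String = "Updating offset to commit for %s to %d.".format(tp, 100L)
}
```

    Here doubled is "Updating offset to commit for [[events,3]] to 100." while 
single is "Updating offset to commit for [events,3] to 100."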



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment109024>

    Should probably also catch interrupted exception



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment109025>

    if (customRebalanceListener.isDefined)



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment109027>

    Should (ideally) be volatile since it is reported by the gauge.
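
    Roughly what I mean, as a sketch with hypothetical names (latestValue, 
record and gaugeValue are not from the patch):

```scala
object GaugeFieldSketch {
  // Written by a worker thread and read by the metrics reporter thread.
  // @volatile makes each write visible to later reads from other threads.
  @volatile private var latestValue: Int = 0

  // Called from the thread that owns the value.
  def record(n: Int): Unit = { latestValue = n }

  // Called from the gauge, possibly on a different thread.
  def gaugeValue: Int = latestValue
}
```

    Note that volatile only gives visibility. If several threads increment the 
field, an AtomicInteger would be needed instead.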



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment109026>

    What does "node validation skipped" mean?


- Joel Koshy


On Dec. 19, 2014, 7:41 a.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> -----------------------------------------------------------
> 
> (Updated Dec. 19, 2014, 7:41 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1650
>     https://issues.apache.org/jira/browse/KAFKA-1650
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments
> 
> 
> commit before switch to trunk
> 
> 
> commit before rebase
> 
> 
> Rebased on trunk, Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments on MaxInFlightRequests
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
> 
> 
> Added consumer rebalance listener to mirror maker, will test it later.
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> Conflicts:
>       core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
>       core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
> 
> added custom config for consumer rebalance listener
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Add configurable consumer rebalance listener
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Addressed Guozhang's comment.
> 
> 
> numMessageUnacked should be decremented whether or not the send was 
> successful.
> 
> 
> Addressed Jun's comments.
> 
> 
> Incorporated Jun's comments
> 
> 
> Incorporated Jun's comments and rebased on trunk
> 
> 
> Rebased on current trunk
> 
> 
> Addressed Joel's comments.
> 
> 
> Addressed Joel's comments.
> 
> 
> Incorporated Joel's comments
> 
> 
> Incorporated Joel's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Joel's comments
> 
> 
> Fix a bug in metric.
> 
> 
> Missed some change in the previous patch submission, submit patch again.
> 
> 
> change offset commit thread to use scheduler.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
> 62c0686e816d2888772d5a911becf625eedee397 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
> e991d2187d03241f639eeaf6769fb59c8c99664c 
>   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
> 9baad34a9793e5067d11289ece2154ba87b388af 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 
> 53cb16c2949e0ac36a0e943564fc9fc9b4c84caa 
> 
> Diff: https://reviews.apache.org/r/25995/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>
