> On Dec. 18, 2014, 10:42 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 614
> > <https://reviews.apache.org/r/25995/diff/14/?file=795052#file795052line614>
> >
> >     Should this be fatal? i.e., fatal is normally used before exiting 
> > (abnormally). WARN would be more suitable.
> >     
> >     I don't think it makes sense to "not advance" the offset here 
> > especially if you will still keep sending messages. I think you need to 
> > remove it from the unacked offset list. E.g., you may configure your mirror 
> > maker producer to only few retries (in which case you are okay with data 
> > loss). In this scenario you should just let the error go and allow the 
> > mirror maker to proceed normally.
> >     
> >     If someone wants zero data loss the MM should be configured with 
> > required acks -1 and infinite retries.
> >     
> >     Maybe I'm misunderstanding what zero data loss really means - can you 
> > clarify? (Especially if someone configures the producer with acks (say) one 
> > and limited retries)

That makes sense. So I've changed the code to work in the following way:

1. If retries is set to infinite, the producer will keep retrying and the 
entire pipeline will finally be blocked. (This is strict data-loss free.)
2. If retries are not set to infinite, after the retries are exhausted, it will 
remove the offset from unacked list and record it as a skippedUnackedMessage, 
which is an exposed metric.


> On Dec. 18, 2014, 10:42 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 668
> > <https://reviews.apache.org/r/25995/diff/14/?file=795052#file795052line668>
> >
> >     Should we make this a generic DoublyLinkedList data structure in utils 
> > or some other suitable place and unit test it as well?

I'm not sure if this is generic enough to put it into utils. This raw linked 
list seems only serve the purpose of removing/inserting node in the middle in 
O(1), which cannot be achieved in java linkedlist. Maybe we can keep it here 
now. And if later on there are some other use cases, we can refactor the code 
to create a raw LinkedList in utils and use that one. What do you think?


> On Dec. 18, 2014, 10:42 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/utils/Utils.scala, line 441
> > <https://reviews.apache.org/r/25995/diff/14/?file=795053#file795053line441>
> >
> >     I don't think this is necessary right? i.e., args.map won't throw an 
> > NPE if you don't provide any additional arguments.
> >     
> >         scala> def f(args: Int*) {println(args.size)}
> >         f: (args: Int*)Unit
> >     
> >         scala> f(1,2)
> >         2
> >     
> >         scala> f()
> >         0

Yes, it seems to be working.


- Jiangjie


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review65477
-----------------------------------------------------------


On Dec. 19, 2014, 2:48 a.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> -----------------------------------------------------------
> 
> (Updated Dec. 19, 2014, 2:48 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1650 and KAKFA-1650
>     https://issues.apache.org/jira/browse/KAFKA-1650
>     https://issues.apache.org/jira/browse/KAKFA-1650
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments
> 
> 
> commit before switch to trunk
> 
> 
> commit before rebase
> 
> 
> Rebased on trunk, Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments on MaxInFlightRequests
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
> 
> 
> Added consumer rebalance listener to mirror maker, will test it later.
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> Conflicts:
>       core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
>       
> core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
> 
> added custom config for consumer rebalance listener
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Add configurable consumer rebalance listener
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Addressed Guozhang's comment.
> 
> 
> numMessageUnacked should be decremented no matter the send was successful or 
> not.
> 
> 
> Addressed Jun's comments.
> 
> 
> Incorporated Jun's comments
> 
> 
> Incorporated Jun's comments and rebased on trunk
> 
> 
> Rebased on current trunk
> 
> 
> Addressed Joel's comments.
> 
> 
> Addressed Joel's comments.
> 
> 
> Incorporated Joel's comments
> 
> 
> Incorporated Joel's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Joel's comments
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
> 62c0686e816d2888772d5a911becf625eedee397 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
> e991d2187d03241f639eeaf6769fb59c8c99664c 
>   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
> 9baad34a9793e5067d11289ece2154ba87b388af 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 
> 53cb16c2949e0ac36a0e943564fc9fc9b4c84caa 
> 
> Diff: https://reviews.apache.org/r/25995/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>

Reply via email to