> On Dec. 18, 2014, 10:42 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 614
> > <https://reviews.apache.org/r/25995/diff/14/?file=795052#file795052line614>
> >
> >     Should this be fatal? i.e., fatal is normally used before exiting 
> > (abnormally). WARN would be more suitable.
> >     
> >     I don't think it makes sense to "not advance" the offset here 
> > especially if you will still keep sending messages. I think you need to 
> > remove it from the unacked offset list. E.g., you may configure your mirror 
> > maker producer to only few retries (in which case you are okay with data 
> > loss). In this scenario you should just let the error go and allow the 
> > mirror maker to proceed normally.
> >     
> >     If someone wants zero data loss the MM should be configured with 
> > required acks -1 and infinite retries.
> >     
> >     Maybe I'm misunderstanding what zero data loss really means - can you 
> > clarify? (Especially if someone configures the producer with acks (say) one 
> > and limited retries)
> 
> Jiangjie Qin wrote:
>     That makes sense. So I've changed the code to work in the following way:
>     
>     1. If retries is set to infinite, the producer will keep retrying and the 
> entire pipeline will finally be blocked. (This is strict data-loss free.)
>     2. If retries are not set to infinite, after the retries are exhausted, 
> it will remove the offset from unacked list and record it as a 
> skippedUnackedMessage, which is an exposed metric.
> 
> Joel Koshy wrote:
>     For (1) will the pipeline be blocked though? i.e., the consumer will 
> continue to send messages to the data channel for all partitions. Also, we 
> will be susceptible to reordering for the partition that is being retried 
> even if maxInFlightRequests is set to one. (check wiki)
>     
>     It would be useful to very clearly outline the guarantees that the mirror 
> maker provides and under what configurations. e.g., zero data loss, whether 
> there will be duplicates, whether reordering is possible.
>     
>     Also, it may make sense to add a --lossless flag to the mirror maker that 
> automatically ensures the configs are set correctly:
>     * acks to -1 (or all)
>     * retries to infinity
>     * use new producer
>     * block on buffer exhaustion
>     * consumer autocommit off
>     * anything else that may be needed
>     
>     This makes it much easier to document and use. What do you think?

The --lossless option is a really good suggestion. I'll add that and document it.
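
As a rough sketch of what the flag could set, assuming the new producer and the 
ZooKeeper-based consumer: the class and method names below (LosslessConfigs, 
producerProps, consumerProps) are made up for illustration and are not in the 
patch. Only the items from the list above are covered, and the "anything else" 
item is left open.

```java
import java.util.Properties;

// Hypothetical helper, for illustration only; not part of the patch.
final class LosslessConfigs {
    private LosslessConfigs() {}

    // Settings for the new (Java) producer.
    static Properties producerProps() {
        Properties p = new Properties();
        // Wait for all in-sync replicas before a send counts as acked.
        p.setProperty("acks", "-1");
        // Largest int value stands in for "infinite" retries.
        p.setProperty("retries", String.valueOf(Integer.MAX_VALUE));
        // Block the caller instead of failing when the send buffer is full.
        p.setProperty("block.on.buffer.full", "true");
        return p;
    }

    // Settings for the ZooKeeper-based consumer.
    static Properties consumerProps() {
        Properties c = new Properties();
        // Offsets are committed by the mirror maker itself, not on a timer.
        c.setProperty("auto.commit.enable", "false");
        return c;
    }
}
```

With --lossless given, these values would override whatever the user's 
producer and consumer property files say for the same keys.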

For (1): yes, the pipeline will eventually be blocked. Once one of the data 
channel queues fills up, the consumer thread blocks on queue.offer(), so the 
ZookeeperConsumerConnector data chunk queue stops draining, and the fetcher 
thread then blocks on putting data chunks into that queue. Eventually the 
entire pipeline is blocked.
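
The back-pressure can be seen in isolation with a bounded queue that nobody 
drains. This is only a sketch: BackPressureSketch and its parameters are 
invented for illustration, and it assumes the data channel uses a timed 
offer() in a loop (in java.util.concurrent, offer() without a timeout returns 
immediately instead of blocking).

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a consumer thread feeding one data channel queue.
final class BackPressureSketch {
    private BackPressureSketch() {}

    // Tries to enqueue `attempts` records into a queue of size `capacity`
    // that no producer thread is draining. Returns how many were accepted
    // before a timed offer gave up because the queue was still full.
    static int enqueueUntilBlocked(int capacity, int attempts, long offerTimeoutMs) {
        BlockingQueue<Integer> dataChannel = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        try {
            for (int i = 0; i < attempts; i++) {
                // Waits up to offerTimeoutMs for space; false means still full.
                if (!dataChannel.offer(i, offerTimeoutMs, TimeUnit.MILLISECONDS)) {
                    break;
                }
                accepted++;
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status and stop enqueueing.
            Thread.currentThread().interrupt();
        }
        return accepted;
    }
}
```

In the real pipeline the consumer thread would keep retrying rather than give 
up, which is what stalls the queues upstream of it.
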
I don't quite get why we could still have reordering if MaxInFlightRequests is 
set to 1. I understand that when a batch is retried, the previous attempt may 
actually have succeeded, so the whole batch is duplicated on the broker. That 
can cause some reordering across batches, but within a batch the order should 
be preserved. Or do you mean the broker could ack only some of the messages in 
a batch, causing the batch to be resent, so that messages in the same batch end 
up out of order?

By "no reordering" I mean:
For two messages going to the same partition, M1 with offset OS1 and M2 with 
offset OS2, if OS2 > OS1, then M2 can only be acked by the target broker after 
M1 has been acked.
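
Stated as a check, the condition above could look like the sketch below. 
AckOrderCheck and ackedInOrder are names made up for this mail; the input is 
assumed to be the source offsets of one partition, listed in the order their 
acks came back from the target broker.

```java
import java.util.List;

// Hypothetical helper, for illustration only.
final class AckOrderCheck {
    private AckOrderCheck() {}

    // Returns true if every ack arrived for a higher offset than the ack
    // before it, i.e. no message was acked ahead of an earlier one and no
    // offset was acked twice.
    static boolean ackedInOrder(List<Long> offsetsInAckOrder) {
        Long previous = null;
        for (Long offset : offsetsInAckOrder) {
            if (previous != null && offset <= previous) {
                return false;
            }
            previous = offset;
        }
        return true;
    }
}
```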

Anyway, the current offset commit solution no longer relies on in-order 
delivery. But I think the problem still matters for some other use cases, so I 
would like to fully understand it.


> On Dec. 18, 2014, 10:42 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 668
> > <https://reviews.apache.org/r/25995/diff/14/?file=795052#file795052line668>
> >
> >     Should we make this a generic DoublyLinkedList data structure in utils 
> > or some other suitable place and unit test it as well?
> 
> Jiangjie Qin wrote:
>     I'm not sure if this is generic enough to put it into utils. This raw 
> linked list seems to serve only the purpose of removing/inserting a node in 
> the middle in O(1), which cannot be achieved with the Java LinkedList. Maybe 
> we can keep it here for now. And if later on there are some other use cases, 
> we can refactor the code to create a raw LinkedList in utils and use that 
> one. What do you think?
> 
> Joel Koshy wrote:
>     Sort of agree, but only to some degree. The main issue is that you cannot 
> unit test this - unless we come up with "mirror maker" unit tests that have 
> 100 percent code coverage of this utility. That's why it's generally a sign 
> that the utility needs to be moved out so that it can be tested thoroughly.
>     
> http://stackoverflow.com/questions/3353318/how-do-i-test-local-inner-class-methods-in-java

Thanks for guiding me on this. I will write a simplified version of the raw 
linked list and put it into utils so we can unit test it.
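
Roughly what I have in mind, as a minimal sketch in Java rather than the final 
Scala code: the names RawLinkedList, Node, add and remove are placeholders, and 
the sketch assumes a node is only ever passed to the list that created it. The 
point is that the caller keeps the node handle, so removal from the middle is 
O(1) with no traversal.

```java
// Hypothetical sketch of a raw doubly linked list; not the code in the patch.
final class RawLinkedList<T> {
    static final class Node<T> {
        final T value;
        Node<T> prev;
        Node<T> next;
        Node(T value) { this.value = value; }
    }

    private Node<T> head;
    private Node<T> tail;
    private int size;

    // Appends a value and returns its node so the caller can remove it later.
    Node<T> add(T value) {
        Node<T> node = new Node<>(value);
        if (tail == null) {
            head = node;
        } else {
            tail.next = node;
            node.prev = tail;
        }
        tail = node;
        size++;
        return node;
    }

    // Unlinks the node in O(1). Returns false if it was already detached.
    boolean remove(Node<T> node) {
        if (node.prev == null && node.next == null && head != node) {
            return false;
        }
        if (node.prev != null) node.prev.next = node.next; else head = node.next;
        if (node.next != null) node.next.prev = node.prev; else tail = node.prev;
        node.prev = null;
        node.next = null;
        size--;
        return true;
    }

    // Returns the first value, or null if the list is empty.
    T peekFirst() { return head == null ? null : head.value; }

    int size() { return size; }
}
```

A unit test can then cover removal at the head, the tail and the middle 
without going through the mirror maker at all.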


- Jiangjie


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review65477
-----------------------------------------------------------


On Dec. 23, 2014, 3:07 a.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> -----------------------------------------------------------
> 
> (Updated Dec. 23, 2014, 3:07 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1650 and KAKFA-1650
>     https://issues.apache.org/jira/browse/KAFKA-1650
>     https://issues.apache.org/jira/browse/KAKFA-1650
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments
> 
> 
> commit before switch to trunk
> 
> 
> commit before rebase
> 
> 
> Rebased on trunk, Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments on MaxInFlightRequests
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
> 
> 
> Added consumer rebalance listener to mirror maker, will test it later.
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> Conflicts:
>       core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
>       
> core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
> 
> added custom config for consumer rebalance listener
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Add configurable consumer rebalance listener
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Addressed Guozhang's comment.
> 
> 
> numMessageUnacked should be decremented no matter the send was successful or 
> not.
> 
> 
> Addressed Jun's comments.
> 
> 
> Incorporated Jun's comments
> 
> 
> Incorporated Jun's comments and rebased on trunk
> 
> 
> Rebased on current trunk
> 
> 
> Addressed Joel's comments.
> 
> 
> Addressed Joel's comments.
> 
> 
> Incorporated Joel's comments
> 
> 
> Incorporated Joel's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Joel's comments
> 
> 
> Fix a bug in metric.
> 
> 
> Missed some changes in the previous patch submission, submit patch again.
> 
> 
> change offset commit thread to use scheduler.
> 
> 
> Addressed Joel's comments.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
> 62c0686e816d2888772d5a911becf625eedee397 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
> e991d2187d03241f639eeaf6769fb59c8c99664c 
>   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
> 9baad34a9793e5067d11289ece2154ba87b388af 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 
> 53cb16c2949e0ac36a0e943564fc9fc9b4c84caa 
>   core/src/main/scala/kafka/utils/DoublyLinkedList.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/utils/DoublyLinkedListTest.scala 
> PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/25995/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>
