> On Dec. 18, 2014, 10:42 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 614
> > <https://reviews.apache.org/r/25995/diff/14/?file=795052#file795052line614>
> >
> >     Should this be fatal? i.e., fatal is normally used before exiting 
> > (abnormally). WARN would be more suitable.
> >     
> >     I don't think it makes sense to "not advance" the offset here, 
> > especially if you will still keep sending messages. I think you need to 
> > remove it from the unacked offset list. E.g., you may configure your mirror 
> > maker producer with only a few retries (in which case you are okay with 
> > data loss). In this scenario you should just let the error go and allow the 
> > mirror maker to proceed normally.
> >     
> >     If someone wants zero data loss, the MM should be configured with 
> > required acks -1 and infinite retries.
> >     
> >     Maybe I'm misunderstanding what zero data loss really means - can you 
> > clarify? (Especially if someone configures the producer with acks (say) one 
> > and limited retries)
> 
> Jiangjie Qin wrote:
>     That makes sense. So I've changed the code to work in the following way:
>     
>     1. If retries is set to infinite, the producer will keep retrying and the 
> entire pipeline will eventually be blocked. (This is strictly data-loss 
> free.)
>     2. If retries is not set to infinite, then after the retries are 
> exhausted, the producer will remove the offset from the unacked offset list 
> and record it as a skippedUnackedMessage, which is an exposed metric.
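>     
>     A rough sketch of the completion callback logic for case 2 (a 
> ConcurrentSkipListSet stands in for the patch's unacked-offset list, and the 
> names here are illustrative, not necessarily the exact identifiers in the 
> patch):
>     
>         import java.util.concurrent.ConcurrentSkipListSet
>         import java.util.concurrent.atomic.AtomicLong
>         import org.apache.kafka.clients.producer.{Callback, RecordMetadata}
>         
>         object UnackedOffsetSketch {
>           // offsets sent but not yet acked (illustrative stand-in)
>           val unackedOffsets = new ConcurrentSkipListSet[java.lang.Long]()
>           // backing counter for the exposed skippedUnackedMessages metric
>           val skippedUnackedMessages = new AtomicLong(0)
>         
>           class MirrorMakerCallback(offset: Long) extends Callback {
>             override def onCompletion(metadata: RecordMetadata, exception: Exception) {
>               if (exception != null) {
>                 // retries are already exhausted when the callback fires with
>                 // an error, so count the skip rather than blocking offset
>                 // commits forever
>                 skippedUnackedMessages.incrementAndGet()
>               }
>               // acked or skipped, drop the offset so offset commits can advance
>               unackedOffsets.remove(offset)
>             }
>           }
>         }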
> 
> Joel Koshy wrote:
>     For (1), will the pipeline be blocked though? i.e., the consumer will 
> continue to send messages to the data channel for all partitions. Also, we 
> will be susceptible to reordering for the partition that is being retried 
> even if maxInFlightRequests is set to one. (check wiki)
>     
>     It would be useful to very clearly outline the guarantees that the mirror 
> maker provides and under what configurations: e.g., zero data loss, whether 
> there will be duplicates, and whether reordering is possible.
>     
>     Also, it may make sense to add a --lossless flag to the mirror maker that 
> automatically ensures the configs are set correctly:
>     * acks to -1 (or all)
>     * retries to infinity
>     * use new producer
>     * block on buffer exhaustion
>     * consumer autocommit off
>     * anything else that may be needed
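>     
>     A rough sketch of forcing these (the LosslessConfig object and 
> enforceLossless helper are hypothetical; the property keys are the actual 
> new-producer/old-consumer configs):
>     
>         import java.util.Properties
>         
>         object LosslessConfig {
>           // Hypothetical helper a --lossless flag could call: force the
>           // configs zero data loss requires, overriding what was passed in.
>           def enforceLossless(producerProps: Properties, consumerProps: Properties) {
>             producerProps.setProperty("acks", "-1")                     // wait for all replicas
>             producerProps.setProperty("retries", Int.MaxValue.toString) // effectively infinite
>             producerProps.setProperty("block.on.buffer.full", "true")   // block on buffer exhaustion
>             // one in-flight request avoids reordering across retries
>             producerProps.setProperty("max.in.flight.requests.per.connection", "1")
>             consumerProps.setProperty("auto.commit.enable", "false")    // MM commits offsets itself
>           }
>         }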
>     
>     This makes it much easier to document and use. What do you think?
> 
> Jiangjie Qin wrote:
>     The --lossless option is a really good suggestion. I'll add that and 
> document it.
>     
>     For 1), yes, the pipeline will eventually be blocked. One of the data 
> channel queues will eventually be full, so the consumer thread will block on 
> queue.offer(). The ZookeeperConsumerConnector data chunk queue will then stop 
> moving, and the fetcher thread will block on putting data chunks into the 
> data chunk queue. Eventually the entire pipeline will be blocked.
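>     
>     A toy illustration of that back-pressure (not the actual MirrorMaker 
> code; a bounded queue stands in for the data channel):
>     
>         import java.util.concurrent.ArrayBlockingQueue
>         
>         object BackPressureSketch {
>           // Once the bounded queue fills up, put() blocks the consumer
>           // thread, which stops draining data chunks, which in turn blocks
>           // the fetcher thread upstream: the whole pipeline stalls instead
>           // of dropping data.
>           val dataChannelQueue = new ArrayBlockingQueue[AnyRef](10000)
>         
>           def consumerThreadLoop(nextRecord: () => AnyRef) {
>             while (true)
>               dataChannelQueue.put(nextRecord()) // blocks when the queue is full
>           }
>         }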
>     I don't quite get why we could still have reordering if 
> MaxInFlightRequests is set to 1. I understand that when a batch is retried, 
> the previous try might actually have succeeded, so the whole batch is 
> duplicated on the broker and there might be some reordering across batches, 
> but within the batch the order should be preserved. Or do you mean the broker 
> could ack only some of the messages in a batch, causing the batch to be 
> resent and the messages in the same batch to end up out of order?
>     
>     By reordering I mean violating the following guarantee: for two messages 
> M1 with offset OS1 and M2 with offset OS2 going to the same partition, if 
> OS2 > OS1, then M2 can only be acked by the target broker after M1 has been 
> acked.
>     
>     Anyway, the current offset commit solution no longer relies on in-order 
> delivery. But I think the problem still matters for some other use cases, so 
> I would like to fully understand it.

Sorry - you are right. There won't be reordering with pipelining turned off. 
And I agree with the point about the pipeline getting blocked (i.e., 
eventually).


- Joel


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review65477
-----------------------------------------------------------


On Dec. 23, 2014, 3:07 a.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> -----------------------------------------------------------
> 
> (Updated Dec. 23, 2014, 3:07 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1650
>     https://issues.apache.org/jira/browse/KAFKA-1650
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments
> 
> 
> Commit before switch to trunk
> 
> 
> Commit before rebase
> 
> 
> Rebased on trunk, Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments on MaxInFlightRequests
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
> 
> 
> Added consumer rebalance listener to mirror maker, will test it later.
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> Conflicts:
>       core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
>       
> core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
> 
> Added custom config for consumer rebalance listener
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Add configurable consumer rebalance listener
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Addressed Guozhang's comment.
> 
> 
> numMessageUnacked should be decremented no matter whether the send was 
> successful or not.
> 
> 
> Addressed Jun's comments.
> 
> 
> Incorporated Jun's comments
> 
> 
> Incorporated Jun's comments and rebased on trunk
> 
> 
> Rebased on current trunk
> 
> 
> Addressed Joel's comments.
> 
> 
> Addressed Joel's comments.
> 
> 
> Incorporated Joel's comments
> 
> 
> Incorporated Joel's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Joel's comments
> 
> 
> Fix a bug in metric.
> 
> 
> Missed some changes in the previous patch submission; submitting the patch 
> again.
> 
> 
> Change offset commit thread to use scheduler.
> 
> 
> Addressed Joel's comments.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
> 62c0686e816d2888772d5a911becf625eedee397 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
> e991d2187d03241f639eeaf6769fb59c8c99664c 
>   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
> 9baad34a9793e5067d11289ece2154ba87b388af 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 
> 53cb16c2949e0ac36a0e943564fc9fc9b4c84caa 
>   core/src/main/scala/kafka/utils/DoublyLinkedList.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/utils/DoublyLinkedListTest.scala 
> PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/25995/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>
