> On Dec. 18, 2014, 10:42 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 614
> > <https://reviews.apache.org/r/25995/diff/14/?file=795052#file795052line614>
> >
> >     Should this be fatal? i.e., fatal is normally used before exiting
> >     (abnormally). WARN would be more suitable.
> >
> >     I don't think it makes sense to "not advance" the offset here,
> >     especially if you will still keep sending messages. I think you need
> >     to remove it from the unacked offset list. E.g., you may configure
> >     your mirror maker producer with only a few retries (in which case you
> >     are okay with data loss). In this scenario you should just let the
> >     error go and allow the mirror maker to proceed normally.
> >
> >     If someone wants zero data loss, the MM should be configured with
> >     required acks -1 and infinite retries.
> >
> >     Maybe I'm misunderstanding what zero data loss really means - can you
> >     clarify? (Especially if someone configures the producer with acks
> >     (say) one and limited retries.)
> 
> Jiangjie Qin wrote:
>     That makes sense. So I've changed the code to work in the following way:
> 
>     1. If retries are set to infinite, the producer will keep retrying and
>     the entire pipeline will eventually be blocked. (This is strictly
>     data-loss free.)
>     2. If retries are not set to infinite, then after the retries are
>     exhausted, it will remove the offset from the unacked list and record it
>     as a skippedUnackedMessage, which is an exposed metric.
> 
> Joel Koshy wrote:
>     For (1), will the pipeline be blocked, though? i.e., the consumer will
>     continue to send messages to the data channel for all partitions. Also,
>     we will be susceptible to reordering for the partition that is being
>     retried, even if maxInFlightRequests is set to one. (Check the wiki.)
> 
>     It would be useful to very clearly outline the guarantees that the
>     mirror maker provides and under what configurations, e.g., zero data
>     loss, whether there will be duplicates, and whether reordering is
>     possible.
>     Also, it may make sense to add a --lossless flag to the mirror maker
>     that automatically ensures the configs are set correctly:
>     * acks to -1 (or all)
>     * retries to infinity
>     * use new producer
>     * block on buffer exhaustion
>     * consumer autocommit off
>     * anything else that may be needed
> 
>     This makes it much easier to document and use. What do you think?
> 
> Jiangjie Qin wrote:
>     The --lossless option is a really good suggestion. I'll add that and
>     document it.
> 
>     For (1), yes, the pipeline will eventually be blocked. Because one of
>     the data channel queues will eventually be full, the consumer thread
>     will block on queue.offer(), so the ZookeeperConsumerConnector data
>     chunk queue will not move anymore, and the fetcher thread will block on
>     putting data chunks into the data chunk queue. Eventually the entire
>     pipeline will be blocked.
> 
>     I don't quite get why we could still have reordering if
>     MaxInFlightRequests is set to 1. I understand that when a batch is
>     retried, it is possible that the previous try actually succeeded, so
>     the batch is duplicated as a whole on the broker, and there might be
>     some reordering across batches, but within a batch the order should be
>     preserved. Or do you mean the broker could ack only some of the
>     messages in a batch, which could cause the batch to be resent and thus
>     the messages in the same batch to end up out of order?
> 
>     By reordering I mean: for two messages, M1 with offset OS1 and M2 with
>     offset OS2, both going to the same partition, if OS2 > OS1 then M2 can
>     only be acked by the target broker after M1 has been acked.
> 
>     Anyway, the current offset commit solution does not rely on in-order
>     delivery anymore. But I think the problem still matters for some other
>     use cases, so I would like to fully understand it.
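The offset-tracking scheme Jiangjie describes (remove an offset on ack; on exhausted retries, drop it and bump a metric; commit the lowest still-unacked offset, so ack order doesn't matter) can be sketched as follows. This is an illustrative Java sketch, not the patch's code: the actual patch tracks unacked offsets in a custom Scala DoublyLinkedList (core/src/main/scala/kafka/utils/DoublyLinkedList.scala), and the class and method names here are invented.

```java
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical tracker illustrating the retry policies discussed above.
public class UnackedOffsetTracker {
    // Offsets handed to the producer but not yet acked, kept in offset order.
    private final ConcurrentSkipListSet<Long> unacked = new ConcurrentSkipListSet<>();
    // Mirrors the skippedUnackedMessage metric mentioned in the review.
    private final AtomicLong skippedUnackedMessages = new AtomicLong();
    private volatile long highestSent = -1L;

    public void onSend(long offset) {
        unacked.add(offset);
        if (offset > highestSent) highestSent = offset;
    }

    public void onAck(long offset) {
        unacked.remove(offset);
    }

    // Called only when retries are finite and exhausted: accept the data
    // loss, drop the offset so the commit point can advance, and count it.
    public void onRetriesExhausted(long offset) {
        if (unacked.remove(offset)) skippedUnackedMessages.incrementAndGet();
    }

    public long skipped() {
        return skippedUnackedMessages.get();
    }

    // Safe commit point: the smallest still-unacked offset, or one past the
    // highest sent offset if everything is acked. This does not depend on
    // the order in which acks arrive, matching "the current offset commit
    // solution does not rely on in-order delivery" above.
    public long safeCommitOffset() {
        return unacked.isEmpty() ? highestSent + 1 : unacked.first();
    }
}
```

With infinite retries, onRetriesExhausted is never called, so an unacked offset pins the commit point forever and the pipeline eventually blocks, which is the zero-data-loss behavior described in the thread.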
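Joel's proposed --lossless flag amounts to forcing a specific set of configs. A minimal sketch of what such a flag might enforce, assuming the 0.8.2-era property names for the new producer and the old (ZooKeeper) consumer; the helper class and method names are invented for illustration:

```java
import java.util.Properties;

public class LosslessConfig {
    // Producer overrides a --lossless flag might enforce (new producer).
    public static Properties losslessProducerProps(Properties base) {
        Properties p = new Properties();
        p.putAll(base);
        p.setProperty("acks", "-1");                      // wait for all in-sync replicas
        p.setProperty("retries",                          // effectively infinite retries
                String.valueOf(Integer.MAX_VALUE));
        p.setProperty("block.on.buffer.full", "true");    // block rather than drop on buffer exhaustion
        p.setProperty("max.in.flight.requests.per.connection", "1"); // no cross-batch reordering on retry
        return p;
    }

    // Consumer override: the mirror maker must commit offsets itself,
    // only after the producer has acked them.
    public static Properties losslessConsumerProps(Properties base) {
        Properties c = new Properties();
        c.putAll(base);
        c.setProperty("auto.commit.enable", "false");
        return c;
    }
}
```

Centralizing these overrides behind one flag is what makes the guarantee easy to document: users get the lossless config set atomically instead of having to remember five interdependent settings.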
Sorry - you are right. There won't be reordering with pipelining turned off. And I agree with the point about the pipeline getting blocked (i.e., eventually).

- Joel


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review65477
-----------------------------------------------------------


On Dec. 23, 2014, 3:07 a.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> -----------------------------------------------------------
> 
> (Updated Dec. 23, 2014, 3:07 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1650 and KAKFA-1650
>     https://issues.apache.org/jira/browse/KAFKA-1650
>     https://issues.apache.org/jira/browse/KAKFA-1650
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Guozhang's comments.
> 
> Addressed Guozhang's comments
> 
> commit before switch to trunk
> 
> commit before rebase
> 
> Rebased on trunk, addressed Guozhang's comments.
> 
> Addressed Guozhang's comments on MaxInFlightRequests
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> Incorporated Guozhang's comments
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
> 
> Added consumer rebalance listener to mirror maker, will test it later.
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> Conflicts:
>     core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
>     core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
> 
> Added custom config for consumer rebalance listener
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> Add configurable consumer rebalance listener
> 
> Incorporated Guozhang's comments
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> Incorporated Guozhang's comments.
> 
> Addressed Guozhang's comment.
> 
> numMessageUnacked should be decremented no matter whether the send was successful or not.
> 
> Addressed Jun's comments.
> 
> Incorporated Jun's comments
> 
> Incorporated Jun's comments and rebased on trunk
> 
> Rebased on current trunk
> 
> Addressed Joel's comments.
> 
> Addressed Joel's comments.
> 
> Incorporated Joel's comments
> 
> Incorporated Joel's comments
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> Incorporated Joel's comments
> 
> Fix a bug in metric.
> 
> Missed some changes in the previous patch submission; submitting the patch again.
> 
> Change the offset commit thread to use the scheduler.
> 
> Addressed Joel's comments.
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/consumer/ConsumerConnector.scala 62c0686e816d2888772d5a911becf625eedee397 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala e991d2187d03241f639eeaf6769fb59c8c99664c 
>   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9baad34a9793e5067d11289ece2154ba87b388af 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 53cb16c2949e0ac36a0e943564fc9fc9b4c84caa 
>   core/src/main/scala/kafka/utils/DoublyLinkedList.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/utils/DoublyLinkedListTest.scala PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/25995/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 