> On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala, line 
> > 338
> > <https://reviews.apache.org/r/25995/diff/13/?file=793041#file793041line338>
> >
> >     Can we make this a "reliable" commit - i.e., with retries up to the 
> > configured retry count? The policy is retry on commit errors during 
> > rebalance or shutdown, no need to retry on commit errors during 
> > auto-commits.
> >     
> >     So for e.g., if a mirror maker rebalances and there is simultaneously 
> > offset manager movement we would need to retry the commit.
> >     
> >     This is the motivation for the isAutoCommit flag - however, there seems 
> > to be a bug right now which maybe you can fix. i.e., if this is not an 
> > auto-commit then set retries to configured retries else no retries.

Changed the code based on your suggestion. My original thinking was that in 
mirror maker a single commit failure does not matter much, because the next 
commit will succeed if the failure was due to offset topic leader migration, 
etc. But for more general-purpose use it probably should retry when the commit 
is not an auto-commit.
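
Roughly what I have in mind (just a sketch; the helper and config names below 
are placeholders, not the actual patch):

    // Sketch only: retry selection for an offset commit. Non-auto commits
    // (rebalance/shutdown) get the configured retry budget; auto-commits
    // are best-effort and get none.
    def commitOffsetsWithRetries(isAutoCommit: Boolean,
                                 configuredRetries: Int,
                                 tryCommit: () => Boolean): Unit = {
      var retriesLeft = if (isAutoCommit) 0 else configuredRetries
      var committed = tryCommit()
      while (!committed) {
        if (retriesLeft <= 0)
          throw new RuntimeException("offset commit failed after configured retries")
        retriesLeft -= 1
        committed = tryCommit()
      }
    }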


> On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 124
> > <https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line124>
> >
> >     This is a slightly misleading comment. i.e., we do allow a single 
> > message that may exceed this right? It is hard to document succinctly 
> > though.

I think the description is clear enough for mirror maker. ByteBoundedBlockingQueue 
already has a comment noting that one additional message may exceed the 
capacity.
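
Something along these lines is what the queue does (a rough sketch with made-up 
names, not the actual ByteBoundedBlockingQueue code):

    import java.util.concurrent.LinkedBlockingQueue
    import java.util.concurrent.atomic.AtomicLong

    // Sketch only: a byte-bounded queue that admits an element whenever the
    // current total is below the bound, so a single element may push the total
    // past the capacity.
    class ByteBoundedQueueSketch[T <: AnyRef](capacityBytes: Long, sizeOf: T => Long) {
      private val queue = new LinkedBlockingQueue[T]()
      private val currentBytes = new AtomicLong(0)

      def offer(element: T): Boolean = {
        if (currentBytes.get() >= capacityBytes)
          false                                    // at or over the bound: reject
        else {
          currentBytes.addAndGet(sizeOf(element))  // may overshoot by up to one element
          queue.offer(element)
        }
      }

      def poll(): T = {
        val element = queue.poll()
        if (element != null) currentBytes.addAndGet(-sizeOf(element))
        element
      }
    }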


> On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 192
> > <https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line192>
> >
> >     Why do you need a dummy param?

Because Utils.createObject needs an args parameter, and passing in a null 
gives an NPE...
I've changed the code in Utils so that passing in a null uses the no-arg 
constructor.
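
Roughly like this (the actual Utils.createObject signature may differ, so treat 
it as a sketch):

    // Sketch: fall back to the no-arg constructor when args is null or empty.
    def createObject[T <: AnyRef](className: String, args: AnyRef*): T = {
      val klass = Class.forName(className).asInstanceOf[Class[T]]
      if (args == null || args.isEmpty)
        klass.getDeclaredConstructor().newInstance()
      else
        klass.getDeclaredConstructor(args.map(_.getClass): _*).newInstance(args: _*)
    }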


> On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 477
> > <https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line477>
> >
> >     Do we need this counter? i.e., there is already a commit meter in the 
> > consumer connector.

Right, maybe we don't need it.


> On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 489
> > <https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line489>
> >
> >     Why not use KafkaScheduler for the offset commit task?

I hadn't thought of that before... But it looks like we need some extra 
handling when something goes wrong in the offset commit thread, and the 
KafkaScheduler code does not seem to do that.
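
Something like the following is what I mean (a rough sketch with made-up names, 
not the actual MirrorMaker code):

    // Sketch only: a dedicated offset commit thread. A failed commit is caught
    // and logged, and the next interval simply tries again instead of the
    // failure killing the task.
    class OffsetCommitThread(intervalMs: Long, commit: () => Unit)
        extends Thread("mirrormaker-offset-commit-thread") {
      @volatile private var running = true

      override def run(): Unit = {
        while (running) {
          try {
            commit()
          } catch {
            case t: Throwable =>
              // keep the thread alive; alerting/metrics could hook in here
              System.err.println("Offset commit failed, will retry next interval: " + t)
          }
          Thread.sleep(intervalMs)
        }
      }

      def shutdown(): Unit = { running = false }
    }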


> On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 614
> > <https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line614>
> >
> >     There might be a small memory leak here: if there was an error though 
> > (in the branch above) it seems no one removes the offset from the list.

Yes, that is a memory leak, but in this case we should not commit the offset of 
the message that was not sent successfully either. If any exception occurs, the 
offsets will not advance anymore, so we should probably have an alert on the 
mirror maker consumer lag.


> On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 658
> > <https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line658>
> >
> >     Note earlier comment on null vs None

Since LinkedList is a fairly standard data structure, sticking with null 
probably keeps the code cleaner.


> On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 666
> > <https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line666>
> >
> >     Should we expose a size() method - i.e., increment on add and decrement 
> > on remove. We can aggregate the size of all the offset lists outside and 
> > emit a gauge. That will give us some assurance that there are no 
> > "forgotten" offsets. Re: the potential leak mentioned above.
> >     
> >     In fact, I'm a bit nervous about correctness since this is a custom 
> > implementation of a semi-non-trivial data structure. We should probably 
> > even assert that it is empty when numMessageUnacked goes to zero as part of 
> > the rebalance.
> >     
> >     Ideally, these custom implementations need a full-fledged unit test.

That's a good point; we probably need a metric to detect stuck offsets. Those 
stuck offsets should not be committed anyway, and we need to be alerted once 
that situation happens. Would it be better to add an assertion in the exception 
block where a stuck offset can occur?
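
For example, something along these lines (a sketch with illustrative names 
only, not the actual data structure):

    import java.util.concurrent.atomic.AtomicInteger

    // Sketch only: track the number of pending offsets with a counter that a
    // gauge (and a rebalance-time assertion) can read.
    class UnackedOffsetCounter {
      private val count = new AtomicInteger(0)

      def onAdd(): Unit = count.incrementAndGet()
      def onRemove(): Unit = count.decrementAndGet()
      def size: Int = count.get()
    }

    // During rebalance, once numMessageUnacked reaches zero, every per-partition
    // list should be empty, e.g.:
    //   require(offsetCounters.forall(_.size == 0), "forgotten offsets detected")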


> On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 705
> > <https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line705>
> >
> >     You need to also set tail.next = null (or None)

Setting tail.next = null is already handled in the preceding if...else: the old 
tail.prev.next, which becomes the new tail's next, is set to the removed node's 
next, i.e. null.
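
To illustrate (a simplified sketch, not the actual list implementation):

    // Illustrative doubly-linked node removal: when the removed node is the old
    // tail, node.next is null, so the branch that sets node.prev.next = node.next
    // already leaves the new tail's next as null.
    class Node(val offset: Long) {
      var prev: Node = null
      var next: Node = null
    }

    class OffsetNodeList {
      private var head: Node = null
      private var tail: Node = null

      def remove(node: Node): Unit = {
        if (node.prev == null) head = node.next else node.prev.next = node.next
        if (node.next == null) tail = node.prev else node.next.prev = node.prev
        node.prev = null
        node.next = null
      }
    }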


- Jiangjie


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review65306
-----------------------------------------------------------


On Dec. 17, 2014, 8:29 p.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> -----------------------------------------------------------
> 
> (Updated Dec. 17, 2014, 8:29 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1650
>     https://issues.apache.org/jira/browse/KAFKA-1650
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments
> 
> 
> commit before switch to trunk
> 
> 
> commit before rebase
> 
> 
> Rebased on trunk, Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments on MaxInFlightRequests
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
> 
> 
> Added consumer rebalance listener to mirror maker, will test it later.
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> Conflicts:
>       core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
>       
> core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
> 
> added custom config for consumer rebalance listener
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Add configurable consumer rebalance listener
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Addressed Guozhang's comment.
> 
> 
> numMessageUnacked should be decremented no matter the send was successful or 
> not.
> 
> 
> Addressed Jun's comments.
> 
> 
> Incorporated Jun's comments
> 
> 
> Incorporated Jun's comments and rebased on trunk
> 
> 
> Rebased on current trunk
> 
> 
> Addressed Joel's comments.
> 
> 
> Addressed Joel's comments.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
> 62c0686e816d2888772d5a911becf625eedee397 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
> e991d2187d03241f639eeaf6769fb59c8c99664c 
>   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
> 9baad34a9793e5067d11289ece2154ba87b388af 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 
> 77d951d13b8d8ad00af40257fe51623cc2caa61a 
>   core/src/main/scala/kafka/utils/Utils.scala 
> 738c1af9ef5de16fdf5130daab69757a14c48b5c 
> 
> Diff: https://reviews.apache.org/r/25995/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>
