> On Dec. 6, 2014, 5:17 p.m., Jun Rao wrote:
> > Thanks for the patch. Some comments below.

Thank you very much for the review.


> On Dec. 6, 2014, 5:17 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 529-530
> > <https://reviews.apache.org/r/25995/diff/10/?file=782732#file782732line529>
> >
> >     Does that imply there is always an offset? Is that always true?
> >     
> >     I don't quite follow how the logic works. Since the offset for each 
> > target partition is updated independantly, I am not sure why you can rely 
> > on checking that those offsets are consecutive. Also, does this logic work 
> > when there is partitioning key?
> >     
> >     It would be useful to add some comments to describe why a two-level 
> > offset map is needed.

The iterator only iterates over the key that exits in the map, i.e. offset 
exists.

It does seem confusing without explanation... Will it be clearer if the 
following comments are added?

Following is the offset commit logic:
We know that:
1. Messages from same source partition end up in the same data channel queue in 
order and will be sent by the same producer.
2. Messages sent by the same producer could go to different target partitions 
even if the messages are from the same source partition. 
3. The order is only guaranteed for messages sent to the same target partition. 
That means a (SourceTopicPartition, TargetPartition) combination is needed.
4. For each (SourceTopicParitition, TargetPartition), keeping track of a single 
offset is sufficient, because if an offset is acked, all the offset smaller 
than that offset going to the same target partition must have been sent 
successfully (MaxInFlightRequest=1). That said, if we have multiple producers, 
after sorting all the last acked offsets of target partitions which are 
corresponding to the same source partition, we can commit the offsets from the 
smallest until the acked offset is no longer consecutive. (But we do need to 
set send retries to be infinite in producer config, otherwise this won't work. 
I'll add it to the comments.)

Based on above logic, we could use Map<(SourceTopicParitition, 
TargetPartition), offset> to track the offset. But because the offset commit is 
based on source topic partitions, it is easier to have a 
Map<SourceTopicPartition, Map<TargetPatition, offset>> to find the offset to 
commit. That's why there is a 2-level map.

The logic above does not rely on any message key, so it works in general for 
both keyed/non-keyed messages.


> On Dec. 6, 2014, 5:17 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 558-559
> > <https://reviews.apache.org/r/25995/diff/10/?file=782732#file782732line558>
> >
> >     It's weird to extend from NewShinyProducer but not using its method. 
> > Perhaps it will be clearer if we just let MirrorMakerNewProducer implement 
> > MirrorMakerBaseProducer.
> >     
> >     Ditto for MirrorMakerOldProducer.

I was trying to avoid duplicate code seems not saving too much code. I'll get 
rid of the extention.


> On Dec. 6, 2014, 5:17 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 614-618
> > <https://reviews.apache.org/r/25995/diff/10/?file=782732#file782732line614>
> >
> >     This kind of coordination doesn't quite work. Suppose that we set 
> > inRebalance to true and entered the while loop. However, just before we 
> > call inRelanace.wait(), the producer could have finished sending all data 
> > and called inRebalance.notify(). Then we will be stuck in 
> > inRebalance.wait() forever since we missed the notification.
> >     
> >     One way to do that is to create a lock that protects the read/write of 
> > numMessageUnacked. Then we use a condition created from the lock to do the 
> > coordination. This way, both the wait/notification and the update/check of 
> > numMessageUnacked are protected by the same lock.

Yes, it is clearer to just use a single lock. I'll change the code.

But the current code itself seems to work, please corret me if I'm wrong.
In this case we essentially guarantees that [enable notify, numMessagesUnacked 
> 0 then wait] is atomic to notify. So either notify is not enabled yet or it 
occurs after rebalance listener starts to wait. So the notify will not be 
missed.
If rebalance listener grabs the inRebalance lock first and is in while loop 
then it won't release the inRebalance lock until it enters wait. Because 
producer have to grab inRebalance lock before calling inReblance.notify(), the 
producer will not be able to call inRebalance.notify() until the offset commit 
thread starts to wait. 
There are some more complicated sequences when producers call notify mutilple 
times, but as long as 
1.[numMessageUnacked > 0 then wait] is atomic to notify, and
2.[numMessageUnacked = 0] happens before notify
notify should not be missed.


- Jiangjie


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review64152
-----------------------------------------------------------


On Dec. 7, 2014, 2:59 a.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> -----------------------------------------------------------
> 
> (Updated Dec. 7, 2014, 2:59 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1650 and KAKFA-1650
>     https://issues.apache.org/jira/browse/KAFKA-1650
>     https://issues.apache.org/jira/browse/KAKFA-1650
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments
> 
> 
> commit before switch to trunk
> 
> 
> commit before rebase
> 
> 
> Rebased on trunk, Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments on MaxInFlightRequests
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
> 
> 
> Added consumer rebalance listener to mirror maker, will test it later.
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> Conflicts:
>       core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
>       
> core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
> 
> added custom config for consumer rebalance listener
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Add configurable consumer rebalance listener
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Addressed Guozhang's comment.
> 
> 
> numMessageUnacked should be decremented no matter the send was successful or 
> not.
> 
> 
> Addressed Jun's comments.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
> 62c0686e816d2888772d5a911becf625eedee397 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
> da29a8cb461099eb675161db2f11a9937424a5c6 
>   core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java 
> PRE-CREATION 
>   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
> 9d5a47fb8e04d0055cce820afde7f73affc0a984 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 
> f399105087588946987bbc84e3759935d9498b6a 
>   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 
> 6a85d7e494f6c88798133a17f6180b61029dff58 
>   
> core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 
> 8c4687b2c96fddab7c8454a5a8011c3bab0897a0 
> 
> Diff: https://reviews.apache.org/r/25995/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>

Reply via email to