> On Dec. 6, 2014, 5:17 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 529-530
> > <https://reviews.apache.org/r/25995/diff/10/?file=782732#file782732line529>
> >
> >     Does that imply there is always an offset? Is that always true?
> >     
> >     I don't quite follow how the logic works. Since the offset for each 
> > target partition is updated independently, I am not sure why you can rely 
> > on checking that those offsets are consecutive. Also, does this logic work 
> > when there is a partitioning key?
> >     
> >     It would be useful to add some comments to describe why a two-level 
> > offset map is needed.
> 
> Jiangjie Qin wrote:
>     The iterator only iterates over the keys that exist in the map, i.e. 
> an offset exists.
>     
>     It does seem confusing without explanation... Will it be clearer if the 
> following comments are added?
>     
>     Following is the offset commit logic:
>     We know that:
>     1. Messages from same source partition end up in the same data channel 
> queue in order and will be sent by the same producer.
>     2. Messages sent by the same producer could go to different target 
> partitions even if the messages are from the same source partition. 
>     3. The order is only guaranteed for messages sent to the same target 
> partition. That means a (SourceTopicPartition, TargetPartition) combination 
> is needed.
>     4. For each (SourceTopicPartition, TargetPartition), keeping track of a 
> single offset is sufficient, because if an offset is acked, all the offsets 
> smaller than that offset going to the same target partition must have been 
> sent successfully (MaxInFlightRequest=1). That said, if we have multiple 
> producers, after sorting all the last acked offsets of the target partitions 
> that correspond to the same source partition, we can commit the 
> offsets from the smallest until the acked offsets are no longer consecutive. 
> (But we do need to set send retries to be infinite in the producer config, 
> otherwise this won't work. I'll add it to the comments.)
>     
>     Based on the above logic, we could use Map<(SourceTopicPartition, 
> TargetPartition), offset> to track the offset. But because the offset commit 
> is based on source topic partitions, it is easier to have a 
> Map<SourceTopicPartition, Map<TargetPartition, offset>> to find the offset to 
> commit. That's why there is a 2-level map.
>     
>     The logic above does not rely on any message key, so it works in general 
> for both keyed/non-keyed messages.
> 
> Jun Rao wrote:
>     The part that's not clear to me is "until the acked offset is no longer 
> consecutive". Suppose there are 2 target partitions. Offset 1,2,4,5 can be 
> routed to partition 1 and offset 3 can be routed to partition 2. When all 
> messages are sent, you will see partition 1 with offset 5 and partition 2 
> with offset 3. Since offset 3 and 5 are not consecutive, are you just going 
> to commit offset 3? If so, you may be stuck in offset 3 forever if no new 
> messages are sent to partition 2.
>     
>     Also, does this work with a log configured for compaction? Such a log may 
> not have consecutive offsets to begin with.

I see, you are right. In that case it won't work. How about the following 
algorithm:
1. We keep a Map<SourceTopicPartition, (Set<Offset>, MaxOffsetInSet)>.
2. When a message is sent, the producer puts its offset into the set for its 
source topic partition and updates MaxOffsetInSet if necessary.
3. In the producer callback, the offset is removed from the Set<Offset> if the 
message was sent successfully.
4. When the offset commit thread runs, it takes the smallest offset in the 
Set<Offset> and commits that one. If the set is empty, it commits 
MaxOffsetInSet+1.

This algorithm should be able to handle compacted partitions as well.
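
To make the four steps concrete, here is a rough sketch of the bookkeeping. It 
is not the code in the patch: the class name UnackedOffsetTracker, the method 
names, and the use of a plain String as the source topic partition key are all 
made up for illustration, and a TreeSet stands in for Set<Offset> so the 
smallest unacked offset is cheap to find.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeSet;

/** Hypothetical sketch of the proposed bookkeeping; not the MirrorMaker code. */
final class UnackedOffsetTracker {

    /** Step 1: per source partition, the unacked offsets and the max offset seen. */
    private static final class PartitionState {
        final TreeSet<Long> unacked = new TreeSet<>();
        long maxOffset = -1L; // -1 means nothing has been sent yet
    }

    // Keyed by an illustrative "topic-partition" string (an assumption).
    private final Map<String, PartitionState> states = new HashMap<>();

    /** Step 2: record the offset when the message is handed to the producer. */
    synchronized void onSend(String sourcePartition, long offset) {
        PartitionState s =
            states.computeIfAbsent(sourcePartition, k -> new PartitionState());
        s.unacked.add(offset);
        s.maxOffset = Math.max(s.maxOffset, offset);
    }

    /** Step 3: called from the producer callback on a successful send. */
    synchronized void onAck(String sourcePartition, long offset) {
        PartitionState s = states.get(sourcePartition);
        if (s != null) {
            s.unacked.remove(offset);
        }
    }

    /**
     * Step 4: the smallest unacked offset, or maxOffset + 1 if every sent
     * message has been acked. Empty if nothing was ever sent.
     */
    synchronized OptionalLong offsetToCommit(String sourcePartition) {
        PartitionState s = states.get(sourcePartition);
        if (s == null || s.maxOffset < 0) {
            return OptionalLong.empty();
        }
        long next = s.unacked.isEmpty() ? s.maxOffset + 1 : s.unacked.first();
        return OptionalLong.of(next);
    }
}
```

With the example above (offsets 1 to 5 sent, and 1, 2, 4, 5 acked), 
offsetToCommit returns 3, and once 3 is acked it returns 6, so the commit does 
not get stuck. Gaps in the source offsets do not matter because nothing here 
assumes the offsets are consecutive. The methods are synchronized on the 
assumption that the producer callback and the offset commit thread run 
concurrently.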


- Jiangjie


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review64152
-----------------------------------------------------------


On Dec. 7, 2014, 2:59 a.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> -----------------------------------------------------------
> 
> (Updated Dec. 7, 2014, 2:59 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1650
>     https://issues.apache.org/jira/browse/KAFKA-1650
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments
> 
> 
> commit before switch to trunk
> 
> 
> commit before rebase
> 
> 
> Rebased on trunk, Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments on MaxInFlightRequests
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
> 
> 
> Added consumer rebalance listener to mirror maker, will test it later.
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> Conflicts:
>       core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
>       core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
> 
> added custom config for consumer rebalance listener
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Add configurable consumer rebalance listener
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Addressed Guozhang's comment.
> 
> 
> numMessageUnacked should be decremented whether or not the send was 
> successful.
> 
> 
> Addressed Jun's comments.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
> 62c0686e816d2888772d5a911becf625eedee397 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
> da29a8cb461099eb675161db2f11a9937424a5c6 
>   core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java 
> PRE-CREATION 
>   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
> 9d5a47fb8e04d0055cce820afde7f73affc0a984 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 
> f399105087588946987bbc84e3759935d9498b6a 
>   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 
> 6a85d7e494f6c88798133a17f6180b61029dff58 
>   core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 
> 8c4687b2c96fddab7c8454a5a8011c3bab0897a0 
> 
> Diff: https://reviews.apache.org/r/25995/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>
