> On Dec. 6, 2014, 5:17 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 529-530
> > <https://reviews.apache.org/r/25995/diff/10/?file=782732#file782732line529>
> >
> >     Does that imply there is always an offset? Is that always true?
> >
> >     I don't quite follow how the logic works. Since the offset for each
> >     target partition is updated independently, I am not sure why you can
> >     rely on checking that those offsets are consecutive. Also, does this
> >     logic work when there is a partitioning key?
> >
> >     It would be useful to add some comments to describe why a two-level
> >     offset map is needed.
>
> Jiangjie Qin wrote:
>     The iterator only iterates over the keys that exist in the map, i.e.
>     keys for which an offset exists.
>
>     It does seem confusing without explanation... Will it be clearer if
>     the following comments are added?
>
>     The offset commit logic is as follows. We know that:
>     1. Messages from the same source partition end up in the same data
>        channel queue in order and will be sent by the same producer.
>     2. Messages sent by the same producer could go to different target
>        partitions even if the messages are from the same source partition.
>     3. Order is only guaranteed for messages sent to the same target
>        partition. That means a (SourceTopicPartition, TargetPartition)
>        combination is needed.
>     4. For each (SourceTopicPartition, TargetPartition), keeping track of
>        a single offset is sufficient: if an offset is acked, all offsets
>        smaller than it that went to the same target partition must have
>        been sent successfully (MaxInFlightRequest=1). Given that, if we
>        have multiple producers, after sorting the last acked offsets of
>        the target partitions corresponding to the same source partition,
>        we can commit offsets from the smallest upward until the acked
>        offsets are no longer consecutive. (But we do need to set send
>        retries to infinite in the producer config, otherwise this won't
>        work. I'll add it to the comments.)
>
>     Based on the above logic, we could use a Map<(SourceTopicPartition,
>     TargetPartition), offset> to track the offsets. But because offsets
>     are committed per source topic partition, it is easier to use a
>     Map<SourceTopicPartition, Map<TargetPartition, offset>> to find the
>     offset to commit. That's why there is a two-level map.
>
>     The logic above does not rely on any message key, so it works for
>     both keyed and non-keyed messages.
>
> Jun Rao wrote:
>     The part that's not clear to me is "until the acked offset is no
>     longer consecutive". Suppose there are 2 target partitions. Offsets
>     1, 2, 4 and 5 can be routed to partition 1 and offset 3 can be routed
>     to partition 2. When all messages are sent, you will see partition 1
>     at offset 5 and partition 2 at offset 3. Since offsets 3 and 5 are
>     not consecutive, are you just going to commit offset 3? If so, you
>     may be stuck at offset 3 forever if no new messages are sent to
>     partition 2.
>
>     Also, does this work with a log configured for compaction? Such a log
>     may not have consecutive offsets to begin with.
>
> Jiangjie Qin wrote:
>     I see, you are right. In that case it won't work. How about the
>     following algorithm:
>     1. We keep a Map<SourceTopicPartition, (Set<Offset>, MaxOffsetInSet)>.
>     2. When a message is sent, the producer puts its offset into the map
>        keyed by the source topic partition, and updates MaxOffsetInSet if
>        necessary.
>     3. In the producer callback, remove the offset from the Set<Offset>
>        if the message was sent successfully.
>     4. When the offset commit thread runs, it gets the smallest offset in
>        the Set<Offset> and commits that one. If the set is empty, it
>        commits MaxOffsetInSet + 1.
>
>     This algorithm should be able to handle compacted partitions as well.
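A minimal sketch of the Set<Offset>/MaxOffsetInSet bookkeeping proposed
above, one tracker per source partition. The names (UnackedOffsetTracker,
markSent, markAcked, offsetToCommit) are hypothetical, not from the actual
patch:

    import scala.collection.mutable

    // Hypothetical sketch, not the actual MirrorMaker code.
    // One tracker per source TopicAndPartition.
    class UnackedOffsetTracker {
      private val unacked = mutable.HashSet[Long]() // sent but not yet acked
      private var maxOffsetSeen = -1L

      // Producer thread: record the source offset of a message being sent.
      def markSent(offset: Long): Unit = synchronized {
        unacked += offset
        if (offset > maxOffsetSeen) maxOffsetSeen = offset
      }

      // Producer callback: drop the offset once the send succeeds.
      def markAcked(offset: Long): Unit = synchronized { unacked -= offset }

      // Offset commit thread: the smallest outstanding offset, or
      // maxOffsetSeen + 1 when nothing is outstanding.
      def offsetToCommit: Long = synchronized {
        if (unacked.isEmpty) maxOffsetSeen + 1 else unacked.min
      }
    }

Because an unacked message pins the commit point at its own offset,
non-consecutive (compacted) source offsets are handled naturally. The weak
spot is offsetToCommit: unacked.min is an O(n) scan of the set, which is
exactly the cost Jun raises next.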
> Jun Rao wrote:
>     Yes, but getting the smallest offset from a set can be expensive. I
>     was thinking that we can put the offsets of all outstanding messages
>     in a linked list in source offset order. Every produced message will
>     be taken out of the linked list. We also maintain a maxOffsetSeen.
>     The commit offset will be the offset in the first link if the linked
>     list is not empty. Otherwise, it will be maxOffsetSeen.
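A rough sketch of the linked-list variant Jun describes, again with made-up
names, and using the convention that the committed offset is the next offset
to consume (hence the + 1 when nothing is outstanding):

    // Hypothetical sketch; one list per source TopicAndPartition.
    class UnackedOffsetList {
      final class Node(val offset: Long) {
        var prev: Node = _
        var next: Node = _
      }

      private var head: Node = _
      private var tail: Node = _
      private var maxOffsetSeen = -1L

      // Producer thread: messages from one source partition are sent in
      // offset order, so appending at the tail keeps the list sorted by
      // construction. The returned node is held by the producer callback
      // so removal needs no search.
      def markSent(offset: Long): Node = synchronized {
        val node = new Node(offset)
        if (tail == null) head = node
        else { tail.next = node; node.prev = tail }
        tail = node
        if (offset > maxOffsetSeen) maxOffsetSeen = offset
        node
      }

      // Producer callback: unlink the acked node in O(1).
      def markAcked(node: Node): Unit = synchronized {
        if (node.prev == null) head = node.next else node.prev.next = node.next
        if (node.next == null) tail = node.prev else node.next.prev = node.prev
      }

      // Offset commit thread: the first outstanding offset if any;
      // otherwise everything up to maxOffsetSeen has been acked.
      def offsetToCommit: Long = synchronized {
        if (head == null) maxOffsetSeen + 1 else head.offset
      }
    }

Both send-time insertion and ack-time removal are O(1), and the commit
thread reads only the head, so there is no per-commit scan.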
Yes, this solution should have no bottleneck. I'll write a raw linked list.
I was thinking that offset commits are infrequent and hopefully we do not
have many pending messages for a partition at any given point.

- Jiangjie


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review64152
-----------------------------------------------------------


On Dec. 7, 2014, 2:59 a.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> -----------------------------------------------------------
> 
> (Updated Dec. 7, 2014, 2:59 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1650 and KAKFA-1650
>     https://issues.apache.org/jira/browse/KAFKA-1650
>     https://issues.apache.org/jira/browse/KAKFA-1650
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Guozhang's comments.
> 
> Addressed Guozhang's comments
> 
> commit before switch to trunk
> 
> commit before rebase
> 
> Rebased on trunk, addressed Guozhang's comments.
> 
> Addressed Guozhang's comments on MaxInFlightRequests
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into
> mirrormaker-redesign
> 
> Incorporated Guozhang's comments
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into
> mirrormaker-redesign
> 
> Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
> 
> Added consumer rebalance listener to mirror maker, will test it later.
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into
> mirrormaker-redesign
> 
>     Conflicts:
>         core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
>         core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
> 
> added custom config for consumer rebalance listener
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into
> mirrormaker-redesign
> 
> Add configurable consumer rebalance listener
> 
> Incorporated Guozhang's comments
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into
> mirrormaker-redesign
> 
> Incorporated Guozhang's comments.
> 
> Addressed Guozhang's comment.
> 
> numMessageUnacked should be decremented no matter whether the send was
> successful or not.
> 
> Addressed Jun's comments.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/consumer/ConsumerConnector.scala 62c0686e816d2888772d5a911becf625eedee397
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala da29a8cb461099eb675161db2f11a9937424a5c6
>   core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java PRE-CREATION
>   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9d5a47fb8e04d0055cce820afde7f73affc0a984
>   core/src/main/scala/kafka/tools/MirrorMaker.scala f399105087588946987bbc84e3759935d9498b6a
>   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 6a85d7e494f6c88798133a17f6180b61029dff58
>   core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 8c4687b2c96fddab7c8454a5a8011c3bab0897a0
> 
> Diff: https://reviews.apache.org/r/25995/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>