How many topics/partitions in total are you mirroring? If that number is large, you will need to increase the number of rebalance retries and the backoff time between retries in the MirrorMaker consumer.
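
As a rough sketch, the consumer properties file passed to MirrorMaker could raise both settings. This assumes the 0.8-style consumer property names `rebalance.max.retries` and `rebalance.backoff.ms`; the values are only illustrative and would need tuning for the actual number of topics and partitions.

```
# Consumer config for MirrorMaker (file name and values are illustrative assumptions).
# Maximum rebalance attempts before the consumer gives up; the trace in the
# original message fails after 4 retries.
rebalance.max.retries=10
# Time to wait between rebalance attempts, in milliseconds.
rebalance.backoff.ms=5000
```

As a rule of thumb, the retries multiplied by the backoff should comfortably exceed the time it takes for all MirrorMaker consumers to register and for the ZooKeeper state to settle.
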

Thanks,
Jun

On Mon, Jun 10, 2013 at 8:21 AM, Yu, Libo <libo...@citi.com> wrote:

> Hi,
>
> I came across several critical issues when benchmarking MirrorMaker.
>
> 1) If a topic has N partitions on the source cluster, after mirroring it
> has only one partition on the destination cluster.
>
> 2) To solve the issue in 1), a topic is created on the source and
> destination clusters at the same time with the same number of partitions.
> After the topic is created on both clusters, some data is published to the
> source cluster under the created topic. After publishing is done, multiple
> (3 in this case) MirrorMaker processes are brought up on the destination
> cluster to download data from the source. But the following exception
> occurs:
>
> [2013-06-10 10:38:14,955] INFO [libogroup_myhost-1370875086011-c2357858], Committing all offsets after clearing the fetcher queues (kafka.consumer.ZookeeperConsumerConnector)
> Exception in thread "main" kafka.common.ConsumerRebalanceFailedException: libogroup_ myhost--1370875086011-c2357858 can't rebalance after 4 retries
>         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:397)
>         at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:680)
>         at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:714)
>         at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:140)
>         at kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
>         at kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>         at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
>         at scala.collection.immutable.List.foreach(List.scala:45)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>         at scala.collection.immutable.List.map(List.scala:45)
>         at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:118)
>         at kafka.tools.MirrorMaker.main(MirrorMaker.scala)
>
> 3) A topic is created only on the source cluster. Some data is published
> to the source cluster under the topic. Now launch multiple (3 in this case)
> consumer processes at the same time, each of which has three streams. The
> exception also occurs:
>
> Exception in thread "main" [2013-06-10 10:48:28,673] INFO [libogroup_ myhost -1370875697107-38deafbc], begin rebalancing consumer libogroup_myhost-1370875697107-38deafbc try #0 (kafka.consumer.ZookeeperConsumerConnector)
> kafka.common.ConsumerRebalanceFailedException: libogroup_ myhost -1370875697107-38deafbc can't rebalance after 4 retries
>         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:397)
>         at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:680)
>         at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:714)
>         at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:140)
>         at kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
>         at kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>         at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
>         at scala.collection.immutable.List.foreach(List.scala:45)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>         at scala.collection.immutable.List.map(List.scala:45)
>         at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:118)
>         at kafka.tools.MirrorMaker.main(MirrorMaker.scala)
>
> Regards,
>
> Libo