How many total #topics/partitions are you mirroring? If that number is
large, you will need to increase the # of retries and backoff time in the
consumer in MirrorMaker.

Thanks,

Jun


On Mon, Jun 10, 2013 at 8:21 AM, Yu, Libo <libo...@citi.com> wrote:

> Hi,
>
> I come across several critical issues when benchmarking mirrormakers.
>
> 1 If a topic has N partitions on source cluster, after mirroring, on the
> destination
> cluster it has only one partition.
>
> 2 To solve the issue in 1), a topic is created on source and destination
> at the same
> time with the same number of partitions. After the topic is created on
> both clusters,
> some data is published to source cluster under the created topic. After
> publishing
> is done, multiple (3 in this case) mirrormaker processes are brought up on
> destination
> cluster to download data from the source. But the following exception
> occurs:
>
> [2013-06-10 10:38:14,955] INFO [libogroup_myhost-1370875086011-c2357858],
> Committing all offsets after clearing the fetcher queues
> (kafka.consumer.ZookeeperConsumerConnector)
> Exception in thread "main" kafka.common.ConsumerRebalanceFailedException:
> libogroup_ myhost--1370875086011-c2357858 can't rebalance after 4 retries
>         at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:397)
>         at
> kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:680)
>         at
> kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:714)
>         at
> kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:140)
>         at kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
>         at kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
>         at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>         at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>         at
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
>         at scala.collection.immutable.List.foreach(List.scala:45)
>         at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>         at scala.collection.immutable.List.map(List.scala:45)
>         at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:118)
>         at kafka.tools.MirrorMaker.main(MirrorMaker.scala)
>
> 3) A topic is only created on source cluster. Some data is published to
> the source
>     cluster under the topic. Now launch multiple (3 in this case) consumer
> processes
>     at the same time, each of which has three stream. The exception also
> occurs:
>
> Exception in thread "main" [2013-06-10 10:48:28,673] INFO [libogroup_
> myhost -1370875697107-38deafbc], begin rebalancing consumer
> libogroup_myhost-1370875697107-38deafbc try #0
> (kafka.consumer.ZookeeperConsumerConnector)
> kafka.common.ConsumerRebalanceFailedException: libogroup_ myhost
> -1370875697107-38deafbc can't rebalance after 4 retries
>         at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:397)
>         at
> kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:680)
>         at
> kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:714)
>         at
> kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:140)
>         at kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
>         at kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
>         at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>         at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>         at
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
>         at scala.collection.immutable.List.foreach(List.scala:45)
>         at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>         at scala.collection.immutable.List.map(List.scala:45)
>         at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:118)
>         at kafka.tools.MirrorMaker.main(MirrorMaker.scala)
>
>
>
> Regards,
>
> Libo
>
>

Reply via email to