Neha and Guozhang,
This thread is several months old now, but I'd like to follow up on it as I have a few more questions related to it.

1. Guozhang, you suggested three steps I could take to ensure each piece of data remains on the same partition from source to target cluster. In particular, you suggested:

> 2. When the consumer of the MM gets a message, put the message to the
> producer's queue based on its partition id; i.e. if the partition id is n,
> put to n's producer queue.
>
> 3. When producer sends the data, specify the partition id; so each producer
> will only send to a single partition.

Since the MM is just using the default producer/consumer with no custom logic written by me, how am I supposed to find a particular message's partition id, and once I have it, how am I supposed to specify which producer queue to put that message in?

2. What does specifying the number of producers in the MM do? Does each producer only push to one partition within a cluster?

3. I wrote a SimpleProducer that publishes a record with a String key and byte[] message to a Kafka cluster. Originally the producer config file I was passing to MirrorMaker had the following important parameters:

    producer.type=sync
    compression.codec=none
    serializer.class=kafka.serializer.DefaultEncoder
    key.serializer.class=kafka.serializer.StringEncoder
    partitioner.class=org.rubicon.hmc.SimplePartitioner

where SimplePartitioner just took the String key, hashed it, took the resulting number mod the number of partitions, and used that number as the partition to publish to. However, I kept getting an error when the producer tried to publish to the target cluster, which said something along the lines of "[B cannot be cast to java.lang.String". I eventually figured this was because I had specified key.serializer.class=kafka.serializer.StringEncoder in the producer, but what it was receiving from the consumer thread was just a byte[] message, and when it tried to use the StringEncoder on that, an error was returned.
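For illustration, the hash-and-mod partitioning and the cast failure described above can be reproduced in plain Java with no Kafka classes at all. This is only a sketch under assumptions: KeyCastSketch, partitionFor, and castFailureMessage are hypothetical names, this is not the real org.rubicon.hmc.SimplePartitioner, and where exactly the cast happens inside the MM producer is not shown here. "[B" is simply the JVM's internal name for byte[].

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the partitioner described above. It is NOT the
// real SimplePartitioner and does not implement any Kafka interface.
final class KeyCastSketch {

    // Hash the String key and reduce it modulo the partition count.
    static int partitionFor(Object key, int numPartitions) {
        String k = (String) key; // throws ClassCastException if key is a byte[]
        // Mask the sign bit so the result is never negative.
        return (k.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Returns the exception message if the key cannot be cast, or null if it can.
    static String castFailureMessage(Object key, int numPartitions) {
        try {
            partitionFor(key, numPartitions);
            return null;
        } catch (ClassCastException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // A String key, as the original producer sends it.
        System.out.println(partitionFor("user-42", 8));

        // The same key as raw bytes (assumption: this mirrors what the
        // producer side receives from the consumer thread).
        byte[] raw = "user-42".getBytes(StandardCharsets.UTF_8);
        System.out.println(castFailureMessage(raw, 8));
    }
}
```

The exact wording of the exception message differs between JVM versions, but it names both "[B" and java.lang.String, which matches the error quoted above.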
Now I'm using the default partitioner and default encoder, I'm not getting that exception, and data is being copied from source to target cluster. However, I've lost the ability to partition data on a String key, because the MM producer is just given a raw byte array. My question is: am I correct in this reasoning behind the "[B cannot be cast to java.lang.String" error?

Thank you in advance Neha, Guozhang, and the rest of the Kafka community!

Alex Melville

On Fri, Dec 5, 2014 at 5:30 PM, Neha Narkhede <n...@confluent.io> wrote:

> Going back to your previous requirement of ensuring that the data in the
> target cluster is in the same order as the source cluster, all you need is
> to specify a key with every record in your data. The mirror maker and its
> producer take care of placing all the data for a particular key in the
> same partition on the target cluster. Effectively, all your data will be in
> the same order (though there may be a few duplicates as I mentioned
> before).
>
> Hope that helps!
>
> On Fri, Dec 5, 2014 at 1:23 PM, Alex Melville <amelvi...@g.hmc.edu> wrote:
>
> > Thank you for your replies Guozhang and Neha, though I have some followup
> > questions.
> >
> > I wrote my own Java Consumer and Producer based off of the Kafka Producer
> > API and High Level Consumer. Let's call them MyConsumer and MyProducer.
> > MyProducer uses a custom Partitioner class called SimplePartitioner. In the
> > producer.config file that I specify when I run the MirrorMaker from the
> > command line, there is a parameter "partitioner.class". I keep getting
> > "ClassNotFoundException" exceptions, no matter if I put the absolute path
> > to my SimplePartitioner.class file, a relative path, or even when I add
> > SimplePartitioner.class to the $CLASSPATH variables created in the
> > kafka-run-class.sh script.
> > Here is my output error:
> >
> > Exception in thread "main" java.lang.ClassNotFoundException: SimplePartitioner.class
> >     at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> >     at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> >     at java.security.AccessController.doPrivileged(Native Method)
> >     at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> >     at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> >     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> >     at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> >     at java.lang.Class.forName0(Native Method)
> >     at java.lang.Class.forName(Class.java:191)
> >     at kafka.utils.Utils$.createObject(Utils.scala:438)
> >     at kafka.producer.Producer.<init>(Producer.scala:60)
> >     at kafka.tools.MirrorMaker$$anonfun$1.apply(MirrorMaker.scala:116)
> >     at kafka.tools.MirrorMaker$$anonfun$1.apply(MirrorMaker.scala:106)
> >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> >     at scala.collection.immutable.Range.foreach(Range.scala:81)
> >     at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
> >     at scala.collection.immutable.Range.map(Range.scala:46)
> >     at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:106)
> >     at kafka.tools.MirrorMaker.main(MirrorMaker.scala)
> >
> > What is the correct value for the "partitioner.class" parameter in my
> > producer.properties config file?
> >
> > Guozhang, in your reply to my original message you said "When the consumer
> > of the MM gets a message, put the message to the producer's queue...". This
> > seems to imply that I can specify my own custom Consumer and Producer when
> > I run the Mirrormaker. How can I do this?
> > Or, if I'm understanding incorrectly
> > and I have to use whichever default consumer/producer the Mirrormaker uses,
> > how can I get that consumer to learn which partition it's reading from,
> > pass that info to the producer, and then specify that partition ID when the
> > producer writes to the target cluster?
> >
> > -Alex
> >
> > On Wed, Nov 26, 2014 at 10:33 AM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> >
> > > Hello Alex,
> > >
> > > This can be done by doing some tweaks in the MM code (with the 0.8.2 new
> > > producer).
> > >
> > > 1. Set-up your MM to have the total # of producers equal to the #. of
> > > partitions in source / target cluster.
> > >
> > > 2. When the consumer of the MM gets a message, put the message to the
> > > producer's queue based on its partition id; i.e. if the partition id is n,
> > > put to n's producer queue.
> > >
> > > 3. When producer sends the data, specify the partition id; so each producer
> > > will only send to a single partition.
> > >
> > > Guozhang
> > >
> > > On Tue, Nov 25, 2014 at 8:19 PM, Alex Melville <amelvi...@g.hmc.edu>
> > > wrote:
> > >
> > > > Howdy friends,
> > > >
> > > > I'd like to mirror the topics on several clusters to a central cluster,
> > > > and I'm looking at using the default Mirrormaker to do so. I've already
> > > > done some basic testing on the Mirrormaker found here:
> > > >
> > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330
> > > >
> > > > and managed to successfully copy a topic's partitions on a source cluster
> > > > to a topic on a target cluster. So I'm able to mirror correctly.
> > > > However,
> > > > for my particular use case I need to ensure that when I copy a topic's
> > > > partitions from source cluster to target cluster, a partition created on
> > > > the target cluster contains data in the exact same order as the data on
> > > > the corresponding partition on the source cluster.
> > > >
> > > > I'm thinking of writing a Simple Consumer so I can manually compare the
> > > > events in a source cluster's partition with the corresponding partition
> > > > on the target cluster, but I'm not 100% sure if I'll be able to verify my
> > > > guarantee if I do it this way. Can anyone here verify that partitions
> > > > copied over to the target cluster by the default Mirrormaker are an exact
> > > > copy of those on the source cluster?
> > > >
> > > > Thanks in advance,
> > > >
> > > > Alex Melville
> > >
> > > --
> > > -- Guozhang
>
> --
> Thanks,
> Neha