Guozhang,

Thank you for the reply, but could you be a little bit more detailed?

When will this new MM with exact mirroring be rolled out? I went to the
following URL to read up on KAFKA-1650:
https://issues.apache.org/jira/browse/KAFKA-1650
However, that issue doesn't appear to be about mirroring, but rather about
clean shutdowns.

What exactly would I need to alter in the MM code? I've cloned the 0.8.2.0
Kafka source and read through the kafka.tools.MirrorMaker.scala class, and
cannot find a single mention of partition within its source. Looking at the
ProducerThread and ConsumerThread classes, I would expect them to be passed
some parameter that corresponds to the partition number of the message
they're producing/consuming. However, the closest thing I see to anything
partition-related is the "data.key" object.
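
To make the question concrete, here is a rough sketch of the routing I
understand steps 2 and 3 of the earlier advice to describe, written in plain
Java with no Kafka dependencies. PartitionRouter and Message are hypothetical
names of my own and do not exist in MirrorMaker, and the sketch assumes the
consumer side can supply each message's source partition id, which is exactly
the part I cannot find in the MM source:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch, not part of MirrorMaker: routes each consumed message
// to the queue whose index equals the message's source partition id.
class PartitionRouter {

    // Hypothetical stand-in for a consumed message plus its source partition.
    static final class Message {
        final int sourcePartition;
        final byte[] key;
        final byte[] value;

        Message(int sourcePartition, byte[] key, byte[] value) {
            this.sourcePartition = sourcePartition;
            this.key = key;
            this.value = value;
        }
    }

    // One unbounded FIFO queue per partition; queue n feeds producer n.
    private final List<BlockingQueue<Message>> queues =
            new ArrayList<BlockingQueue<Message>>();

    PartitionRouter(int numPartitions) {
        for (int i = 0; i < numPartitions; i++) {
            queues.add(new LinkedBlockingQueue<Message>());
        }
    }

    // Step 2: put the message on the queue matching its source partition id.
    void route(Message m) {
        queueFor(m.sourcePartition).add(m);
    }

    // Step 3: producer n would drain only queue n and send to partition n.
    BlockingQueue<Message> queueFor(int partition) {
        if (partition < 0 || partition >= queues.size()) {
            throw new IllegalArgumentException("unknown partition " + partition);
        }
        return queues.get(partition);
    }
}
```

In this sketch each producer thread n would take messages from queueFor(n) in
FIFO order and send them with partition n specified explicitly, so per-partition
order is kept as long as only one thread drains each queue. How to wire this
into the real ConsumerThread and ProducerThread classes is what I'm unsure of.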

Some more guidance on this would be extremely helpful as this exact
mirroring process seems like a common use case, and it's not clear how one
should go about doing this.


Thanks Guozhang,

Alex Melville

On Sun, Feb 22, 2015 at 3:53 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Alex,
>
> What I originally meant is that you probably need to manually modify the MM
> code in order to achieve your needs. However, MM has been improved a lot
> since the last time we synced up; in the next major release, MM will support
> exact mirroring (details in KAFKA-1650) with some functional extensions.
> Basically you just need to ensure the source and destination clusters have
> the same partitions for each topic, which can be done via consumer
> rebalance callbacks.
>
> Guozhang
>
> On Fri, Feb 20, 2015 at 12:34 AM, Alex Melville <amelvi...@g.hmc.edu>
> wrote:
>
> > Neha and Guozhang,
> >
> >
> > This thread is several months old now but I'd like to follow up on it as I
> > have a couple more questions related to it.
> >
> >
> > 1. Guozhang, you suggested 3 steps I take to ensure each piece of data
> > remains on the same partition from source to target cluster. In particular
> > you suggest
> >
> >
> > *2. When the consumer of the MM gets a message, put the message to the*
> > *producer's queue based on its partition id; i.e. if the partition id is n,*
> > *put to n's producer queue.*
> > *3. When producer sends the data, specify the partition id; so each producer*
> > *will only send to a single partition.*
> >
> > Since the MM is just using the default producer/consumer with no custom
> > logic written by me, how am I supposed to find a particular message's
> > partition id, and once I do that how am I supposed to specify which
> > producer queue to put that message in?
> >
> >
> >
> > 2. What does specifying the number of producers in the MM do? Does each
> > producer only push to one partition within a cluster?
> >
> >
> >
> >
> > 3. I wrote a SimpleProducer that publishes a record with a String key and
> > byte[] message to a Kafka cluster. Originally the producer config file I
> > was passing to MirrorMaker had the following important parameters:
> >
> >
> >
> > *producer.type=sync*
> >
> > *compression.codec=none*
> >
> > *serializer.class=kafka.serializer.DefaultEncoder*
> > *key.serializer.class=kafka.serializer.StringEncoder*
> > *partitioner.class=org.rubicon.hmc.SimplePartitioner*
> >
> >
> >
> > where SimplePartitioner just took the String key, hashed it, and then took
> > the resulting number mod the number of partitions and used that number as
> > the partition to publish to. However I kept getting an error when the
> > producer tried to publish to the target cluster which said something along
> > the lines of "[B cannot be cast to java.lang.String". I at last figured
> > this was because I had specified
> > key.serializer.class=kafka.serializer.StringEncoder in the producer, but
> > what it was receiving from the consumer thread was just a byte[] msg and
> > when it tried to use the StringEncoder an error was returned. Now I'm using
> > the default partitioner and default encoder, I'm not getting that
> > exception, and data is being copied from source to target cluster. However
> > I've lost the ability to partition data on a String key, because the MM
> > producer is just given a raw byte array. My question is, am I correct in
> > this reasoning behind the "[B cannot be cast to java.lang.String" error?
> >
> >
> >
> >
> >
> >
> > Thank you in advance Neha, Guozhang, and the rest of the Kafka community!
> >
> >
> > Alex Melville
> >
> > On Fri, Dec 5, 2014 at 5:30 PM, Neha Narkhede <n...@confluent.io> wrote:
> >
> > > Going back to your previous requirement of ensuring that the data in the
> > > target cluster is in the same order as the source cluster, all you need is
> > > to specify a key with every record in your data. The mirror maker and its
> > > producer take care of placing all the data for a particular key in the
> > > same partition on the target cluster. Effectively, all your data will be in
> > > the same order (though there may be a few duplicates as I mentioned
> > > before).
> > >
> > > Hope that helps!
> > >
> > > On Fri, Dec 5, 2014 at 1:23 PM, Alex Melville <amelvi...@g.hmc.edu>
> > > wrote:
> > >
> > > > Thank you for your replies Guozhang and Neha, though I have some followup
> > > > questions.
> > > >
> > > > I wrote my own Java Consumer and Producer based off of the Kafka Producer
> > > > API and High Level Consumer. Let's call them MyConsumer and MyProducer.
> > > > MyProducer uses a custom Partitioner class called SimplePartitioner. In the
> > > > producer.config file that I specify when I run the MirrorMaker from the
> > > > command line, there is a parameter "partitioner.class". I keep getting
> > > > ClassNotFoundException exceptions, no matter if I put the absolute path
> > > > to my SimplePartitioner.class file, a relative path, or even when I add
> > > > SimplePartitioner.class to the $CLASSPATH variables created in the
> > > > kafka-run-class.sh script. Here is my output error:
> > > >
> > > > Exception in thread "main" java.lang.ClassNotFoundException:
> > > > SimplePartitioner.class
> > > > at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> > > > at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> > > > at java.security.AccessController.doPrivileged(Native Method)
> > > > at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> > > > at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> > > > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> > > > at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> > > > at java.lang.Class.forName0(Native Method)
> > > > at java.lang.Class.forName(Class.java:191)
> > > > at kafka.utils.Utils$.createObject(Utils.scala:438)
> > > > at kafka.producer.Producer.<init>(Producer.scala:60)
> > > > at kafka.tools.MirrorMaker$$anonfun$1.apply(MirrorMaker.scala:116)
> > > > at kafka.tools.MirrorMaker$$anonfun$1.apply(MirrorMaker.scala:106)
> > > > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> > > > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> > > > at scala.collection.immutable.Range.foreach(Range.scala:81)
> > > > at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
> > > > at scala.collection.immutable.Range.map(Range.scala:46)
> > > > at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:106)
> > > > at kafka.tools.MirrorMaker.main(MirrorMaker.scala)
> > > >
> > > > What is the correct value for the "partitioner.class" parameter in my
> > > > producer.properties config file?
> > > >
> > > >
> > > >
> > > >
> > > > Guozhang, in your reply to my original message you said "When the consumer
> > > > of the MM gets a message, put the message to the producer's queue...". This
> > > > seems to imply that I can specify my own custom Consumer and Producer when
> > > > I run the MirrorMaker. How can I do this? Or, if I'm understanding
> > > > incorrectly and I have to use whichever default consumer/producer the
> > > > MirrorMaker uses, how can I get that consumer to learn which partition
> > > > it's reading from, pass that info to the producer, and then specify that
> > > > partition ID when the producer writes to the target cluster?
> > > >
> > > >
> > > > -Alex
> > > >
> > > > On Wed, Nov 26, 2014 at 10:33 AM, Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hello Alex,
> > > > >
> > > > > This can be done by doing some tweaks in the MM code (with the 0.8.2 new
> > > > > producer).
> > > > >
> > > > > 1. Set up your MM to have the total # of producers equal to the # of
> > > > > partitions in source / target cluster.
> > > > >
> > > > > 2. When the consumer of the MM gets a message, put the message to the
> > > > > producer's queue based on its partition id; i.e. if the partition id is n,
> > > > > put to n's producer queue.
> > > > >
> > > > > 3. When producer sends the data, specify the partition id; so each producer
> > > > > will only send to a single partition.
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Tue, Nov 25, 2014 at 8:19 PM, Alex Melville <amelvi...@g.hmc.edu>
> > > > > wrote:
> > > > >
> > > > > > Howdy friends,
> > > > > >
> > > > > >
> > > > > > I'd like to mirror the topics on several clusters to a central cluster,
> > > > > > and I'm looking at using the default MirrorMaker to do so. I've already
> > > > > > done some basic testing on the MirrorMaker found here:
> > > > > >
> > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330
> > > > > >
> > > > > > and managed to successfully copy a topic's partitions on a source cluster
> > > > > > to a topic on a target cluster. So I'm able to mirror correctly. However,
> > > > > > for my particular use case I need to ensure that when I copy a topic's
> > > > > > partitions from source cluster to target cluster, a partition created on
> > > > > > the target cluster contains data in the exact same order as the data on
> > > > > > the corresponding partition on the source cluster.
> > > > > >
> > > > > > I'm thinking of writing a Simple Consumer so I can manually compare the
> > > > > > events in a source cluster's partition with the corresponding partition
> > > > > > on the target cluster, but I'm not 100% sure if I'll be able to verify my
> > > > > > guarantee if I do it this way. Can anyone here verify that partitions
> > > > > > copied over to the target cluster by the default MirrorMaker are an exact
> > > > > > copy of those on the source cluster?
> > > > > >
> > > > > >
> > > > > > Thanks in advance,
> > > > > >
> > > > > > Alex Melville
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > Thanks,
> > > Neha
> > >
> >
>
>
>
> --
> -- Guozhang
>
