I'm using bin/kafka-mirror-maker.sh for the first time and I need to take
my "aws-cloudtrail" topic from a 0.8.2.1 single broker and mirror it to a
0.10.0.0 cluster. I am running mirror maker from a host in the 0.10.0.0
cluster.

My consumer.properties file:

$ cat consumer.properties
zookeeper.connect=10.60.68.98:2181/
zookeeper.connectiontimeout.ms=1000000
bootstrap.servers=10.60.68.98:9092
consumer.timeout.ms=-1
group.id=mirror-cloudtrail
shallow.iterator.enable=true
mirror.topics.whitelist=aws-cloudtrail
inter.broker.protocol.version=0.8.2

My producer.properties file:

$ cat producer.properties
#zookeeper.connect=10.60.68.116:2181/
bootstrap.servers=10.60.68.116:9092
#producer.type=async
#compression.codec=1
#serializer.class=kafka.serializer.DefaultEncoder
#max.message.size=10000000
#queue.time=1000
#queue.enqueueTimeout.ms=-1

When I kick off the mirror maker script I see a stream of errors that
seems to indicate a protocol mismatch:

$ KAFKA_JMX_OPTS="" JMX_PORT=7204 ./bin/kafka-mirror-maker.sh \
    --consumer.config consumer.properties --producer.config producer.properties \
    --whitelist aws-cloudtrail
[2016-09-26 22:16:29,312] WARN Property bootstrap.servers is not valid (kafka.utils.VerifiableProperties)
[2016-09-26 22:16:29,313] WARN Property inter.broker.protocol.version is not valid (kafka.utils.VerifiableProperties)
[2016-09-26 22:16:29,313] WARN Property mirror.topics.whitelist is not valid (kafka.utils.VerifiableProperties)
[2016-09-26 22:16:29,313] WARN Property shallow.iterator.enable is not valid (kafka.utils.VerifiableProperties)
[2016-09-26 22:16:29,314] WARN Property zookeeper.connectiontimeout.ms is not valid (kafka.utils.VerifiableProperties)
[2016-09-26 22:16:30,770] WARN [ConsumerFetcherThread-mirror-cloudtrail_46fc50035fd0-1474928189396-63f8456c-0-0], Error in fetch kafka.consumer.ConsumerFetcherThread$FetchRequest@7add6a8 (kafka.consumer.ConsumerFetcherThread)
java.lang.IllegalArgumentException
        at java.nio.Buffer.limit(Buffer.java:275)
        at kafka.api.FetchResponsePartitionData$.readFrom(FetchResponse.scala:38)
        at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:100)
        at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:98)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.immutable.Range.foreach(Range.scala:160)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at kafka.api.TopicData$.readFrom(FetchResponse.scala:98)
        at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:170)
        at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.immutable.Range.foreach(Range.scala:160)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
        at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:169)
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135)
        at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:108)
        at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:29)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

This error repeats in a tight loop. To stop it I have to suspend the script
with Ctrl-Z and then run "kill -9 %1" to kill off the job.

Do I need to set the "inter.broker.protocol.version" some other way?
Perhaps in the config of the 0.10.0.0 cluster nodes and then perform a
restart there?
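
For reference, the five "not valid" warnings above suggest the consumer
side of mirror maker ignores most of what I put in consumer.properties.
A trimmed version that keeps only the keys it did not complain about would
look roughly like the sketch below. The renamed timeout key is an
assumption about the spelling the old consumer expects, and I have not
verified that this change alone does anything about the fetch errors.

```properties
# consumer.properties (sketch): only the keys not flagged as "not valid"
zookeeper.connect=10.60.68.98:2181/
# Assumed correct spelling of the rejected zookeeper.connectiontimeout.ms
zookeeper.connection.timeout.ms=1000000
consumer.timeout.ms=-1
group.id=mirror-cloudtrail
```

The topic selection is already passed on the command line via --whitelist,
so mirror.topics.whitelist is left out here.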

Thanks,
Xavier
