Hello,

I have run into an issue trying to upgrade a Kafka cluster from 0.8.2.1 to
0.10.2.0. The issue manifests itself in high GC time on the Kafka broker
after it starts accepting new requests.

Upon further inspection and digging through stack traces, it appears that
the Kafka 10 broker is uncompressing and re-compressing all incoming
messages in the request handler threads. I've included a stack trace below.

On the broker, I've set inter.broker.protocol.version to 0.8.2.1 and
log.message.format.version to 0.8.2.1 as well. All producers and consumers
in the system also follow the 0.8.2.1 protocol, and the producers use gzip
compression. Is there a setting I can change on the broker or producers
that will cause the broker to stop uncompressing and recompressing messages?

Thanks,
Krishnan

Stack trace:

"kafka-request-handler-4" daemon prio=5 tid=50 RUNNABLE
    at java.util.zip.GZIPInputStream.readUByte(GZIPInputStream.java:266)
    at java.util.zip.GZIPInputStream.readUShort(GZIPInputStream.java:258)
    at java.util.zip.GZIPInputStream.readUInt(GZIPInputStream.java:250)
    at java.util.zip.GZIPInputStream.readTrailer(GZIPInputStream.java:222)
       Local Variable: java.io.SequenceInputStream#1
    at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:119)
       Local Variable: java.util.zip.GZIPInputStream#1
    at java.io.DataInputStream.readFully(DataInputStream.java:195)
    at org.apache.kafka.common.record.RecordsIterator$
DataLogInputStream.nextEntry(RecordsIterator.java:110)
       Local Variable: byte[]#2043
    at org.apache.kafka.common.record.RecordsIterator$
DeepRecordsIterator.<init>(RecordsIterator.java:162)
       Local Variable: org.apache.kafka.common.record.RecordsIterator$
DataLogInputStream#1
       Local Variable: org.apache.kafka.common.record.Record#679
       Local Variable: org.apache.kafka.common.record.RecordsIterator$
DeepRecordsIterator#1
       Local Variable: java.io.DataInputStream#1
    at org.apache.kafka.common.record.RecordsIterator.
makeNext(RecordsIterator.java:79)
    at org.apache.kafka.common.record.RecordsIterator.
makeNext(RecordsIterator.java:34)
    at org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(
AbstractIterator.java:79)
       Local Variable: org.apache.kafka.common.record.RecordsIterator#1
    at org.apache.kafka.common.utils.AbstractIterator.hasNext(
AbstractIterator.java:45)
    at scala.collection.convert.Wrappers$JIteratorWrapper.
hasNext(Wrappers.scala:42)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
       Local Variable: kafka.log.LogValidator$$anonfun$
validateMessagesAndAssignOffsetsCompressed$1#1
       Local Variable: scala.collection.convert.
Wrappers$JIteratorWrapper#1132
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at kafka.log.LogValidator$.validateMessagesAndAssignOffse
tsCompressed(LogValidator.scala:167)
       Local Variable: scala.runtime.BooleanRef#421
       Local Variable: scala.runtime.LongRef#1681
       Local Variable: scala.collection.mutable.ArrayBuffer#699
    at kafka.log.LogValidator$.validateMessagesAndAssignOffse
ts(LogValidator.scala:67)
    at kafka.log.Log.liftedTree1$1(Log.scala:372)
    at kafka.log.Log.append(Log.scala:371)
       Local Variable: java.lang.Object#2410
       Local Variable: kafka.common.LongRef#1
       Local Variable: scala.runtime.ObjectRef#736
       Local Variable: kafka.log.LogAppendInfo#425
       Local Variable: kafka.log.Log#7
    at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:451)
       Local Variable: kafka.cluster.Replica#8
    at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:439)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
       Local Variable: java.util.concurrent.locks.ReentrantReadWriteLock$
ReadLock#5
    at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:219)
    at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:438)
       Local Variable: kafka.cluster.Partition#5
    at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.
apply(ReplicaManager.scala:389)
       Local Variable: org.apache.kafka.common.record.MemoryRecords#50
       Local Variable: kafka.server.ReplicaManager$$
anonfun$appendToLocalLog$2#3
       Local Variable: org.apache.kafka.common.TopicPartition#792
    at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.
apply(ReplicaManager.scala:375)
    at scala.collection.TraversableLike$$anonfun$map$
1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$
1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
       Local Variable: scala.collection.TraversableLike$$anonfun$map$1#72
       Local Variable: scala.collection.convert.Wrappers$JMapWrapperLike$$
anon$2#70
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
       Local Variable: scala.collection.mutable.MapBuilder#3
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at kafka.server.ReplicaManager.appendToLocalLog(
ReplicaManager.scala:375)
    at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:312)
       Local Variable: kafka.server.KafkaApis$$
anonfun$handleProducerRequest$1#3
       Local Variable: scala.collection.convert.Wrappers$JMapWrapper#502
    at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:427)
       Local Variable: org.apache.kafka.common.requests.ProduceRequest#86
    at kafka.server.KafkaApis.handle(KafkaApis.scala:80)
       Local Variable: kafka.network.RequestChannel$Request#314
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:62)
       Local Variable: kafka.server.KafkaRequestHandler#6
       Local Variable: scala.runtime.ObjectRef#737
    at java.lang.Thread.run(Thread.java:745)

Reply via email to