Hi Krishnan,

0.8.2.1 brokers also uncompress and recompress data. The ability to avoid recompression was introduced in 0.10.0.0 with a new message format (see KIP-31 and KIP-32). If you set log.message.format.version to 0.10.2, the broker will not uncompress and recompress messages produced by producers on version 0.10.0.0 or higher. The downside is that consumers on version 0.9.0.x or older will no longer benefit from zero-copy transfers.
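In server.properties terms, the change described above would look something like the sketch below. One caveat worth noting (a general Kafka constraint, not stated in this thread): log.message.format.version cannot be newer than inter.broker.protocol.version, so a broker currently pinned to 0.8.2.1 for both would need its protocol version bumped as well. The exact values here are illustrative; they should match your deployment.

```properties
# Sketch of the broker settings discussed in this thread (not a drop-in config).

# Must be raised first: the message format version may not exceed it.
inter.broker.protocol.version=0.10.2

# With the 0.10.x message format, batches from 0.10.0.0+ producers are stored
# as-is (no broker-side decompress/recompress). Consumers on 0.9.0.x or older
# will then require down-conversion and lose zero-copy transfers.
log.message.format.version=0.10.2
```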
Ismael

On Mon, Apr 17, 2017 at 1:35 AM, Krishnan Chandra <m...@krishnanchandra.com> wrote:
> Hello,
>
> I have run into an issue trying to upgrade a Kafka cluster from 0.8.2.1 to
> 0.10.2.0. The issue manifests itself in high GC time on the Kafka broker
> after it starts accepting new requests.
>
> Upon further inspection and digging through stack traces, it appears that
> the Kafka 10 broker is uncompressing and re-compressing all incoming
> messages in the request handler threads. I've included a stack trace below.
>
> On the broker, I've set inter.broker.protocol.version to 0.8.2.1 and
> log.message.format.version to 0.8.2.1 as well. All producers and consumers
> in the system also follow the 0.8.2.1 protocol, and the producers use gzip
> compression. Is there a setting I can change on the broker or producers
> that will cause the broker to stop uncompressing and recompressing
> messages?
>
> Thanks,
> Krishnan
>
> Stack trace:
>
> "kafka-request-handler-4" daemon prio=5 tid=50 RUNNABLE
>   at java.util.zip.GZIPInputStream.readUByte(GZIPInputStream.java:266)
>   at java.util.zip.GZIPInputStream.readUShort(GZIPInputStream.java:258)
>   at java.util.zip.GZIPInputStream.readUInt(GZIPInputStream.java:250)
>   at java.util.zip.GZIPInputStream.readTrailer(GZIPInputStream.java:222)
>     Local Variable: java.io.SequenceInputStream#1
>   at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:119)
>     Local Variable: java.util.zip.GZIPInputStream#1
>   at java.io.DataInputStream.readFully(DataInputStream.java:195)
>   at org.apache.kafka.common.record.RecordsIterator$DataLogInputStream.nextEntry(RecordsIterator.java:110)
>     Local Variable: byte[]#2043
>   at org.apache.kafka.common.record.RecordsIterator$DeepRecordsIterator.<init>(RecordsIterator.java:162)
>     Local Variable: org.apache.kafka.common.record.RecordsIterator$DataLogInputStream#1
>     Local Variable: org.apache.kafka.common.record.Record#679
>     Local Variable: org.apache.kafka.common.record.RecordsIterator$DeepRecordsIterator#1
>     Local Variable: java.io.DataInputStream#1
>   at org.apache.kafka.common.record.RecordsIterator.makeNext(RecordsIterator.java:79)
>   at org.apache.kafka.common.record.RecordsIterator.makeNext(RecordsIterator.java:34)
>   at org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
>     Local Variable: org.apache.kafka.common.record.RecordsIterator#1
>   at org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
>   at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     Local Variable: kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1#1
>     Local Variable: scala.collection.convert.Wrappers$JIteratorWrapper#1132
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(LogValidator.scala:167)
>     Local Variable: scala.runtime.BooleanRef#421
>     Local Variable: scala.runtime.LongRef#1681
>     Local Variable: scala.collection.mutable.ArrayBuffer#699
>   at kafka.log.LogValidator$.validateMessagesAndAssignOffsets(LogValidator.scala:67)
>   at kafka.log.Log.liftedTree1$1(Log.scala:372)
>   at kafka.log.Log.append(Log.scala:371)
>     Local Variable: java.lang.Object#2410
>     Local Variable: kafka.common.LongRef#1
>     Local Variable: scala.runtime.ObjectRef#736
>     Local Variable: kafka.log.LogAppendInfo#425
>     Local Variable: kafka.log.Log#7
>   at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:451)
>     Local Variable: kafka.cluster.Replica#8
>   at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:439)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
>     Local Variable: java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock#5
>   at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:219)
>   at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:438)
>     Local Variable: kafka.cluster.Partition#5
>   at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:389)
>     Local Variable: org.apache.kafka.common.record.MemoryRecords#50
>     Local Variable: kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2#3
>     Local Variable: org.apache.kafka.common.TopicPartition#792
>   at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:375)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     Local Variable: scala.collection.TraversableLike$$anonfun$map$1#72
>     Local Variable: scala.collection.convert.Wrappers$JMapWrapperLike$$anon$2#70
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     Local Variable: scala.collection.mutable.MapBuilder#3
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:375)
>   at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:312)
>     Local Variable: kafka.server.KafkaApis$$anonfun$handleProducerRequest$1#3
>     Local Variable: scala.collection.convert.Wrappers$JMapWrapper#502
>   at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:427)
>     Local Variable: org.apache.kafka.common.requests.ProduceRequest#86
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:80)
>     Local Variable: kafka.network.RequestChannel$Request#314
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:62)
>     Local Variable: kafka.server.KafkaRequestHandler#6
>     Local Variable: scala.runtime.ObjectRef#737
>   at java.lang.Thread.run(Thread.java:745)