Hello, I have run into an issue trying to upgrade a Kafka cluster from 0.8.2.1 to 0.10.2.0. The issue manifests itself in high GC time on the Kafka broker after it starts accepting new requests.
Upon further inspection and digging through stack traces, it appears that the Kafka 10 broker is uncompressing and re-compressing all incoming messages in the request handler threads. I've included a stack trace below. On the broker, I've set inter.broker.protocol.version to 0.8.2.1 and log.message.format.version to 0.8.2.1 as well. All producers and consumers in the system also follow the 0.8.2.1 protocol, and the producers use gzip compression. Is there a setting I can change on the broker or producers that will cause the broker to stop uncompressing and recompressing messages? Thanks, Krishnan Stack trace: "kafka-request-handler-4" daemon prio=5 tid=50 RUNNABLE at java.util.zip.GZIPInputStream.readUByte(GZIPInputStream.java:266) at java.util.zip.GZIPInputStream.readUShort(GZIPInputStream.java:258) at java.util.zip.GZIPInputStream.readUInt(GZIPInputStream.java:250) at java.util.zip.GZIPInputStream.readTrailer(GZIPInputStream.java:222) Local Variable: java.io.SequenceInputStream#1 at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:119) Local Variable: java.util.zip.GZIPInputStream#1 at java.io.DataInputStream.readFully(DataInputStream.java:195) at org.apache.kafka.common.record.RecordsIterator$ DataLogInputStream.nextEntry(RecordsIterator.java:110) Local Variable: byte[]#2043 at org.apache.kafka.common.record.RecordsIterator$ DeepRecordsIterator.<init>(RecordsIterator.java:162) Local Variable: org.apache.kafka.common.record.RecordsIterator$ DataLogInputStream#1 Local Variable: org.apache.kafka.common.record.Record#679 Local Variable: org.apache.kafka.common.record.RecordsIterator$ DeepRecordsIterator#1 Local Variable: java.io.DataInputStream#1 at org.apache.kafka.common.record.RecordsIterator. makeNext(RecordsIterator.java:79) at org.apache.kafka.common.record.RecordsIterator. makeNext(RecordsIterator.java:34) at org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext( AbstractIterator.java:79) Local Variable: org.apache.kafka.common.record.RecordsIterator#1 at org.apache.kafka.common.utils.AbstractIterator.hasNext( AbstractIterator.java:45) at scala.collection.convert.Wrappers$JIteratorWrapper. hasNext(Wrappers.scala:42) at scala.collection.Iterator$class.foreach(Iterator.scala:893) Local Variable: kafka.log.LogValidator$$anonfun$ validateMessagesAndAssignOffsetsCompressed$1#1 Local Variable: scala.collection.convert. Wrappers$JIteratorWrapper#1132 at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at kafka.log.LogValidator$.validateMessagesAndAssignOffse tsCompressed(LogValidator.scala:167) Local Variable: scala.runtime.BooleanRef#421 Local Variable: scala.runtime.LongRef#1681 Local Variable: scala.collection.mutable.ArrayBuffer#699 at kafka.log.LogValidator$.validateMessagesAndAssignOffse ts(LogValidator.scala:67) at kafka.log.Log.liftedTree1$1(Log.scala:372) at kafka.log.Log.append(Log.scala:371) Local Variable: java.lang.Object#2410 Local Variable: kafka.common.LongRef#1 Local Variable: scala.runtime.ObjectRef#736 Local Variable: kafka.log.LogAppendInfo#425 Local Variable: kafka.log.Log#7 at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:451) Local Variable: kafka.cluster.Replica#8 at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:439) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213) Local Variable: java.util.concurrent.locks.ReentrantReadWriteLock$ ReadLock#5 at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:219) at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:438) Local Variable: kafka.cluster.Partition#5 at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2. apply(ReplicaManager.scala:389) Local Variable: org.apache.kafka.common.record.MemoryRecords#50 Local Variable: kafka.server.ReplicaManager$$ anonfun$appendToLocalLog$2#3 Local Variable: org.apache.kafka.common.TopicPartition#792 at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2. apply(ReplicaManager.scala:375) at scala.collection.TraversableLike$$anonfun$map$ 1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$ 1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:893) Local Variable: scala.collection.TraversableLike$$anonfun$map$1#72 Local Variable: scala.collection.convert.Wrappers$JMapWrapperLike$$ anon$2#70 at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) Local Variable: scala.collection.mutable.MapBuilder#3 at scala.collection.AbstractTraversable.map(Traversable.scala:104) at kafka.server.ReplicaManager.appendToLocalLog( ReplicaManager.scala:375) at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:312) Local Variable: kafka.server.KafkaApis$$ anonfun$handleProducerRequest$1#3 Local Variable: scala.collection.convert.Wrappers$JMapWrapper#502 at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:427) Local Variable: org.apache.kafka.common.requests.ProduceRequest#86 at kafka.server.KafkaApis.handle(KafkaApis.scala:80) Local Variable: kafka.network.RequestChannel$Request#314 at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:62) Local Variable: kafka.server.KafkaRequestHandler#6 Local Variable: scala.runtime.ObjectRef#737 at java.lang.Thread.run(Thread.java:745)