Hi Krishnan,

0.8.2.1 brokers also uncompress and recompress data. The ability to avoid
recompression was introduced in 0.10.0.0
with a new message format (see KIP-31 and KIP-32). If you change
log.message.format.version
to 0.10.2, the broker will not uncompress and recompress messages produced
by a producer with version 0.10.0.0 or higher. The downside is that
consumers with version 0.9.0.x or older will no longer benefit from
zero-copy transfers.

Ismael

On Mon, Apr 17, 2017 at 1:35 AM, Krishnan Chandra <m...@krishnanchandra.com>
wrote:

> Hello,
>
> I have run into an issue trying to upgrade a Kafka cluster from 0.8.2.1 to
> 0.10.2.0. The issue manifests itself in high GC time on the Kafka broker
> after it starts accepting new requests.
>
> Upon further inspection and digging through stack traces, it appears that
> the Kafka 10 broker is uncompressing and re-compressing all incoming
> messages in the request handler threads. I've included a stack trace below.
>
> On the broker, I've set inter.broker.protocol.version to 0.8.2.1 and
> log.message.format.version to 0.8.2.1 as well. All producers and consumers
> in the system also follow the 0.8.2.1 protocol, and the producers use gzip
> compression. Is there a setting I can change on the broker or producers
> that will cause the broker to stop uncompressing and recompressing
> messages?
>
> Thanks,
> Krishnan
>
> Stack trace:
>
> "kafka-request-handler-4" daemon prio=5 tid=50 RUNNABLE
>     at java.util.zip.GZIPInputStream.readUByte(GZIPInputStream.java:266)
>     at java.util.zip.GZIPInputStream.readUShort(GZIPInputStream.java:258)
>     at java.util.zip.GZIPInputStream.readUInt(GZIPInputStream.java:250)
>     at java.util.zip.GZIPInputStream.readTrailer(GZIPInputStream.java:222)
>        Local Variable: java.io.SequenceInputStream#1
>     at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:119)
>        Local Variable: java.util.zip.GZIPInputStream#1
>     at java.io.DataInputStream.readFully(DataInputStream.java:195)
>     at org.apache.kafka.common.record.RecordsIterator$
> DataLogInputStream.nextEntry(RecordsIterator.java:110)
>        Local Variable: byte[]#2043
>     at org.apache.kafka.common.record.RecordsIterator$
> DeepRecordsIterator.<init>(RecordsIterator.java:162)
>        Local Variable: org.apache.kafka.common.record.RecordsIterator$
> DataLogInputStream#1
>        Local Variable: org.apache.kafka.common.record.Record#679
>        Local Variable: org.apache.kafka.common.record.RecordsIterator$
> DeepRecordsIterator#1
>        Local Variable: java.io.DataInputStream#1
>     at org.apache.kafka.common.record.RecordsIterator.
> makeNext(RecordsIterator.java:79)
>     at org.apache.kafka.common.record.RecordsIterator.
> makeNext(RecordsIterator.java:34)
>     at org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(
> AbstractIterator.java:79)
>        Local Variable: org.apache.kafka.common.record.RecordsIterator#1
>     at org.apache.kafka.common.utils.AbstractIterator.hasNext(
> AbstractIterator.java:45)
>     at scala.collection.convert.Wrappers$JIteratorWrapper.
> hasNext(Wrappers.scala:42)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>        Local Variable: kafka.log.LogValidator$$anonfun$
> validateMessagesAndAssignOffsetsCompressed$1#1
>        Local Variable: scala.collection.convert.
> Wrappers$JIteratorWrapper#1132
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at kafka.log.LogValidator$.validateMessagesAndAssignOffse
> tsCompressed(LogValidator.scala:167)
>        Local Variable: scala.runtime.BooleanRef#421
>        Local Variable: scala.runtime.LongRef#1681
>        Local Variable: scala.collection.mutable.ArrayBuffer#699
>     at kafka.log.LogValidator$.validateMessagesAndAssignOffse
> ts(LogValidator.scala:67)
>     at kafka.log.Log.liftedTree1$1(Log.scala:372)
>     at kafka.log.Log.append(Log.scala:371)
>        Local Variable: java.lang.Object#2410
>        Local Variable: kafka.common.LongRef#1
>        Local Variable: scala.runtime.ObjectRef#736
>        Local Variable: kafka.log.LogAppendInfo#425
>        Local Variable: kafka.log.Log#7
>     at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:451)
>        Local Variable: kafka.cluster.Replica#8
>     at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:439)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
>        Local Variable: java.util.concurrent.locks.ReentrantReadWriteLock$
> ReadLock#5
>     at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:219)
>     at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:438)
>        Local Variable: kafka.cluster.Partition#5
>     at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.
> apply(ReplicaManager.scala:389)
>        Local Variable: org.apache.kafka.common.record.MemoryRecords#50
>        Local Variable: kafka.server.ReplicaManager$$
> anonfun$appendToLocalLog$2#3
>        Local Variable: org.apache.kafka.common.TopicPartition#792
>     at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.
> apply(ReplicaManager.scala:375)
>     at scala.collection.TraversableLike$$anonfun$map$
> 1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$
> 1.apply(TraversableLike.scala:234)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>        Local Variable: scala.collection.TraversableLike$$anonfun$map$1#72
>        Local Variable: scala.collection.convert.Wrappers$JMapWrapperLike$$
> anon$2#70
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike$class.map(
> TraversableLike.scala:234)
>        Local Variable: scala.collection.mutable.MapBuilder#3
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at kafka.server.ReplicaManager.appendToLocalLog(
> ReplicaManager.scala:375)
>     at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:312)
>        Local Variable: kafka.server.KafkaApis$$
> anonfun$handleProducerRequest$1#3
>        Local Variable: scala.collection.convert.Wrappers$JMapWrapper#502
>     at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:427)
>        Local Variable: org.apache.kafka.common.requests.ProduceRequest#86
>     at kafka.server.KafkaApis.handle(KafkaApis.scala:80)
>        Local Variable: kafka.network.RequestChannel$Request#314
>     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:62)
>        Local Variable: kafka.server.KafkaRequestHandler#6
>        Local Variable: scala.runtime.ObjectRef#737
>     at java.lang.Thread.run(Thread.java:745)
>

Reply via email to