Hi Ismael,

Thanks for the info. If 0.8.2.1 brokers were also uncompressing and
recompressing data, do you have any insight into what would cause the much
higher memory usage we're seeing on 0.10?

On 0.8, we are able to run our brokers with a 1GB heap and they work
perfectly fine. On 0.10, we are seeing OutOfMemory errors even with a 16GB
heap, and there are a very large number of humongous allocations due to
GZIP buffers. I find it hard to believe that 0.10 needs 16x more memory
than 0.8.
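(For context: the humongous allocations are G1 allocations larger than half
the heap region size, which is what the large GZIP decompression buffers end
up being. They show up in the GC log when running with flags roughly like
these; just a sketch, assuming JDK 8 and G1, with a placeholder log path:

    -XX:+UseG1GC
    -XX:+PrintGCDetails
    -XX:+PrintGCDateStamps
    -XX:+PrintAdaptiveSizePolicy
    -Xloggc:/path/to/kafka-gc.log
)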

If we changed log.message.format.version to 0.10.2, would that help with
this issue? Upgrading the producers/consumers is not easy for us at this
time (they are all on 0.8.2).
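For reference, the broker-side change we're considering would look roughly
like this in server.properties (a sketch; if I'm reading the docs right,
log.message.format.version can't be newer than inter.broker.protocol.version,
so both entries would need to move):

    inter.broker.protocol.version=0.10.2
    log.message.format.version=0.10.2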

Thanks,
Krishnan

On Mon, Apr 17, 2017 at 3:56 PM, Ismael Juma <ism...@juma.me.uk> wrote:

> Hi Krishnan,
>
> 0.8.2.1 brokers also uncompress and recompress data. The ability to avoid
> recompression was introduced in 0.10.0.0 with a new message format (see
> KIP-31 and KIP-32). If you change log.message.format.version to 0.10.2,
> the broker will not uncompress and recompress messages produced by a
> producer with version 0.10.0.0 or higher. The downside is that consumers
> with version 0.9.0.x or older will no longer benefit from zero-copy
> transfers.
>
> Ismael
>
> On Mon, Apr 17, 2017 at 1:35 AM, Krishnan Chandra <m...@krishnanchandra.com>
> wrote:
>
> > Hello,
> >
> > I have run into an issue trying to upgrade a Kafka cluster from 0.8.2.1
> > to 0.10.2.0. The issue manifests itself in high GC time on the Kafka
> > broker after it starts accepting new requests.
> >
> > Upon further inspection and digging through stack traces, it appears
> > that the Kafka 0.10 broker is uncompressing and re-compressing all
> > incoming messages in the request handler threads. I've included a stack
> > trace below.
> >
> > On the broker, I've set inter.broker.protocol.version to 0.8.2.1 and
> > log.message.format.version to 0.8.2.1 as well. All producers and
> > consumers in the system also follow the 0.8.2.1 protocol, and the
> > producers use gzip compression. Is there a setting I can change on the
> > broker or producers that will cause the broker to stop uncompressing
> > and recompressing messages?
> >
> > Thanks,
> > Krishnan
> >
> > Stack trace:
> >
> > "kafka-request-handler-4" daemon prio=5 tid=50 RUNNABLE
> >     at java.util.zip.GZIPInputStream.readUByte(GZIPInputStream.java:266)
> >     at java.util.zip.GZIPInputStream.readUShort(GZIPInputStream.
> java:258)
> >     at java.util.zip.GZIPInputStream.readUInt(GZIPInputStream.java:250)
> >     at java.util.zip.GZIPInputStream.readTrailer(GZIPInputStream.
> java:222)
> >        Local Variable: java.io.SequenceInputStream#1
> >     at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:119)
> >        Local Variable: java.util.zip.GZIPInputStream#1
> >     at java.io.DataInputStream.readFully(DataInputStream.java:195)
> >     at org.apache.kafka.common.record.RecordsIterator$
> > DataLogInputStream.nextEntry(RecordsIterator.java:110)
> >        Local Variable: byte[]#2043
> >     at org.apache.kafka.common.record.RecordsIterator$
> > DeepRecordsIterator.<init>(RecordsIterator.java:162)
> >        Local Variable: org.apache.kafka.common.record.RecordsIterator$
> > DataLogInputStream#1
> >        Local Variable: org.apache.kafka.common.record.Record#679
> >        Local Variable: org.apache.kafka.common.record.RecordsIterator$
> > DeepRecordsIterator#1
> >        Local Variable: java.io.DataInputStream#1
> >     at org.apache.kafka.common.record.RecordsIterator.
> > makeNext(RecordsIterator.java:79)
> >     at org.apache.kafka.common.record.RecordsIterator.
> > makeNext(RecordsIterator.java:34)
> >     at org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(
> > AbstractIterator.java:79)
> >        Local Variable: org.apache.kafka.common.record.RecordsIterator#1
> >     at org.apache.kafka.common.utils.AbstractIterator.hasNext(
> > AbstractIterator.java:45)
> >     at scala.collection.convert.Wrappers$JIteratorWrapper.
> > hasNext(Wrappers.scala:42)
> >     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> >        Local Variable: kafka.log.LogValidator$$anonfun$
> > validateMessagesAndAssignOffsetsCompressed$1#1
> >        Local Variable: scala.collection.convert.
> > Wrappers$JIteratorWrapper#1132
> >     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> >     at scala.collection.IterableLike$class.foreach(IterableLike.
> scala:72)
> >     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> >     at kafka.log.LogValidator$.validateMessagesAndAssignOffse
> > tsCompressed(LogValidator.scala:167)
> >        Local Variable: scala.runtime.BooleanRef#421
> >        Local Variable: scala.runtime.LongRef#1681
> >        Local Variable: scala.collection.mutable.ArrayBuffer#699
> >     at kafka.log.LogValidator$.validateMessagesAndAssignOffse
> > ts(LogValidator.scala:67)
> >     at kafka.log.Log.liftedTree1$1(Log.scala:372)
> >     at kafka.log.Log.append(Log.scala:371)
> >        Local Variable: java.lang.Object#2410
> >        Local Variable: kafka.common.LongRef#1
> >        Local Variable: scala.runtime.ObjectRef#736
> >        Local Variable: kafka.log.LogAppendInfo#425
> >        Local Variable: kafka.log.Log#7
> >     at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:451)
> >        Local Variable: kafka.cluster.Replica#8
> >     at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:439)
> >     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
> >        Local Variable: java.util.concurrent.locks.
> ReentrantReadWriteLock$
> > ReadLock#5
> >     at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:219)
> >     at kafka.cluster.Partition.appendRecordsToLeader(
> Partition.scala:438)
> >        Local Variable: kafka.cluster.Partition#5
> >     at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.
> > apply(ReplicaManager.scala:389)
> >        Local Variable: org.apache.kafka.common.record.MemoryRecords#50
> >        Local Variable: kafka.server.ReplicaManager$$
> > anonfun$appendToLocalLog$2#3
> >        Local Variable: org.apache.kafka.common.TopicPartition#792
> >     at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.
> > apply(ReplicaManager.scala:375)
> >     at scala.collection.TraversableLike$$anonfun$map$
> > 1.apply(TraversableLike.scala:234)
> >     at scala.collection.TraversableLike$$anonfun$map$
> > 1.apply(TraversableLike.scala:234)
> >     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> >        Local Variable: scala.collection.TraversableLike$$anonfun$map$
> 1#72
> >        Local Variable: scala.collection.convert.
> Wrappers$JMapWrapperLike$$
> > anon$2#70
> >     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> >     at scala.collection.IterableLike$class.foreach(IterableLike.
> scala:72)
> >     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> >     at scala.collection.TraversableLike$class.map(
> > TraversableLike.scala:234)
> >        Local Variable: scala.collection.mutable.MapBuilder#3
> >     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> >     at kafka.server.ReplicaManager.appendToLocalLog(
> > ReplicaManager.scala:375)
> >     at kafka.server.ReplicaManager.appendRecords(ReplicaManager.
> scala:312)
> >        Local Variable: kafka.server.KafkaApis$$
> > anonfun$handleProducerRequest$1#3
> >        Local Variable: scala.collection.convert.Wrappers$JMapWrapper#502
> >     at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:427)
> >        Local Variable: org.apache.kafka.common.
> requests.ProduceRequest#86
> >     at kafka.server.KafkaApis.handle(KafkaApis.scala:80)
> >        Local Variable: kafka.network.RequestChannel$Request#314
> >     at kafka.server.KafkaRequestHandler.run(
> KafkaRequestHandler.scala:62)
> >        Local Variable: kafka.server.KafkaRequestHandler#6
> >        Local Variable: scala.runtime.ObjectRef#737
> >     at java.lang.Thread.run(Thread.java:745)
> >
>
