Hi Krishnan,

In 0.10.2, there was some consolidation of the message format code and the
recompression code path is not as well optimised in the consolidated code
(recompression is supposed to be rare now). Not sure if this explains the
huge difference you are seeing though.

Changing log.message.format.version won't help if the producers are still
on version 0.8.2.

Ismael

On Tue, Apr 18, 2017 at 1:03 AM, Krishnan Chandra <m...@krishnanchandra.com>
wrote:

> Hi Ismael,
>
> Thanks for the info. If 0.8.2.1 was also uncompressing and recompressing
> data, do you have any insight as to what would cause higher memory usage on
> 0.10?
>
> On 0.8, we are able to run our brokers with 1GB heap and they work
> perfectly fine. On 0.10 we are seeing OutOfMemory errors even with a 16GB
> heap, and there are a very large number of humongous allocations due to
> GZIP buffers. I find it hard to believe that 0.10 uses 16x more memory than
> 0.8.
>
> If we changed the log.message.format.version to 0.10.2, would that change
> help with this issue? Upgrading the producers/consumers is not easy for us
> at this time (they are all on 0.8.2).
>
> Thanks,
> Krishnan
>
> On Mon, Apr 17, 2017 at 3:56 PM, Ismael Juma <ism...@juma.me.uk> wrote:
>
> > Hi Krishnan,
> >
> > 0.8.2.1 brokers also uncompress and recompress data. The ability to avoid
> > recompression was introduced in 0.10.0.0
> > with a new message format (see KIP-31 and KIP-32). If you change
> > log.message.format.version
> > to 0.10.2, the broker will not uncompress and recompress messages
> produced
> > by a producer with version 0.10.0.0 or higher. The downside is that
> > consumers with version 0.9.0.x or older will no longer benefit from
> > zero-copy transfers.
> >
> > Ismael
> >
> > On Mon, Apr 17, 2017 at 1:35 AM, Krishnan Chandra <
> m...@krishnanchandra.com>
> > wrote:
> >
> > > Hello,
> > >
> > > I have run into an issue trying to upgrade a Kafka cluster from 0.8.2.1
> > to
> > > 0.10.2.0. The issue manifests itself in high GC time on the Kafka
> broker
> > > after it starts accepting new requests.
> > >
> > > Upon further inspection and digging through stack traces, it appears
> that
> > > the Kafka 10 broker is uncompressing and re-compressing all incoming
> > > messages in the request handler threads. I've included a stack trace
> > below.
> > >
> > > On the broker, I've set inter.broker.protocol.version to 0.8.2.1 and
> > > log.message.format.version to 0.8.2.1 as well. All producers and
> > consumers
> > > in the system also follow the 0.8.2.1 protocol, and the producers use
> > gzip
> > > compression. Is there a setting I can change on the broker or producers
> > > that will cause the broker to stop uncompressing and recompressing
> > > messages?
> > >
> > > Thanks,
> > > Krishnan
> > >
> > > Stack trace:
> > >
> > > "kafka-request-handler-4" daemon prio=5 tid=50 RUNNABLE
> > >     at java.util.zip.GZIPInputStream.readUByte(GZIPInputStream.
> java:266)
> > >     at java.util.zip.GZIPInputStream.readUShort(GZIPInputStream.
> > java:258)
> > >     at java.util.zip.GZIPInputStream.readUInt(GZIPInputStream.java:
> 250)
> > >     at java.util.zip.GZIPInputStream.readTrailer(GZIPInputStream.
> > java:222)
> > >        Local Variable: java.io.SequenceInputStream#1
> > >     at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:119)
> > >        Local Variable: java.util.zip.GZIPInputStream#1
> > >     at java.io.DataInputStream.readFully(DataInputStream.java:195)
> > >     at org.apache.kafka.common.record.RecordsIterator$
> > > DataLogInputStream.nextEntry(RecordsIterator.java:110)
> > >        Local Variable: byte[]#2043
> > >     at org.apache.kafka.common.record.RecordsIterator$
> > > DeepRecordsIterator.<init>(RecordsIterator.java:162)
> > >        Local Variable: org.apache.kafka.common.record.RecordsIterator$
> > > DataLogInputStream#1
> > >        Local Variable: org.apache.kafka.common.record.Record#679
> > >        Local Variable: org.apache.kafka.common.record.RecordsIterator$
> > > DeepRecordsIterator#1
> > >        Local Variable: java.io.DataInputStream#1
> > >     at org.apache.kafka.common.record.RecordsIterator.
> > > makeNext(RecordsIterator.java:79)
> > >     at org.apache.kafka.common.record.RecordsIterator.
> > > makeNext(RecordsIterator.java:34)
> > >     at org.apache.kafka.common.utils.AbstractIterator.
> maybeComputeNext(
> > > AbstractIterator.java:79)
> > >        Local Variable: org.apache.kafka.common.
> record.RecordsIterator#1
> > >     at org.apache.kafka.common.utils.AbstractIterator.hasNext(
> > > AbstractIterator.java:45)
> > >     at scala.collection.convert.Wrappers$JIteratorWrapper.
> > > hasNext(Wrappers.scala:42)
> > >     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> > >        Local Variable: kafka.log.LogValidator$$anonfun$
> > > validateMessagesAndAssignOffsetsCompressed$1#1
> > >        Local Variable: scala.collection.convert.
> > > Wrappers$JIteratorWrapper#1132
> > >     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> > >     at scala.collection.IterableLike$class.foreach(IterableLike.
> > scala:72)
> > >     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> > >     at kafka.log.LogValidator$.validateMessagesAndAssignOffse
> > > tsCompressed(LogValidator.scala:167)
> > >        Local Variable: scala.runtime.BooleanRef#421
> > >        Local Variable: scala.runtime.LongRef#1681
> > >        Local Variable: scala.collection.mutable.ArrayBuffer#699
> > >     at kafka.log.LogValidator$.validateMessagesAndAssignOffse
> > > ts(LogValidator.scala:67)
> > >     at kafka.log.Log.liftedTree1$1(Log.scala:372)
> > >     at kafka.log.Log.append(Log.scala:371)
> > >        Local Variable: java.lang.Object#2410
> > >        Local Variable: kafka.common.LongRef#1
> > >        Local Variable: scala.runtime.ObjectRef#736
> > >        Local Variable: kafka.log.LogAppendInfo#425
> > >        Local Variable: kafka.log.Log#7
> > >     at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:451)
> > >        Local Variable: kafka.cluster.Replica#8
> > >     at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:439)
> > >     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
> > >        Local Variable: java.util.concurrent.locks.
> > ReentrantReadWriteLock$
> > > ReadLock#5
> > >     at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:219)
> > >     at kafka.cluster.Partition.appendRecordsToLeader(
> > Partition.scala:438)
> > >        Local Variable: kafka.cluster.Partition#5
> > >     at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.
> > > apply(ReplicaManager.scala:389)
> > >        Local Variable: org.apache.kafka.common.record.MemoryRecords#50
> > >        Local Variable: kafka.server.ReplicaManager$$
> > > anonfun$appendToLocalLog$2#3
> > >        Local Variable: org.apache.kafka.common.TopicPartition#792
> > >     at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.
> > > apply(ReplicaManager.scala:375)
> > >     at scala.collection.TraversableLike$$anonfun$map$
> > > 1.apply(TraversableLike.scala:234)
> > >     at scala.collection.TraversableLike$$anonfun$map$
> > > 1.apply(TraversableLike.scala:234)
> > >     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> > >        Local Variable: scala.collection.TraversableLike$$anonfun$map$
> > 1#72
> > >        Local Variable: scala.collection.convert.
> > Wrappers$JMapWrapperLike$$
> > > anon$2#70
> > >     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> > >     at scala.collection.IterableLike$class.foreach(IterableLike.
> > scala:72)
> > >     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> > >     at scala.collection.TraversableLike$class.map(
> > > TraversableLike.scala:234)
> > >        Local Variable: scala.collection.mutable.MapBuilder#3
> > >     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> > >     at kafka.server.ReplicaManager.appendToLocalLog(
> > > ReplicaManager.scala:375)
> > >     at kafka.server.ReplicaManager.appendRecords(ReplicaManager.
> > scala:312)
> > >        Local Variable: kafka.server.KafkaApis$$
> > > anonfun$handleProducerRequest$1#3
> > >        Local Variable: scala.collection.convert.
> Wrappers$JMapWrapper#502
> > >     at kafka.server.KafkaApis.handleProducerRequest(
> KafkaApis.scala:427)
> > >        Local Variable: org.apache.kafka.common.
> > requests.ProduceRequest#86
> > >     at kafka.server.KafkaApis.handle(KafkaApis.scala:80)
> > >        Local Variable: kafka.network.RequestChannel$Request#314
> > >     at kafka.server.KafkaRequestHandler.run(
> > KafkaRequestHandler.scala:62)
> > >        Local Variable: kafka.server.KafkaRequestHandler#6
> > >        Local Variable: scala.runtime.ObjectRef#737
> > >     at java.lang.Thread.run(Thread.java:745)
> > >
> >
>

Reply via email to