Dmitri, You said that the problem goes away once you upgrade to 0.7.1. Do you mean that the message corruption doesn't happen or that when it happens, the broker doesn't shut down ? I'm asking since we occasionally do see the corruption on 0.7.1 but don't have a good way to reproduce that. 0.7.1 fixes the shut down issue so the broker ignores corrupted requests instead of shutting down - https://issues.apache.org/jira/browse/KAFKA-261
Do you have a way to reproduce this issue ? It will really help in troubleshooting the root cause. Thanks, Neha On Thu, Nov 29, 2012 at 8:37 AM, Dmitri Priimak <prii...@highwire.stanford.edu> wrote: > On 12-11-28 09:54 PM, Jun Rao wrote: >> >> Dmitri, >> >> Could you reproduce this easily? Are you using a load balancer? Earlier, >> another user had the same issue and eventually figured out that the >> problem >> is in the network router. > > I do not believe I have any loadbalancer anywhere in the picture and by the > way problem does go > away if I upgrade broker to version 0.7.1 or 0.7.2 > > -- > Dmitri Priimak > > >> >> Thanks, >> >> Jun >> >> On Wed, Nov 28, 2012 at 11:34 AM, Dmitri Priimak < >> prii...@highwire.stanford.edu> wrote: >> >>> Hi. >>> >>> In the kafka broker (version 0.7.0) log I see occasionally following >>> error >>> message >>> >>> FATAL Halting due to unrecoverable I/O error while handling producer >>> request: Unexpected end of >>> ZLIB input stream (kafka.server.KafkaRequestHandlers) >>> java.io.EOFException: Unexpected end of ZLIB input stream >>> at >>> java.util.zip.InflaterInputStream.fill(InflaterInputStream.java:223) >>> at >>> java.util.zip.InflaterInputStream.read(InflaterInputStream.java:141) >>> at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:90) >>> at java.io.FilterInputStream.read(FilterInputStream.java:90) >>> at >>> >>> kafka.message.CompressionUtils$$anonfun$decompress$4.apply$mcI$sp(CompressionUtils.scala:123) >>> at >>> >>> kafka.message.CompressionUtils$$anonfun$decompress$4.apply(CompressionUtils.scala:123) >>> at >>> >>> kafka.message.CompressionUtils$$anonfun$decompress$4.apply(CompressionUtils.scala:123) >>> at >>> >>> scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:598) >>> at >>> >>> scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:598) >>> at scala.collection.immutable.Stream$Cons.tail(Stream.scala:555) >>> at scala.collection.immutable.Stream$Cons.tail(Stream.scala:549) >>> at >>> >>> scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:394) >>> at >>> >>> scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:394) >>> at scala.collection.immutable.Stream$Cons.tail(Stream.scala:555) >>> at scala.collection.immutable.Stream$Cons.tail(Stream.scala:549) >>> at scala.collection.immutable.Stream.foreach(Stream.scala:255) >>> at >>> kafka.message.CompressionUtils$.decompress(CompressionUtils.scala:123) >>> at >>> >>> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:124) >>> at >>> >>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:138) >>> at >>> >>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:82) >>> at >>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59) >>> at >>> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51) >>> at scala.collection.Iterator$class.foreach(Iterator.scala:631) >>> at >>> kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30) >>> at >>> scala.collection.IterableLike$class.foreach(IterableLike.scala:79) >>> at kafka.message.MessageSet.foreach(MessageSet.scala:87) >>> at kafka.log.Log.append(Log.scala:202) >>> at >>> >>> >>> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:75) >>> at >>> >>> >>> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:68) >>> at >>> >>> >>> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:68) >>> at >>> >>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) >>> at >>> >>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) >>> at >>> >>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34) >>> at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34) >>> at >>> scala.collection.TraversableLike$class.map(TraversableLike.scala:206) >>> at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34) >>> at >>> >>> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:68) >>> at >>> >>> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:46) >>> at >>> >>> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:46) >>> at kafka.network.Processor.handle(SocketServer.scala:289) >>> at kafka.network.Processor.read(SocketServer.scala:312) >>> at kafka.network.Processor.run(SocketServer.scala:207) >>> at java.lang.Thread.run(Thread.java:662) >>> >>> At which point broker actually dies. Shouldn't it keep working even even >>> if there is some such error? >>> Also, does anyone else saw this error? And is it fixed in the newer >>> versions? >>> >>> -- >>> Dmitri Priimak >>> >>> >