Dmitri,

You said that the problem goes away once you upgrade to 0.7.1. Do you
mean that the message corruption doesn't happen or that when it
happens, the broker doesn't shut down ?
I'm asking since we occasionally do see the corruption on 0.7.1 but
don't have a good way to reproduce that. 0.7.1 fixes the shut down
issue so the broker ignores corrupted requests
instead of shutting down - https://issues.apache.org/jira/browse/KAFKA-261

Do you have a way to reproduce this issue ? It will really help in
troubleshooting the root cause.

Thanks,
Neha

On Thu, Nov 29, 2012 at 8:37 AM, Dmitri Priimak
<prii...@highwire.stanford.edu> wrote:
> On 12-11-28 09:54 PM, Jun Rao wrote:
>>
>> Dmitri,
>>
>> Could you reproduce this easily? Are you using a load balancer? Earlier,
>> another user had the same issue and eventually figured out that the
>> problem
>> is in the network router.
>
> I do not believe I have any loadbalancer anywhere in the picture and by the
> way problem does go
> away if I upgrade broker to version 0.7.1 or 0.7.2
>
> --
> Dmitri Priimak
>
>
>>
>> Thanks,
>>
>> Jun
>>
>> On Wed, Nov 28, 2012 at 11:34 AM, Dmitri Priimak <
>> prii...@highwire.stanford.edu> wrote:
>>
>>> Hi.
>>>
>>> In the kafka broker (version 0.7.0) log I see occasionally following
>>> error
>>> message
>>>
>>>   FATAL Halting due to unrecoverable I/O error while handling producer
>>> request: Unexpected end of
>>> ZLIB input stream (kafka.server.KafkaRequestHandlers)
>>> java.io.EOFException: Unexpected end of ZLIB input stream
>>>          at
>>> java.util.zip.InflaterInputStream.fill(InflaterInputStream.java:223)
>>>          at
>>> java.util.zip.InflaterInputStream.read(InflaterInputStream.java:141)
>>>          at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:90)
>>>          at java.io.FilterInputStream.read(FilterInputStream.java:90)
>>>          at
>>>
>>> kafka.message.CompressionUtils$$anonfun$decompress$4.apply$mcI$sp(CompressionUtils.scala:123)
>>>          at
>>>
>>> kafka.message.CompressionUtils$$anonfun$decompress$4.apply(CompressionUtils.scala:123)
>>>          at
>>>
>>> kafka.message.CompressionUtils$$anonfun$decompress$4.apply(CompressionUtils.scala:123)
>>>          at
>>>
>>> scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:598)
>>>          at
>>>
>>> scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:598)
>>>          at scala.collection.immutable.Stream$Cons.tail(Stream.scala:555)
>>>          at scala.collection.immutable.Stream$Cons.tail(Stream.scala:549)
>>>          at
>>>
>>> scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:394)
>>>          at
>>>
>>> scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:394)
>>>          at scala.collection.immutable.Stream$Cons.tail(Stream.scala:555)
>>>          at scala.collection.immutable.Stream$Cons.tail(Stream.scala:549)
>>>          at scala.collection.immutable.Stream.foreach(Stream.scala:255)
>>>          at
>>> kafka.message.CompressionUtils$.decompress(CompressionUtils.scala:123)
>>>          at
>>>
>>> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:124)
>>>          at
>>>
>>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:138)
>>>          at
>>>
>>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:82)
>>>          at
>>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>>>          at
>>> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>>>          at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>>>          at
>>> kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>>>          at
>>> scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>>>          at kafka.message.MessageSet.foreach(MessageSet.scala:87)
>>>          at kafka.log.Log.append(Log.scala:202)
>>>          at
>>>
>>>
>>> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:75)
>>>          at
>>>
>>>
>>> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:68)
>>>          at
>>>
>>>
>>> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:68)
>>>          at
>>>
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>>>          at
>>>
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>>>          at
>>>
>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
>>>          at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
>>>          at
>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>>>          at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
>>>          at
>>>
>>> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:68)
>>>          at
>>>
>>> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:46)
>>>          at
>>>
>>> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:46)
>>>          at kafka.network.Processor.handle(SocketServer.scala:289)
>>>          at kafka.network.Processor.read(SocketServer.scala:312)
>>>          at kafka.network.Processor.run(SocketServer.scala:207)
>>>          at java.lang.Thread.run(Thread.java:662)
>>>
>>> At which point broker actually dies. Shouldn't it keep working even even
>>> if there is some such error?
>>> Also, does anyone else saw this error? And is it fixed in the newer
>>> versions?
>>>
>>> --
>>> Dmitri Priimak
>>>
>>>
>

Reply via email to