Hi,

I'm running kafka in distributed mode with 2 nodes. It works fine with slow
ingestion rates but when I increase the ingestion rate, both the nodes
starts giving the following error:

[2013-03-29 14:51:45,379] ERROR Closing socket for /192.168.145.183 because
of error (kafka.network.Processor)
kafka.message.InvalidMessageException: message is invalid, compression
codec: NoCompressionCodec size: 414138 curr offset: 2901235 init offset: 0
    at
kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
    at
kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
    at
kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
    at
kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
    at
kafka.message.ByteBufferMessageSet.verifyMessageSize(ByteBufferMessageSet.scala:89)
    at kafka.log.Log.append(Log.scala:218)
    at
kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
    at
kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
    at
kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
    at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
    at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
    at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
    at
kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
    at
kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
    at
kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
    at kafka.network.Processor.handle(SocketServer.scala:296)
    at kafka.network.Processor.read(SocketServer.scala:319)
    at kafka.network.Processor.run(SocketServer.scala:214)
    at java.lang.Thread.run(Thread.java:662)

I'm running kafak 0.7.2 with jdk 1.6.0.43.

Any idea what I might be doing wrong here?

Regards,
Anand

Reply via email to