Hi Anand,

Can you describe your exact test setup ? This bug has been quite
elusive so far, it will be great to have a reproducible test case.
Also, are you using Kafka 0.7 or 0.8 ? I wonder if you can reproduce
this with Kafka 0.8 as well ?

Thanks
Neha

On Fri, Mar 29, 2013 at 8:15 AM, anand nalya <a.na...@computer.org> wrote:
> Hi Jun,
>
> I'm using async java producer. It works fine till the messages are in 100s
> of thousands but starts failing for anything above a million. Each message
> is around 2kb.
>
> I've tried both with single producer and multiple producers. Rate of this
> error is much less in single producer then in case of multiple producers.
>
> Thanks,
> Anand
>
>
> On 29 March 2013 20:24, Jun Rao <jun...@gmail.com> wrote:
>
>> This indicates that the messages sent to the broker are somehow corrupted.
>> Are you using a java producer? How many instances of producers do you have?
>>
>> Thanks,
>>
>> Jun
>>
>> On Fri, Mar 29, 2013 at 2:46 AM, anand nalya <anand.na...@gmail.com>
>> wrote:
>>
>> > Hi,
>> >
>> > I'm running kafka in distributed mode with 2 nodes. It works fine with
>> slow
>> > ingestion rates but when I increase the ingestion rate, both the nodes
>> > starts giving the following error:
>> >
>> > [2013-03-29 14:51:45,379] ERROR Closing socket for
>> /192.168.145.183because
>> > of error (kafka.network.Processor)
>> > kafka.message.InvalidMessageException: message is invalid, compression
>> > codec: NoCompressionCodec size: 414138 curr offset: 2901235 init offset:
>> 0
>> >     at
>> >
>> >
>> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>> >     at
>> >
>> >
>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
>> >     at
>> >
>> >
>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>> >     at
>> > kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>> >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>> >     at
>> >
>> >
>> kafka.message.ByteBufferMessageSet.verifyMessageSize(ByteBufferMessageSet.scala:89)
>> >     at kafka.log.Log.append(Log.scala:218)
>> >     at
>> >
>> >
>> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
>> >     at
>> >
>> >
>> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>> >     at
>> >
>> >
>> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>> >     at
>> >
>> >
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>> >     at
>> >
>> >
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>> >     at
>> >
>> >
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
>> >     at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
>> >     at
>> > scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>> >     at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
>> >     at
>> >
>> >
>> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
>> >     at
>> >
>> >
>> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>> >     at
>> >
>> >
>> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>> >     at kafka.network.Processor.handle(SocketServer.scala:296)
>> >     at kafka.network.Processor.read(SocketServer.scala:319)
>> >     at kafka.network.Processor.run(SocketServer.scala:214)
>> >     at java.lang.Thread.run(Thread.java:662)
>> >
>> > I'm running kafak 0.7.2 with jdk 1.6.0.43.
>> >
>> > Any idea what I might be doing wrong here?
>> >
>> > Regards,
>> > Anand
>> >
>>

Reply via email to