[ https://issues.apache.org/jira/browse/KAFKA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14617096#comment-14617096 ]

Gwen Shapira commented on KAFKA-2308:
-------------------------------------

I think I know why it happens. A patch will take a bit of time since I can't 
reproduce the error in a unit test (although it reproduces reliably in a test 
environment). I'll put up a preliminary patch without unit tests shortly, so 
people hitting this issue can validate it. Here's what I found:

When the producer gets a retriable error (NETWORK_EXCEPTION, for instance), 
completeBatch() re-enqueues the current record batch at the front of its 
topic-partition queue.
The next time Sender runs, it drains the queue, and one of the things it does 
is take the first batch from the queue and close() it. But a batch that was 
re-queued has already been closed. Calling close() twice should be safe, and 
for uncompressed messages it is. For compressed messages, however, the logic 
in close() is rather complex, and I believe closing a batch twice corrupts the 
record. I can't tell exactly where the close() logic becomes unsafe, but 
there's really no need to close a batch twice: MemoryRecords.close() can check 
whether the buffer is still writable and only close it if it is. This 
guarantees closing happens exactly once.

Fixing this resolved the problem on my system.
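The guard can be sketched like this. This is a minimal, self-contained illustration, not the actual MemoryRecords code: GZIP stands in for Snappy, and the class and method names are all made up. The point is the writable flag that makes close() idempotent, so a batch re-queued after a retriable error can be close()d again harmlessly.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPOutputStream;

// Sketch only: NOT the real MemoryRecords. The `writable` flag ensures the
// compressed stream is finalized exactly once, even if close() is called
// again when a re-queued batch is drained a second time.
public class GuardedBatch {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final GZIPOutputStream compressor;
    private boolean writable = true;

    public GuardedBatch() {
        try {
            compressor = new GZIPOutputStream(buffer);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public void append(byte[] bytes) {
        if (!writable)
            throw new IllegalStateException("batch is already closed");
        try {
            compressor.write(bytes);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Finalize the compressed stream exactly once; a second call is a
    // no-op instead of re-running the close logic on an already-finalized
    // buffer, which is where the corruption was suspected to come from.
    public void close() {
        if (writable) {
            try {
                compressor.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            writable = false;
        }
    }

    public byte[] payload() {
        return buffer.toByteArray();
    }
}
```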



> New producer + Snappy face un-compression errors after broker restart
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-2308
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2308
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Gwen Shapira
>            Assignee: Gwen Shapira
>
> Looks like the new producer, when used with Snappy, sends messages the 
> brokers can't decompress after a broker restart. This issue was discussed 
> in a few mailing list threads, but I don't think we ever resolved it.
> I can reproduce with trunk and Snappy 1.1.1.7. 
> To reproduce:
> 1. Start 3 brokers.
> 2. Create a topic with 3 partitions and a replication factor of 3.
> 3. Start the performance producer with --new-producer --compression-codec 2 
> (set the message count fairly high to give yourself time; I went with 10M).
> 4. Bounce one of the brokers.
> 5. The log of one of the surviving nodes should contain errors like:
> {code}
> 2015-07-02 13:45:59,300 ERROR kafka.server.ReplicaManager: [Replica Manager 
> on Broker 66]: Error processing append operation on partition [t3,0]
> kafka.common.KafkaException:
>         at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:94)
>         at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:64)
>         at 
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
>         at 
> kafka.message.ByteBufferMessageSet$$anon$2.innerDone(ByteBufferMessageSet.scala:177)
>         at 
> kafka.message.ByteBufferMessageSet$$anon$2.makeNext(ByteBufferMessageSet.scala:218)
>         at 
> kafka.message.ByteBufferMessageSet$$anon$2.makeNext(ByteBufferMessageSet.scala:173)
>         at 
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at 
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>         at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>         at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>         at 
> scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>         at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>         at 
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>         at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>         at 
> kafka.message.ByteBufferMessageSet.validateMessagesAndAssignOffsets(ByteBufferMessageSet.scala:267)
>         at kafka.log.Log.liftedTree1$1(Log.scala:327)
>         at kafka.log.Log.append(Log.scala:326)
>         at 
> kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:423)
>         at 
> kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:409)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>         at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:268)
>         at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:409)
>         at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:365)
>         at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:350)
>         at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>         at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>         at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:350)
>         at 
> kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:286)
>         at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:270)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:57)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: PARSING_ERROR(2)
>         at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
>         at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
>         at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:594)
>         at 
> org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:358)
>         at 
> org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:167)
>         at 
> org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:150)
>         at java.io.DataInputStream.readFully(DataInputStream.java:195)
>         at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:82)
>         ... 43 more
> {code}
> The client has the following messages:
> {code}
> [2015-07-02 13:46:00,478] ERROR Error when sending message to topic t3 with 
> key: 4 bytes, value: 100 bytes with error: The server experienced an 
> unexpected error when processing the request 
> (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
> java: target/snappy-1.1.1/snappy.cc:423: char* 
> snappy::internal::CompressFragment(const char*, size_t, char*, 
> snappy::uint16*, int): Assertion `0 == memcmp(base, candidate, matched)' 
> failed.
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)