[ https://issues.apache.org/jira/browse/KAFKA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15034439#comment-15034439 ]
Allen Wang commented on KAFKA-2308:
-----------------------------------

[~gwenshap] [~guozhang]

Is the fix in the producer only? If I take the 0.8.2.2 producer, do I also need to upgrade the broker/consumer to 0.8.2.2 or to a later snappy version in order to avoid this bug? Currently our broker is on 0.8.2.1 and snappy 1.1.1.6.

> New producer + Snappy face un-compression errors after broker restart
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-2308
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2308
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Gwen Shapira
>            Assignee: Gwen Shapira
>             Fix For: 0.9.0.0, 0.8.2.2
>
>         Attachments: KAFKA-2308.patch
>
>
> Looks like the new producer, when used with Snappy, following a broker
> restart is sending messages the brokers can't decompress. This issue was
> discussed in a few mailing list threads, but I don't think we ever resolved it.
> I can reproduce with trunk and Snappy 1.1.1.7.
> To reproduce:
> 1. Start 3 brokers
> 2. Create a topic with 3 partitions and 3 replicas each.
> 3. Start performance producer with --new-producer --compression-codec 2 (and
> set the number of messages fairly high, to give you time. I went with 10M)
> 4. Bounce one of the brokers
> 5. The log of one of the surviving nodes should contain errors like:
> {code}
> 2015-07-02 13:45:59,300 ERROR kafka.server.ReplicaManager: [Replica Manager on Broker 66]: Error processing append operation on partition [t3,0]
> kafka.common.KafkaException:
>         at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:94)
>         at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:64)
>         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
>         at kafka.message.ByteBufferMessageSet$$anon$2.innerDone(ByteBufferMessageSet.scala:177)
>         at kafka.message.ByteBufferMessageSet$$anon$2.makeNext(ByteBufferMessageSet.scala:218)
>         at kafka.message.ByteBufferMessageSet$$anon$2.makeNext(ByteBufferMessageSet.scala:173)
>         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>         at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>         at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>         at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>         at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>         at kafka.message.ByteBufferMessageSet.validateMessagesAndAssignOffsets(ByteBufferMessageSet.scala:267)
>         at kafka.log.Log.liftedTree1$1(Log.scala:327)
>         at kafka.log.Log.append(Log.scala:326)
>         at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:423)
>         at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:409)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>         at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:268)
>         at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:409)
>         at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:365)
>         at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:350)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>         at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:350)
>         at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:286)
>         at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:270)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:57)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: PARSING_ERROR(2)
>         at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
>         at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
>         at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:594)
>         at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:358)
>         at org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:167)
>         at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:150)
>         at java.io.DataInputStream.readFully(DataInputStream.java:195)
>         at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:82)
>         ... 43 more
> {code}
> The client has the following messages:
> {code}
> [2015-07-02 13:46:00,478] ERROR Error when sending message to topic t3 with key: 4 bytes, value: 100 bytes with error: The server experienced an unexpected error when processing the request (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
> java: target/snappy-1.1.1/snappy.cc:423: char* snappy::internal::CompressFragment(const char*, size_t, char*, snappy::uint16*, int): Assertion `0 == memcmp(base, candidate, matched)' failed.
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)