I am also seeing this issue when using the new producer and snappy compression, running MirrorMaker (trunk, Aug 10 or so). I'm using snappy 1.1.1.7.
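For context, the producer side of my setup is nothing exotic; the sketch below shows roughly how the new Java producer gets configured with snappy enabled (the broker address and serializers are placeholders, not my exact MirrorMaker settings):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class SnappyProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Placeholder broker list; in my case this points at the mirror target cluster.
            props.put("bootstrap.servers", "broker1:9092");
            // compression.type=snappy is the setting under which I see the CORRUPT_MESSAGE retries.
            props.put("compression.type", "snappy");
            props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
            // "mytopic" matches the topic-partition named in the WARN line below; key/value are dummies.
            producer.send(new ProducerRecord<byte[], byte[]>("mytopic", "key".getBytes(), "value".getBytes()));
            producer.close();
        }
    }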
[2015-08-14 14:15:27,876] WARN Got error produce response with correlation id 5151552 on topic-partition mytopic-56, retrying (2147480801 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender)

I also get the occasional

java: target/snappy-1.1.1/snappy.cc:384: char* snappy::internal::CompressFragment(const char*, size_t, char*, snappy::uint16*, int): Assertion `hash == Hash(ip, shift)' failed.

and the associated

java.io.IOException: FAILED_TO_UNCOMPRESS(5)
    at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)

in the Kafka logs when using snappy compression with the new producer and restarting brokers. This looks to be very closely related to KAFKA-2308 (s/hash/memcp). I'm not sure how closely related this is to the message corruption loop, though.

I understand lz4 is now a compression option, with both higher throughput and better compression than snappy, but I do not see it listed anywhere in the documentation.

On Tue, May 12, 2015 at 9:45 PM, Roger Hoover <roger.hoo...@gmail.com> wrote:
> Oops. I originally sent this to the dev list but meant to send it here.
>
> Hi,
>
> When using Samza 0.9.0 (which uses the new Java producer client) with snappy
> enabled, I see messages getting corrupted on the client side. It never
> happens with the old producer and it never happens with lz4, gzip, or no
> compression. It only happens when a broker gets restarted (or maybe just
> shut down).
>
> The error is not always the same. I've noticed at least three types of
> errors on the Kafka brokers:
>
> 1) java.io.IOException: failed to read chunk
>        at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:356)
>    http://pastebin.com/NZrrEHxU
> 2) java.lang.OutOfMemoryError: Java heap space
>        at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:346)
>    http://pastebin.com/yuxk1BjY
> 3) java.io.IOException: PARSING_ERROR(2)
>        at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
>    http://pastebin.com/yq98Hx49
>
> I've noticed a couple of different behaviors from the Samza producer/job:
>
> A) It goes into a long retry loop where this message is logged. I saw
> this with error #1 above.
>
> 2015-04-29 18:17:31 Sender [WARN] task[Partition 7]
> ssp[kafka,svc.call.w_deploy.c7tH4YaiTQyBEwAAhQzRXw,7] offset[9999253] Got
> error produce response with correlation id 4878 on topic-partition
> svc.call.w_deploy.T2UDe2PWRYWcVAAAhMOAwA-1, retrying (2147483646 attempts
> left). Error: CORRUPT_MESSAGE
>
> B) The job exits with org.apache.kafka.common.errors.UnknownServerException
> (at least when run as ThreadJob). I saw this with error #3 above.
>
> org.apache.samza.SamzaException: Unable to send message from
> TaskName-Partition 6 to system kafka.
> org.apache.kafka.common.errors.UnknownServerException: The server
> experienced an unexpected error when processing the request
>
> There seem to be two issues here:
>
> 1) When leadership for a topic is transferred to another broker, the Java
> client (I think) has to move the data it was buffering for the original
> leader broker to the buffer for the new leader. My guess is that the
> corruption is happening at this point.
>
> 2) When a producer has a corrupt message, it retries 2.1 billion times in a
> hot loop even though it's not a retriable error. It probably shouldn't
> retry on such errors. For retriable errors, it would be much safer to have
> a backoff scheme for retries.
>
> Thanks,
>
> Roger
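On Roger's point (2) above: the "(2147480801 attempts left)" in my log is just the producer's retries config counting down from a value near Integer.MAX_VALUE, which is how my MirrorMaker producer is set up. Until non-retriable errors like CORRUPT_MESSAGE stop being retried, the only knobs I know of are the standard new-producer configs retries and retry.backoff.ms; a minimal sketch with illustrative values only, not a recommendation:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;

    public class BoundedRetryProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092");   // placeholder broker list
            props.put("compression.type", "snappy");
            props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

            // With retries near Integer.MAX_VALUE, a non-retriable CORRUPT_MESSAGE spins in a
            // hot loop. Capping retries and widening the backoff at least bounds the damage.
            props.put("retries", "10");             // illustrative cap
            props.put("retry.backoff.ms", "500");   // default is 100 ms; a larger value softens the loop

            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
            // ... send records as usual ...
            producer.close();
        }
    }

The trade-off, of course, is that a low retries cap means giving up on genuinely transient errors sooner, which is exactly what MirrorMaker's defaults are trying to avoid.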