Hello, I'm using Confluent Kafka (0.8.2.0-cp). When I try to process messages from my Kafka topic with Spark Streaming, I get the following error:

kafka.message.InvalidMessageException: Message is corrupt (stored crc = 3561357254, computed crc = 171652633)
    at kafka.message.Message.ensureValid(Message.scala:166)
    at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102)
    at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
    at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
15/07/20 15:56:57 INFO BlockManager: Removing broadcast 4641
15/07/20 15:56:57 ERROR ReliableKafkaReceiver: Error handling message
java.lang.IllegalStateException: Iterator is in failed state
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
    at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

From my understanding, there are some corrupt messages in my topic. I'm using the new Producer API to send messages compressed with Snappy. I found an old thread discussing this error, but it gave no further steps to resolve the issue.

Do you have any information regarding this? Is it possible in Kafka to somehow reread the topic and drop the corrupt messages?

Regards,
Nicolas PHUNG