Hello,

I'm using Confluent Kafka (0.8.2.0-cp). When I try to process messages
from my Kafka topic with Spark Streaming, I get the following error:

kafka.message.InvalidMessageException: Message is corrupt (stored crc = 3561357254, computed crc = 171652633)
        at kafka.message.Message.ensureValid(Message.scala:166)
        at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102)
        at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
        at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
        at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
15/07/20 15:56:57 INFO BlockManager: Removing broadcast 4641
15/07/20 15:56:57 ERROR ReliableKafkaReceiver: Error handling message
java.lang.IllegalStateException: Iterator is in failed state
        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
        at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

From my understanding, there are some corrupt messages in my topic. I'm using
the new Producer API to send messages compressed with Snappy. I found an old
thread discussing this issue, but it offered no further steps to resolve it.
Do you have any information regarding this?

Is it possible in Kafka to somehow reread the topic and drop the corrupt
messages?
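For context, the error compares a CRC32 checksum stored with each message against one recomputed on read, and rejects the message when they differ. Below is a minimal, self-contained sketch of that "check and drop" idea, assuming a hypothetical simplified record (`StoredRecord`, `CrcFilter` are illustrative names, not Kafka classes). In the real 0.8 message format the CRC covers more than the value bytes, and, as the second stack trace suggests, the old consumer's iterator stays in a failed state after the exception, so this only illustrates the check rather than offering a drop-in fix.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.zip.CRC32;

// Hypothetical, simplified record: a stored checksum plus the bytes it should cover.
// (Assumption: the real Kafka 0.8 format checksums magic byte, attributes, key and value.)
final class StoredRecord {
    final long storedCrc; // unsigned 32-bit CRC held in a long
    final byte[] body;

    StoredRecord(long storedCrc, byte[] body) {
        this.storedCrc = storedCrc;
        this.body = body;
    }
}

final class CrcFilter {
    private CrcFilter() {}

    // Computes the CRC32 of the given bytes as an unsigned 32-bit value.
    static long computeCrc(byte[] bytes) {
        CRC32 crc = new CRC32();
        crc.update(bytes, 0, bytes.length);
        return crc.getValue();
    }

    // The same comparison the error reports: stored crc vs computed crc.
    static boolean isValid(StoredRecord r) {
        return r.storedCrc == computeCrc(r.body);
    }

    // Keeps only records whose checksum matches; logs and skips the corrupt ones.
    static List<StoredRecord> dropCorrupt(List<StoredRecord> records) {
        List<StoredRecord> kept = new ArrayList<>();
        for (StoredRecord r : records) {
            if (isValid(r)) {
                kept.add(r);
            } else {
                System.err.printf("dropping corrupt record (stored crc = %d, computed crc = %d)%n",
                        r.storedCrc, computeCrc(r.body));
            }
        }
        return kept;
    }
}
```

A record whose stored value equals `computeCrc(body)` passes through; any mismatch, like the one in the trace above, is logged and skipped.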

Regards,
Nicolas PHUNG
