Hi everyone,

Last week, one of our production Kafka 0.7.1 servers had a hardware failure that resulted in an unclean restart. When the server came back up 5 minutes later, there were two topic corruption problems that we had to handle to get the pipeline working again.

1. The Kafka log directory had malformed partition directory names [1]. The malformed names were truncated versions of our topic names, such as datapoints-wr and datapoints-writ. Inside some of these directories was a 0-byte .kafka file. This prevented the Kafka server process from starting. To resolve this, we manually removed the malformed topic directories and the empty log files. Is this a known issue? If so, has it been addressed in 0.8? This seems like a bug.

2. One of the partitions had a corrupt log message (CRC32 check failure) that prevented consumers from advancing on that partition [2]. To resolve this, we had to hexdump the log file, seek to the byte offset reported in the consumer exception, and remove the corrupt message at the frame boundary. After removing the corrupt message, we created a task to parse the log file and put its contents back into our producer / consumer pipeline. The InvalidMessageException never bubbled up to our consumer thread, so we could not handle this error without manually manipulating the log files (our consumer code catches InvalidMessageException, but the exception never reached that point).

Is a corrupt log file a state that Kafka is supposed to handle, or is our consumer code supposed to handle it? I'm confused about how we can avoid manually cleaning up log files with a hex editor if we get into this state again.

Thanks!
Blake

[1] Corrupt partition directories in the broker data directories:

0 [main] INFO kafka.utils.Utils$ - The number of partitions for topic datapoints-write : 50
1 [main] INFO kafka.utils.Utils$ - The number of partitions for topic datapoints-increment : 50
2 [main] INFO kafka.utils.Utils$ - The number of partitions for topic series-index : 5
5 [main] INFO kafka.server.KafkaServer - Starting Kafka server...
17 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-18'
32 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-18/00000000095029068903.kafka in mutable mode, recovery false
46 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-15'
46 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-15/00000000000000000000.kafka in mutable mode, recovery false
47 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-25'
47 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-25/00000000000000000000.kafka in mutable mode, recovery false
47 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-27'
48 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-27/00000000093417873484.kafka in mutable mode, recovery false
48 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-9'
48 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-9/00000000092880448733.kafka in mutable mode, recovery false
49 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-8'
49 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-8/00000000000000000000.kafka in mutable mode, recovery false
49 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-29'
49 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-29/00000000008053064625.kafka in mutable mode, recovery false
50 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-34'
50 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-34/00000000000000000000.kafka in mutable mode, recovery false
50 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-19'
51 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-19/00000000000000000000.kafka in mutable mode, recovery false
51 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-16'
51 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-16/00000000000000000000.kafka in mutable mode, recovery false
51 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-27'
52 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-27/00000000000000000000.kafka in mutable mode, recovery false
52 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-34'
52 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-34/00000000092880904403.kafka in mutable mode, recovery false
53 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-21'
53 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-21/00000000095028969469.kafka in mutable mode, recovery false
53 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-48'
54 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-48/00000000094491048876.kafka in mutable mode, recovery false
54 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-40'
55 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-40/00000000000000000000.kafka in mutable mode, recovery false
55 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-6'
55 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-6/00000000000000000000.kafka in mutable mode, recovery false
56 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-25'
56 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-25/00000000093417394574.kafka in mutable mode, recovery false
56 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-33'
57 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-33/00000000093417473922.kafka in mutable mode, recovery false
57 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-35'
58 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-35/00000000092880891040.kafka in mutable mode, recovery false
58 [main] INFO kafka.log.LogManager - Loading log 'datapoints-wr'
58 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-wr/00000000000000000000.kafka in mutable mode, recovery false
60 [main] FATAL kafka.server.KafkaServerStartable - Fatal error during KafkaServerStable startup. Prepare to shutdown
java.lang.NumberFormatException: For input string: "wr"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:481)
    at java.lang.Integer.parseInt(Integer.java:514)
    at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:207)
    at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
    at kafka.utils.Utils$.getTopicPartition(Utils.scala:558)
    at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:71)
    at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:65)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
    at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
    at kafka.log.LogManager.<init>(LogManager.scala:65)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
    at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
    at kafka.Kafka$.main(Kafka.scala:50)
    at kafka.Kafka.main(Kafka.scala)
61 [main] INFO kafka.server.KafkaServer - Shutting down Kafka server
62 [main] INFO kafka.utils.KafkaScheduler - shutdown scheduler kafka-logcleaner-
62 [main] INFO kafka.server.KafkaServer - Kafka server shut down completed

[blake@kafka03 kafka]$ ls
dat                      datapoints-increment-40  datapoints-write-26
datapoints-increment-0   datapoints-increment-41  datapoints-write-27
datapoints-increment-1   datapoints-increment-42  datapoints-write-28
datapoints-increment-10  datapoints-increment-43  datapoints-write-29
datapoints-increment-11  datapoints-increment-44  datapoints-write-3
datapoints-increment-12  datapoints-increment-45  datapoints-write-30
datapoints-increment-13  datapoints-increment-46  datapoints-write-31
datapoints-increment-14  datapoints-increment-47  datapoints-write-32
datapoints-increment-15  datapoints-increment-48  datapoints-write-33
datapoints-increment-16  datapoints-increment-49  datapoints-write-34
datapoints-increment-17  datapoints-increment-5   datapoints-write-35
datapoints-increment-18  datapoints-increment-6   datapoints-write-36
datapoints-increment-19  datapoints-increment-7   datapoints-write-37
datapoints-increment-2   datapoints-increment-8   datapoints-write-38
datapoints-increment-20  datapoints-increment-9   datapoints-write-39
datapoints-increment-21  datapoints-wr            datapoints-write-4
datapoints-increment-22  datapoints-writ          datapoints-write-40
datapoints-increment-23  datapoints-write-0       datapoints-write-41
datapoints-increment-24  datapoints-write-1       datapoints-write-42
datapoints-increment-25  datapoints-write-10      datapoints-write-43
datapoints-increment-26  datapoints-write-11      datapoints-write-44
datapoints-increment-27  datapoints-write-12      datapoints-write-45
datapoints-increment-28  datapoints-write-13      datapoints-write-46
datapoints-increment-29  datapoints-write-14      datapoints-write-47
datapoints-increment-3   datapoints-write-15      datapoints-write-48
datapoints-increment-30  datapoints-write-16      datapoints-write-49
datapoints-increment-31  datapoints-write-17      datapoints-write-5
datapoints-increment-32  datapoints-write-18      datapoints-write-6
datapoints-increment-33  datapoints-write-19      datapoints-write-7
datapoints-increment-34  datapoints-write-2       datapoints-write-8
datapoints-increment-35  datapoints-write-20      datapoints-write-9
datapoints-increment-36  datapoints-write-21      series-index-0
datapoints-increment-37  datapoints-write-22      series-index-1
datapoints-increment-38  datapoints-write-23      series-index-2
datapoints-increment-39  datapoints-write-24      series-index-3
datapoints-increment-4   datapoints-write-25      series-index-4
[blake@kafka03 kafka]$ cd datapoints-wr
[blake@kafka03 datapoints-wr]$ ls -lah
total 8.0K
drwxr-xr-x   2 kafka kafka 4.0K Jul 11 18:51 .
drwxr-xr-x 110 kafka kafka 4.0K Jul 11 19:05 ..
-rw-r--r--   1 kafka kafka    0 Jul 11 18:51 00000000000000000000.kafka

[2] In our consumer logs:

013-07-11 21:33:01,136 61867 [FetchRunnable-0] ERROR kafka.consumer.FetcherRunnable - error in FetcherRunnable
kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 79 curr offset: 93706191248 init offset: 93706191165
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
    at kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64)
    at kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59)
    at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57)
    at kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79)
    at kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
    at scala.collection.immutable.List.foreach(List.scala:76)
    at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
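
For reference, here is a rough sketch of the two checks we would like to automate instead of reaching for a hex editor. It is not Kafka code and has not been run against a real broker. The function names are hypothetical, and it rests on two assumptions: valid partition directories are named "<topic>-<partition number>" (which is what the getTopicPartition stack trace in [1] suggests), and each message in a 0.7 log segment is framed as a 4-byte big-endian length, a 1-byte magic, a 1-byte attributes field when magic is 1, a 4-byte CRC32 of the payload, and then the payload.

```python
import os
import re
import struct
import zlib

# Assumption: a valid partition directory is "<topic>-<partition number>".
PARTITION_DIR = re.compile(r"^.+-\d+$")


def malformed_partition_dirs(log_dir):
    """Return sorted names of subdirectories that do not end in "-<number>"."""
    return sorted(
        name
        for name in os.listdir(log_dir)
        if os.path.isdir(os.path.join(log_dir, name))
        and not PARTITION_DIR.match(name)
    )


def first_corrupt_message(segment):
    """Return the byte position of the first bad frame in `segment`, or None.

    `segment` is the raw bytes of one log file. The frame layout used here
    is an assumption about the 0.7 on-disk format (see the note above).
    """
    pos = 0
    while pos < len(segment):
        if pos + 4 > len(segment):
            return pos  # truncated length field
        (size,) = struct.unpack_from(">I", segment, pos)
        body = segment[pos + 4 : pos + 4 + size]
        # magic 1 carries an extra attributes byte before the CRC
        header = 6 if body[:1] == b"\x01" else 5
        if len(body) < size or size < header:
            return pos  # truncated or impossibly short frame
        (stored_crc,) = struct.unpack_from(">I", body, header - 4)
        if zlib.crc32(body[header:]) & 0xFFFFFFFF != stored_crc:
            return pos  # checksum mismatch
        pos += 4 + size
    return None
```

Run against our data directory, malformed_partition_dirs("/var/kafka") should list dat, datapoints-wr and datapoints-writ. For first_corrupt_message, the returned value is a position within one file. If 0.7 offsets are byte positions and the segment file name is the offset of its first byte (again an assumption on our part), then adding the number in the file name should give the offset the consumer reports.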