1. The corrupt directory names are not something we can really handle. The names have to work with ZooKeeper, so if weird characters get inserted I think the best we can do is give an error--which is what that NumberFormatException is: the broker trying to parse a partition number out of the truncated directory name (there's a sketch of that parsing at the end of this mail). I believe we have fixed the 0 byte file problem in 0.8.

2. The assumption for log recovery is that data that has been flushed to disk is stable, but data which has not been flushed can be in any state (garbage bytes, missing, partial writes, etc.). Recovery runs only on the last segment of unflushed data (i.e. the file with the highest number) and only in the case of an unclean shutdown. This is basically an optimization--if we recovered the whole log on startup, startup could take a very long time for large logs (scanning 5TB at say 10MB/sec is almost six days). From what you are saying it sounds like you had disk corruption prior to the last segment--in other words, flushed data became corrupt. This is not handled in 0.7. In 0.8 you would have the option of just deleting the problematic data and restoring from replicas.
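To make the recovery model concrete, the last-segment pass is essentially "walk the segment from a known-good offset, check each message's CRC, and truncate at the first frame that doesn't verify"--which is also, in effect, what you ended up doing by hand with the hex editor. Here is a rough sketch of that idea. It assumes a simplified framing of [4-byte length][4-byte CRC32][payload]; the real 0.7 on-disk format and the actual kafka.log.Log recovery code differ:

    import java.io.RandomAccessFile
    import java.util.zip.CRC32

    object SegmentRecoverySketch {
      // Scan a segment file from the start, validating each frame's CRC,
      // and truncate the file at the first frame that fails to verify.
      // Framing assumed here (simplified, NOT the real 0.7 format):
      //   [4-byte big-endian payload length][4-byte CRC32 of payload][payload]
      def recover(path: String): Unit = {
        val file = new RandomAccessFile(path, "rw")
        try {
          var validUpTo = 0L
          while (validUpTo < file.length) {
            file.seek(validUpTo)
            if (file.length - validUpTo < 8) {
              file.setLength(validUpTo)   // partial header at the tail
              return
            }
            val size = file.readInt()
            val storedCrc = file.readInt() & 0xffffffffL
            if (size < 0 || validUpTo + 8 + size > file.length) {
              file.setLength(validUpTo)   // garbage length or payload past EOF
              return
            }
            val payload = new Array[Byte](size)
            file.readFully(payload)
            val crc = new CRC32
            crc.update(payload)
            if (crc.getValue != storedCrc) {
              file.setLength(validUpTo)   // CRC mismatch: everything after is suspect
              return
            }
            validUpTo += 8 + size         // frame is good, advance
          }
        } finally file.close()
      }
    }

The point is that this scan is only cheap because it is limited to the unflushed tail; your corruption was in data that had already been flushed, which this pass never looks at--hence the CRC error surfacing on the consumer side instead.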
-Jay

On Tue, Jul 16, 2013 at 1:10 PM, Blake Smith <blake.sm...@tempo-db.com> wrote:
> Hi everyone,
>
> Last week, one of our production Kafka 0.7.1 servers had a hardware
> failure that resulted in an unclean restart. When the server came back up
> 5 minutes later, there were two topic corruption problems that we had to
> handle to get the pipeline working again.
>
> 1. The kafka log directory had malformed partition directory names [1].
> The partition directories were shortened names of our topic naming
> scheme. Inside some of these directories was a 0 byte .kafka file. This
> prevented the kafka server process from starting. To resolve this, we
> manually removed the malformed topic directories and the empty log files.
>
> Is this a known issue? If so, has it been addressed in 0.8? This seems
> like a bug.
>
> 2. One of the partitions had a corrupt log message (crc32 check failure)
> that prevented consumers from advancing on that partition [2]. To resolve
> this issue we had to hexdump the log file, seeking to the byte offset
> provided by the consumer exception, and remove the corrupt message at the
> frame boundary. After removing the corrupt message, we created a task to
> parse the log file and put it back into our producer / consumer pipeline.
> The InvalidMessageException never bubbled up to our consumer thread,
> preventing us from handling this error without manual manipulation of the
> log files (our consumer code was catching InvalidMessageException, but it
> never reached that point).
>
> Is a corrupt log file a state that Kafka is supposed to handle, or is our
> consumer code supposed to handle it? I'm confused about how we can avoid
> manually cleaning up log files with a hex editor if we get into this
> state again.
>
> Thanks!
>
> Blake
>
> [1] Corrupt partition directories in the broker data directories:
>
> 0    [main] INFO  kafka.utils.Utils$ - The number of partitions for topic datapoints-write : 50
> 1    [main] INFO  kafka.utils.Utils$ - The number of partitions for topic datapoints-increment : 50
> 2    [main] INFO  kafka.utils.Utils$ - The number of partitions for topic series-index : 5
> 5    [main] INFO  kafka.server.KafkaServer - Starting Kafka server...
> 17   [main] INFO  kafka.log.LogManager - Loading log 'datapoints-write-18'
> 32   [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-18/00000000095029068903.kafka in mutable mode, recovery false
> 46   [main] INFO  kafka.log.LogManager - Loading log 'datapoints-increment-15'
> 46   [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-15/00000000000000000000.kafka in mutable mode, recovery false
> 47   [main] INFO  kafka.log.LogManager - Loading log 'datapoints-increment-25'
> 47   [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-25/00000000000000000000.kafka in mutable mode, recovery false
> 47   [main] INFO  kafka.log.LogManager - Loading log 'datapoints-write-27'
> 48   [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-27/00000000093417873484.kafka in mutable mode, recovery false
> 48   [main] INFO  kafka.log.LogManager - Loading log 'datapoints-write-9'
> 48   [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-9/00000000092880448733.kafka in mutable mode, recovery false
> 49   [main] INFO  kafka.log.LogManager - Loading log 'datapoints-increment-8'
> 49   [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-8/00000000000000000000.kafka in mutable mode, recovery false
> 49   [main] INFO  kafka.log.LogManager - Loading log 'datapoints-increment-29'
> 49   [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-29/00000000008053064625.kafka in mutable mode, recovery false
> 50   [main] INFO  kafka.log.LogManager - Loading log 'datapoints-increment-34'
> 50   [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-34/00000000000000000000.kafka in mutable mode, recovery false
> 50   [main] INFO  kafka.log.LogManager - Loading log 'datapoints-increment-19'
> 51   [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-19/00000000000000000000.kafka in mutable mode, recovery false
> 51   [main] INFO  kafka.log.LogManager - Loading log 'datapoints-increment-16'
> 51   [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-16/00000000000000000000.kafka in mutable mode, recovery false
> 51   [main] INFO  kafka.log.LogManager - Loading log 'datapoints-increment-27'
> 52   [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-27/00000000000000000000.kafka in mutable mode, recovery false
> 52   [main] INFO  kafka.log.LogManager - Loading log 'datapoints-write-34'
> 52   [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-34/00000000092880904403.kafka in mutable mode, recovery false
> 53   [main] INFO  kafka.log.LogManager - Loading log 'datapoints-write-21'
> 53   [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-21/00000000095028969469.kafka in mutable mode, recovery false
> 53   [main] INFO  kafka.log.LogManager - Loading log 'datapoints-write-48'
> 54   [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-48/00000000094491048876.kafka in mutable mode, recovery false
> 54   [main] INFO  kafka.log.LogManager - Loading log 'datapoints-increment-40'
> 55   [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-40/00000000000000000000.kafka in mutable mode, recovery false
> 55   [main] INFO  kafka.log.LogManager - Loading log 'datapoints-increment-6'
> 55   [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-6/00000000000000000000.kafka in mutable mode, recovery false
> 56   [main] INFO  kafka.log.LogManager - Loading log 'datapoints-write-25'
> 56   [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-25/00000000093417394574.kafka in mutable mode, recovery false
> 56   [main] INFO  kafka.log.LogManager - Loading log 'datapoints-write-33'
> 57   [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-33/00000000093417473922.kafka in mutable mode, recovery false
> 57   [main] INFO  kafka.log.LogManager - Loading log 'datapoints-write-35'
> 58   [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-35/00000000092880891040.kafka in mutable mode, recovery false
> 58   [main] INFO  kafka.log.LogManager - Loading log 'datapoints-wr'
> 58   [main] INFO  kafka.log.Log - Loading the last segment /var/kafka/datapoints-wr/00000000000000000000.kafka in mutable mode, recovery false
> 60   [main] FATAL kafka.server.KafkaServerStartable - Fatal error during KafkaServerStable startup. Prepare to shutdown
> java.lang.NumberFormatException: For input string: "wr"
>     at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
>     at java.lang.Integer.parseInt(Integer.java:481)
>     at java.lang.Integer.parseInt(Integer.java:514)
>     at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:207)
>     at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
>     at kafka.utils.Utils$.getTopicPartition(Utils.scala:558)
>     at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:71)
>     at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:65)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
>     at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
>     at kafka.log.LogManager.<init>(LogManager.scala:65)
>     at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
>     at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
>     at kafka.Kafka$.main(Kafka.scala:50)
>     at kafka.Kafka.main(Kafka.scala)
> 61   [main] INFO  kafka.server.KafkaServer - Shutting down Kafka server
> 62   [main] INFO  kafka.utils.KafkaScheduler - shutdown scheduler kafka-logcleaner-
> 62   [main] INFO  kafka.server.KafkaServer - Kafka server shut down completed
>
> [blake@kafka03 kafka]$ ls
> dat                      datapoints-increment-40  datapoints-write-26
> datapoints-increment-0   datapoints-increment-41  datapoints-write-27
> datapoints-increment-1   datapoints-increment-42  datapoints-write-28
> datapoints-increment-10  datapoints-increment-43  datapoints-write-29
> datapoints-increment-11  datapoints-increment-44  datapoints-write-3
> datapoints-increment-12  datapoints-increment-45  datapoints-write-30
> datapoints-increment-13  datapoints-increment-46  datapoints-write-31
> datapoints-increment-14  datapoints-increment-47  datapoints-write-32
> datapoints-increment-15  datapoints-increment-48  datapoints-write-33
> datapoints-increment-16  datapoints-increment-49  datapoints-write-34
> datapoints-increment-17  datapoints-increment-5   datapoints-write-35
> datapoints-increment-18  datapoints-increment-6   datapoints-write-36
> datapoints-increment-19  datapoints-increment-7   datapoints-write-37
> datapoints-increment-2   datapoints-increment-8   datapoints-write-38
> datapoints-increment-20  datapoints-increment-9   datapoints-write-39
> datapoints-increment-21  datapoints-wr            datapoints-write-4
> datapoints-increment-22  datapoints-writ          datapoints-write-40
> datapoints-increment-23  datapoints-write-0       datapoints-write-41
> datapoints-increment-24  datapoints-write-1       datapoints-write-42
> datapoints-increment-25  datapoints-write-10      datapoints-write-43
> datapoints-increment-26  datapoints-write-11      datapoints-write-44
> datapoints-increment-27  datapoints-write-12      datapoints-write-45
> datapoints-increment-28  datapoints-write-13      datapoints-write-46
> datapoints-increment-29  datapoints-write-14      datapoints-write-47
> datapoints-increment-3   datapoints-write-15      datapoints-write-48
> datapoints-increment-30  datapoints-write-16      datapoints-write-49
> datapoints-increment-31  datapoints-write-17      datapoints-write-5
> datapoints-increment-32  datapoints-write-18      datapoints-write-6
> datapoints-increment-33  datapoints-write-19      datapoints-write-7
> datapoints-increment-34  datapoints-write-2       datapoints-write-8
> datapoints-increment-35  datapoints-write-20      datapoints-write-9
> datapoints-increment-36  datapoints-write-21      series-index-0
> datapoints-increment-37  datapoints-write-22      series-index-1
> datapoints-increment-38  datapoints-write-23      series-index-2
> datapoints-increment-39  datapoints-write-24      series-index-3
> datapoints-increment-4   datapoints-write-25      series-index-4
> [blake@kafka03 kafka]$ cd datapoints-wr
> [blake@kafka03 datapoints-wr]$ ls -lah
> total 8.0K
> drwxr-xr-x   2 kafka kafka 4.0K Jul 11 18:51 .
> drwxr-xr-x 110 kafka kafka 4.0K Jul 11 19:05 ..
> -rw-r--r--   1 kafka kafka    0 Jul 11 18:51 00000000000000000000.kafka
>
> [2] In our consumer logs:
>
> 2013-07-11 21:33:01,136 61867 [FetchRunnable-0] ERROR kafka.consumer.FetcherRunnable - error in FetcherRunnable
> kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 79 curr offset: 93706191248 init offset: 93706191165
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>     at kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64)
>     at kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59)
>     at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57)
>     at kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79)
>     at kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65)
>     at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>     at scala.collection.immutable.List.foreach(List.scala:76)
>     at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
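P.S. The NumberFormatException in [1] is the broker deriving topic and partition from each directory name: it splits on the last '-' and parses the suffix as an integer. Roughly along these lines (a sketch of the idea, not the actual Utils.getTopicPartition source):

    object TopicDirSketch {
      // Split "topic-partition" on the last '-'; the suffix must be an integer.
      def getTopicPartition(dirName: String): (String, Int) = {
        val idx = dirName.lastIndexOf('-')
        (dirName.substring(0, idx), dirName.substring(idx + 1).toInt)
      }

      def main(args: Array[String]): Unit = {
        println(getTopicPartition("datapoints-write-18")) // (datapoints-write,18)
        // A truncated directory like "datapoints-wr" leaves "wr" as the
        // partition suffix, and "wr".toInt throws NumberFormatException --
        // the exact failure in the startup log above.
        println(getTopicPartition("datapoints-wr"))
      }
    }

So any directory that doesn't end in '-<number>' will kill startup, which is why removing the truncated directories was the right fix.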