1. Cool. For now I'll write this off as a fluke side-effect of an unsafe hardware shutdown. If I come up with a reproducible test case, I'll send it (or a patch) your way.
2. After the process came back up following the crash, recovery tried to run on each partition, but because of the malformed directory issue it looks like recovery never ran on partition 11 (the log with the corrupt message).

40472 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-33/00000000093417473922.kafka in mutable mode, recovery true
44032 [main] INFO kafka.message.FileMessageSet - recover high water mark:199897864
44032 [main] INFO kafka.message.FileMessageSet - Recovery succeeded in 3 seconds. 0 bytes truncated.
44033 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-35'
44044 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-35/00000000092880891040.kafka in mutable mode, recovery true
48836 [main] INFO kafka.message.FileMessageSet - recover high water mark:268123698
48836 [main] INFO kafka.message.FileMessageSet - Recovery succeeded in 4 seconds. 0 bytes truncated.
48859 [main] INFO kafka.log.LogManager - Loading log 'datapoints-wr'
48911 [main] FATAL kafka.server.KafkaServerStartable - Fatal error during KafkaServerStable startup.
Prepare to shutdown
java.lang.NumberFormatException: For input string: "wr"
        at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
        at java.lang.Integer.parseInt(Integer.java:481)
        at java.lang.Integer.parseInt(Integer.java:514)
        at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:207)
        at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
        at kafka.utils.Utils$.getTopicPartition(Utils.scala:558)
        at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:71)
        at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:65)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
        at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
        at kafka.log.LogManager.<init>(LogManager.scala:65)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
        at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
        at kafka.Kafka$.main(Kafka.scala:50)
        at kafka.Kafka.main(Kafka.scala)
48912 [main] INFO kafka.server.KafkaServer - Shutting down Kafka server
48913 [main] INFO kafka.utils.KafkaScheduler - shutdown scheduler kafka-logcleaner-
48913 [main] INFO kafka.server.KafkaServer - Kafka server shut down completed

0 [main] INFO kafka.utils.Utils$ - The number of partitions for topic datapoints-write : 50
2 [main] INFO kafka.utils.Utils$ - The number of partitions for topic datapoints-increment : 50
2 [main] INFO kafka.utils.Utils$ - The number of partitions for topic series-index : 5
5 [main] INFO kafka.server.KafkaServer - Starting Kafka server...
17 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-18'
32 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-18/00000000095029068903.kafka in mutable mode, recovery false
46 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-15'
46 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-15/00000000000000000000.kafka in mutable mode, recovery false
47 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-25'
47 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-25/00000000000000000000.kafka in mutable mode, recovery false
47 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-27'
48 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-27/00000000093417873484.kafka in mutable mode, recovery false

How is recovery state tracked? From the behavior I'm seeing, it looks like it is only triggered on the first restart after an unclean shutdown.

Blake

On Tue, Jul 16, 2013 at 6:52 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> 1. I don't think this is something kafka can do. We never rename
> directories once we create them. I'm a little at a loss as to how/why the
> os would do it either, though.
> 2. Can you check if it ran recovery or not (you would see a bunch of
> messages about recovering each topic). This process goes through each
> message sequentially and checks the CRC. If that did happen it is unlikely
> that there was a problem in the log itself.
>
> -Jay
>
>
> On Tue, Jul 16, 2013 at 3:04 PM, Blake Smith <blake.sm...@tempo-db.com> wrote:
>
> > Thanks for the response Jay,
> >
> > 1. Do you think the corrupt partition directory names is happening at the
> > kafka level, or the file system level? If it's at the file system level,
> > perhaps we can investigate either changing FS options or selecting a
> > different filesystem for the log files.
> > Is there a recommended filesystem / filesystem options?
> >
> > 2. I'm a little confused about terminology, but from what I observed the
> > disk corruption was in the last segment for the partition. At the time I
> > investigated, the last segment file for the bad partition was ~300MB and
> > the corrupt message was around the 288MB byte offset (our segment files
> > seem to get rotated at 512MB, which I believe is the default). When the
> > server came back up, the corrupt last segment file was still being written
> > to beyond the corrupt message. Is this the behavior you're describing?
> >
> > Blake
> >
> >
> > On Tue, Jul 16, 2013 at 3:29 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >
> > > 1. The corrupt directory names is not something we can really handle. The
> > > names have to work with zookeeper so if weird characters get inserted I
> > > think the best we can do is give an error. I believe we have fixed the 0
> > > byte file problem in 0.8.
> > > 2. The assumption for log recovery is that data that has been flushed to
> > > disk is stable, but data which has not been flushed can be in any state
> > > (garbage bytes, missing, partial writes, etc). Recovery runs only on the
> > > last segment of unflushed data (e.g. the file with the highest number) and
> > > only in the case of an unclean shutdown. This is basically an
> > > optimization--if we recover all the log on startup, startup can take hours
> > > for large logs (e.g. 5TB at say 10MB/sec). From what you are saying it
> > > sounds like you had disk corruption prior to the last segment--in other
> > > words flushed data became corrupt. This is not handled in 0.7. In 0.8 you
> > > would have the option of just deleting the problematic data and restoring
> > > from replicas.
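[Editor's note: the recovery pass Jay describes above -- walk the messages sequentially, recompute each one's CRC, and truncate at the first mismatch -- can be sketched roughly as follows. The `Msg` pairing of a stored CRC with a payload is a stand-in for illustration only, not Kafka 0.7's actual on-disk message format.]

```java
import java.util.List;
import java.util.zip.CRC32;

public class RecoverySketch {
    // A stored CRC plus payload stands in for one log message.
    // (Simplified: real messages also carry length, codec, etc.)
    public static final class Msg {
        public final long storedCrc;
        public final byte[] payload;
        public Msg(long storedCrc, byte[] payload) {
            this.storedCrc = storedCrc;
            this.payload = payload;
        }
    }

    public static long crc32(byte[] bytes) {
        CRC32 c = new CRC32();
        c.update(bytes);
        return c.getValue();
    }

    // Count the leading messages whose stored CRC matches the payload.
    // A recovery pass would truncate the segment at the first mismatch,
    // discarding everything from that message onward.
    public static int validPrefix(List<Msg> log) {
        for (int i = 0; i < log.size(); i++) {
            if (crc32(log.get(i).payload) != log.get(i).storedCrc) {
                return i;
            }
        }
        return log.size();
    }
}
```

[This also makes the trade-off concrete: scanning only the last unflushed segment is cheap, but corruption in an already-flushed segment is never visited, which matches the behavior in this thread.]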
> > >
> > > -Jay
> > >
> > >
> > > On Tue, Jul 16, 2013 at 1:10 PM, Blake Smith <blake.sm...@tempo-db.com> wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > Last week, one of our production Kafka 0.7.1 servers had a hardware failure
> > > > that resulted in an unclean restart. When the server came back up 5 minutes
> > > > later, there were two topic corruption problems that we had to handle to
> > > > get the pipeline working again.
> > > >
> > > > 1. The kafka log directory had malformed partition directory names [1]. The
> > > > partition directories were shortened names of our topic naming scheme.
> > > > Inside some of these directories was a 0 byte .kafka file. This prevented
> > > > the kafka server process from starting. To resolve this, we manually
> > > > removed the malformed topic directories and the empty log files.
> > > >
> > > > Is this a known issue? If so, has it been addressed in 0.8? This seems like
> > > > a bug.
> > > >
> > > > 2. One of the partitions had a corrupt log message (crc32 check failure)
> > > > that prevented consumers from advancing on that partition [2]. To resolve
> > > > this issue we had to hexdump the log file, seek to the byte offset
> > > > provided by the consumer exception, and remove the corrupt message at the
> > > > frame boundary. After removing the corrupt message, we created a task to
> > > > parse the log file and put it back into our producer / consumer pipeline.
> > > > The InvalidMessageException never bubbled up to our consumer thread,
> > > > preventing us from handling this error without manual manipulation of the
> > > > log files (our consumer code was catching InvalidMessageException, but it
> > > > never reached that point).
> > > >
> > > > Is a corrupt log file a state that Kafka is supposed to handle, or is our
> > > > consumer code supposed to handle it?
> > > > I'm confused about how we can avoid manually cleaning up log files
> > > > with a hex editor if we get into this state again.
> > > >
> > > > Thanks!
> > > >
> > > > Blake
> > > >
> > > > [1] Corrupt partition directories in the broker data directories:
> > > >
> > > > 0 [main] INFO kafka.utils.Utils$ - The number of partitions for topic datapoints-write : 50
> > > > 1 [main] INFO kafka.utils.Utils$ - The number of partitions for topic datapoints-increment : 50
> > > > 2 [main] INFO kafka.utils.Utils$ - The number of partitions for topic series-index : 5
> > > > 5 [main] INFO kafka.server.KafkaServer - Starting Kafka server...
> > > > 17 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-18'
> > > > 32 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-18/00000000095029068903.kafka in mutable mode, recovery false
> > > > 46 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-15'
> > > > 46 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-15/00000000000000000000.kafka in mutable mode, recovery false
> > > > 47 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-25'
> > > > 47 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-25/00000000000000000000.kafka in mutable mode, recovery false
> > > > 47 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-27'
> > > > 48 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-27/00000000093417873484.kafka in mutable mode, recovery false
> > > > 48 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-9'
> > > > 48 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-9/00000000092880448733.kafka in mutable mode, recovery false
> > > > 49 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-8'
> > > > 49 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-8/00000000000000000000.kafka in mutable mode, recovery false
> > > > 49 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-29'
> > > > 49 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-29/00000000008053064625.kafka in mutable mode, recovery false
> > > > 50 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-34'
> > > > 50 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-34/00000000000000000000.kafka in mutable mode, recovery false
> > > > 50 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-19'
> > > > 51 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-19/00000000000000000000.kafka in mutable mode, recovery false
> > > > 51 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-16'
> > > > 51 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-16/00000000000000000000.kafka in mutable mode, recovery false
> > > > 51 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-27'
> > > > 52 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-27/00000000000000000000.kafka in mutable mode, recovery false
> > > > 52 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-34'
> > > > 52 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-34/00000000092880904403.kafka in mutable mode, recovery false
> > > > 53 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-21'
> > > > 53 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-21/00000000095028969469.kafka in mutable mode, recovery false
> > > > 53 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-48'
> > > > 54 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-48/00000000094491048876.kafka in mutable mode, recovery false
> > > > 54 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-40'
> > > > 55 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-40/00000000000000000000.kafka in mutable mode, recovery false
> > > > 55 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-6'
> > > > 55 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-6/00000000000000000000.kafka in mutable mode, recovery false
> > > > 56 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-25'
> > > > 56 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-25/00000000093417394574.kafka in mutable mode, recovery false
> > > > 56 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-33'
> > > > 57 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-33/00000000093417473922.kafka in mutable mode, recovery false
> > > > 57 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-35'
> > > > 58 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-35/00000000092880891040.kafka in mutable mode, recovery false
> > > > 58 [main] INFO kafka.log.LogManager - Loading log 'datapoints-wr'
> > > > 58 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-wr/00000000000000000000.kafka in mutable mode, recovery false
> > > > 60 [main] FATAL kafka.server.KafkaServerStartable - Fatal error during KafkaServerStable startup. Prepare to shutdown
> > > > java.lang.NumberFormatException: For input string: "wr"
> > > >         at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
> > > >         at java.lang.Integer.parseInt(Integer.java:481)
> > > >         at java.lang.Integer.parseInt(Integer.java:514)
> > > >         at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:207)
> > > >         at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
> > > >         at kafka.utils.Utils$.getTopicPartition(Utils.scala:558)
> > > >         at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:71)
> > > >         at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:65)
> > > >         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > >         at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > >         at kafka.log.LogManager.<init>(LogManager.scala:65)
> > > >         at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
> > > >         at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
> > > >         at kafka.Kafka$.main(Kafka.scala:50)
> > > >         at kafka.Kafka.main(Kafka.scala)
> > > > 61 [main] INFO kafka.server.KafkaServer - Shutting down Kafka server
> > > > 62 [main] INFO kafka.utils.KafkaScheduler - shutdown scheduler kafka-logcleaner-
> > > > 62 [main] INFO kafka.server.KafkaServer - Kafka server shut down completed
> > > >
> > > > [blake@kafka03 kafka]$ ls
> > > > dat                      datapoints-increment-40  datapoints-write-26
> > > > datapoints-increment-0   datapoints-increment-41  datapoints-write-27
> > > > datapoints-increment-1   datapoints-increment-42  datapoints-write-28
> > > > datapoints-increment-10  datapoints-increment-43  datapoints-write-29
> > > > datapoints-increment-11  datapoints-increment-44  datapoints-write-3
> > > > datapoints-increment-12  datapoints-increment-45  datapoints-write-30
> > > > datapoints-increment-13  datapoints-increment-46  datapoints-write-31
> > > > datapoints-increment-14  datapoints-increment-47  datapoints-write-32
> > > > datapoints-increment-15  datapoints-increment-48  datapoints-write-33
> > > > datapoints-increment-16  datapoints-increment-49  datapoints-write-34
> > > > datapoints-increment-17  datapoints-increment-5   datapoints-write-35
> > > > datapoints-increment-18  datapoints-increment-6   datapoints-write-36
> > > > datapoints-increment-19  datapoints-increment-7   datapoints-write-37
> > > > datapoints-increment-2   datapoints-increment-8   datapoints-write-38
> > > > datapoints-increment-20  datapoints-increment-9   datapoints-write-39
> > > > datapoints-increment-21  datapoints-wr            datapoints-write-4
> > > > datapoints-increment-22  datapoints-writ          datapoints-write-40
> > > > datapoints-increment-23  datapoints-write-0       datapoints-write-41
> > > > datapoints-increment-24  datapoints-write-1       datapoints-write-42
> > > > datapoints-increment-25  datapoints-write-10      datapoints-write-43
> > > > datapoints-increment-26  datapoints-write-11      datapoints-write-44
> > > > datapoints-increment-27  datapoints-write-12      datapoints-write-45
> > > > datapoints-increment-28  datapoints-write-13      datapoints-write-46
> > > > datapoints-increment-29  datapoints-write-14      datapoints-write-47
> > > > datapoints-increment-3   datapoints-write-15      datapoints-write-48
> > > > datapoints-increment-30  datapoints-write-16      datapoints-write-49
> > > > datapoints-increment-31  datapoints-write-17      datapoints-write-5
> > > > datapoints-increment-32  datapoints-write-18      datapoints-write-6
> > > > datapoints-increment-33  datapoints-write-19      datapoints-write-7
> > > > datapoints-increment-34  datapoints-write-2       datapoints-write-8
> > > > datapoints-increment-35  datapoints-write-20      datapoints-write-9
> > > > datapoints-increment-36  datapoints-write-21      series-index-0
> > > > datapoints-increment-37  datapoints-write-22      series-index-1
> > > > datapoints-increment-38  datapoints-write-23      series-index-2
> > > > datapoints-increment-39  datapoints-write-24      series-index-3
> > > > datapoints-increment-4   datapoints-write-25      series-index-4
> > > >
> > > > [blake@kafka03 kafka]$ cd datapoints-wr
> > > > [blake@kafka03 datapoints-wr]$ ls -lah
> > > > total 8.0K
> > > > drwxr-xr-x   2 kafka kafka 4.0K Jul 11 18:51 .
> > > > drwxr-xr-x 110 kafka kafka 4.0K Jul 11 19:05 ..
> > > > -rw-r--r--   1 kafka kafka    0 Jul 11 18:51 00000000000000000000.kafka
> > > >
> > > >
> > > > [2] In our consumer logs:
> > > >
> > > > 2013-07-11 21:33:01,136 61867 [FetchRunnable-0] ERROR kafka.consumer.FetcherRunnable - error in FetcherRunnable
> > > > kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 79 curr offset: 93706191248 init offset: 93706191165
> > > >         at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> > > >         at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
> > > >         at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> > > >         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> > > >         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> > > >         at kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64)
> > > >         at kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59)
> > > >         at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57)
> > > >         at kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79)
> > > >         at kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65)
> > > >         at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
> > > >         at scala.collection.immutable.List.foreach(List.scala:76)
> > > >         at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
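[Editor's note: the NumberFormatException earlier in the thread follows directly from how Kafka 0.7 maps directory names to topic/partition pairs: everything after the last '-' is parsed as the partition number, so a truncated directory like 'datapoints-wr' aborts startup. A minimal sketch of that parsing logic -- not Kafka's actual `Utils.getTopicPartition` code, whose behavior is inferred from the stack trace:]

```java
public class TopicPartitionParse {
    // Split "topic-N" at the last '-' and parse N as the partition.
    // Returns {topic, partition} as strings for simplicity.
    public static String[] getTopicPartition(String dirName) {
        int idx = dirName.lastIndexOf('-');
        String topic = dirName.substring(0, idx);
        int partition = Integer.parseInt(dirName.substring(idx + 1));
        return new String[] { topic, Integer.toString(partition) };
    }

    public static void main(String[] args) {
        String[] ok = getTopicPartition("datapoints-write-33");
        System.out.println(ok[0] + " / " + ok[1]); // prints: datapoints-write / 33

        try {
            // The truncated directory from the broken broker: "wr" is not a number.
            getTopicPartition("datapoints-wr");
        } catch (NumberFormatException e) {
            // Same exception that aborted broker startup in the logs above.
            System.out.println("startup would abort: " + e.getMessage());
        }
    }
}
```

[Because the parse happens while LogManager enumerates the data directory at startup, a single malformed directory name fails the whole broker, which is why removing the directories by hand was needed.]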