Just to close the loop here for posterity:

1. For the directory topic name corruption, it looks like there's still an
outstanding issue in JIRA: https://issues.apache.org/jira/browse/KAFKA-411

2. Ensuring log recovery is run seems to be fixed in commit
75fc5eab35aa33cffd9c09a2070dfe287db0ef4e
(https://issues.apache.org/jira/browse/KAFKA-188)
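For anyone who hits the same NumberFormatException at startup: the broker derives (topic, partition) from each log directory name by splitting at the last dash and parsing the suffix as an integer (that's the `Utils.getTopicPartition` frame in the trace below), so a truncated directory name like 'datapoints-wr' is fatal. A rough Python illustration of that parsing, not the actual Scala code:

```python
def topic_and_partition(dir_name):
    """Split a log directory name like 'datapoints-write-33' into
    (topic, partition) by cutting at the LAST dash. A truncated name
    such as 'datapoints-wr' leaves a non-numeric suffix, which fails
    to parse -- the analogue of the broker's NumberFormatException."""
    topic, _, suffix = dir_name.rpartition("-")
    return topic, int(suffix)  # raises ValueError for a suffix like "wr"
```

So 'datapoints-write-33' parses as ('datapoints-write', 33), while 'datapoints-wr' splits into ('datapoints', 'wr') and blows up on the integer parse, which matches the "For input string: \"wr\"" message exactly.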
Thanks for your help Jay, it looks like we'll have to start the 0.7.1 -> 0.8
upgrade process sooner than I thought.

On Wed, Jul 17, 2013 at 12:32 PM, Blake Smith <blake.sm...@tempo-db.com> wrote:

> 1. Cool. For now I'll write this off as a fluke side effect of an unsafe
> hardware shutdown. If I come up with a reproducible test case, I'll send it
> (or a patch) your way.
>
> 2. After the process came up after the crash, recovery tried to run on
> each partition, but because of the malformed directory issue it looks like
> recovery never ran on partition 11 (the log with the corrupt message).
>
> 40472 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-33/00000000093417473922.kafka in mutable mode, recovery true
> 44032 [main] INFO kafka.message.FileMessageSet - recover high water mark:199897864
> 44032 [main] INFO kafka.message.FileMessageSet - Recovery succeeded in 3 seconds. 0 bytes truncated.
> 44033 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-35'
> 44044 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-35/00000000092880891040.kafka in mutable mode, recovery true
> 48836 [main] INFO kafka.message.FileMessageSet - recover high water mark:268123698
> 48836 [main] INFO kafka.message.FileMessageSet - Recovery succeeded in 4 seconds. 0 bytes truncated.
> 48859 [main] INFO kafka.log.LogManager - Loading log 'datapoints-wr'
> 48911 [main] FATAL kafka.server.KafkaServerStartable - Fatal error during KafkaServerStable startup. Prepare to shutdown
> java.lang.NumberFormatException: For input string: "wr"
>         at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
>         at java.lang.Integer.parseInt(Integer.java:481)
>         at java.lang.Integer.parseInt(Integer.java:514)
>         at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:207)
>         at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
>         at kafka.utils.Utils$.getTopicPartition(Utils.scala:558)
>         at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:71)
>         at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:65)
>         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
>         at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
>         at kafka.log.LogManager.<init>(LogManager.scala:65)
>         at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
>         at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
>         at kafka.Kafka$.main(Kafka.scala:50)
>         at kafka.Kafka.main(Kafka.scala)
> 48912 [main] INFO kafka.server.KafkaServer - Shutting down Kafka server
> 48913 [main] INFO kafka.utils.KafkaScheduler - shutdown scheduler kafka-logcleaner-
> 48913 [main] INFO kafka.server.KafkaServer - Kafka server shut down completed
> 0 [main] INFO kafka.utils.Utils$ - The number of partitions for topic datapoints-write : 50
> 2 [main] INFO kafka.utils.Utils$ - The number of partitions for topic datapoints-increment : 50
> 2 [main] INFO kafka.utils.Utils$ - The number of partitions for topic series-index : 5
> 5 [main] INFO kafka.server.KafkaServer - Starting Kafka server...
> 17 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-18'
> 32 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-18/00000000095029068903.kafka in mutable mode, recovery false
> 46 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-15'
> 46 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-15/00000000000000000000.kafka in mutable mode, recovery false
> 47 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-25'
> 47 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-25/00000000000000000000.kafka in mutable mode, recovery false
> 47 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-27'
> 48 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-27/00000000093417873484.kafka in mutable mode, recovery false
>
> How is recovery state tracked? From the behavior I'm seeing, it looks like
> it is only triggered on the first restart after an unclean shutdown.
>
> Blake
>
>
> On Tue, Jul 16, 2013 at 6:52 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
>> 1. I don't think this is something kafka can do. We never rename
>> directories once we create them. I'm a little at a loss as to how/why the
>> os would do it either, though.
>> 2. Can you check if it ran recovery or not (you would see a bunch of
>> messages about recovering each topic). This process goes through each
>> message sequentially and checks the CRC. If that did happen it is unlikely
>> that there was a problem in the log itself.
>>
>> -Jay
>>
>>
>> On Tue, Jul 16, 2013 at 3:04 PM, Blake Smith <blake.sm...@tempo-db.com> wrote:
>>
>> > Thanks for the response Jay,
>> >
>> > 1. Do you think the corrupt partition directory names are happening at
>> > the kafka level, or the file system level?
>> > If it's at the file system level,
>> > perhaps we can investigate either changing FS options or selecting a
>> > different filesystem for the log files. Is there a recommended filesystem /
>> > filesystem options?
>> >
>> > 2. I'm a little confused about terminology, but from what I observed the
>> > disk corruption was in the last segment for the partition. At the time I
>> > investigated, the last segment file for the bad partition was ~300mb and
>> > the corrupt message was at around the 288mb byte offset (our segment files
>> > seem to get rotated at 512mb, which I believe is the default). When the
>> > server came back up, the corrupt last segment file was still being written
>> > to beyond the corrupt message. Is this the behavior you're describing?
>> >
>> > Blake
>> >
>> >
>> > On Tue, Jul 16, 2013 at 3:29 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>> >
>> > > 1. The corrupt directory names are not something we can really handle.
>> > > The names have to work with zookeeper, so if weird characters get
>> > > inserted I think the best we can do is give an error. I believe we have
>> > > fixed the 0 byte file problem in 0.8.
>> > > 2. The assumption for log recovery is that data that has been flushed to
>> > > disk is stable, but data which has not been flushed can be in any state
>> > > (garbage bytes, missing, partial writes, etc). Recovery runs only on the
>> > > last segment of unflushed data (e.g. the file with the highest number) and
>> > > only in the case of an unclean shutdown. This is basically an
>> > > optimization--if we recovered all the log on startup, startup could take
>> > > hours for large logs (e.g. 5TB at say 10MB/sec). From what you are saying
>> > > it sounds like you had disk corruption prior to the last segment--in
>> > > other words flushed data became corrupt. This is not handled in 0.7.
>> > > In 0.8 you
>> > > would have the option of just deleting the problematic data and
>> > > restoring from replicas.
>> > >
>> > > -Jay
>> > >
>> > >
>> > > On Tue, Jul 16, 2013 at 1:10 PM, Blake Smith <blake.sm...@tempo-db.com> wrote:
>> > >
>> > > > Hi everyone,
>> > > >
>> > > > Last week, one of our production Kafka 0.7.1 servers had a hardware
>> > > > failure that resulted in an unclean restart. When the server came back
>> > > > up 5 minutes later, there were two topic corruption problems that we
>> > > > had to handle to get the pipeline working again.
>> > > >
>> > > > 1. The kafka log directory had malformed partition directory names [1].
>> > > > The partition directories were shortened names of our topic naming
>> > > > scheme. Inside some of these directories was a 0 byte .kafka file. This
>> > > > prevented the kafka server process from starting. To resolve this, we
>> > > > manually removed the malformed topic directories and the empty log files.
>> > > >
>> > > > Is this a known issue? If so, has it been addressed in 0.8? This seems
>> > > > like a bug.
>> > > >
>> > > > 2. One of the partitions had a corrupt log message (crc32 check failure)
>> > > > that prevented consumers from advancing on that partition [2]. To
>> > > > resolve this issue we had to hexdump the log file, seek to the byte
>> > > > offset provided by the consumer exception, and remove the corrupt
>> > > > message at the frame boundary. After removing the corrupt message, we
>> > > > created a task to parse the log file and put it back into our
>> > > > producer / consumer pipeline. The InvalidMessageException never bubbled
>> > > > up to our consumer thread, preventing us from handling this error
>> > > > without manual manipulation of the log files (our consumer code was
>> > > > catching InvalidMessageException, but it never reached that point).
>> > > >
>> > > > Is a corrupt log file a state that Kafka is supposed to handle, or is
>> > > > our consumer code supposed to handle it? I'm confused about how we can
>> > > > avoid manually cleaning up log files with a hex editor if we get into
>> > > > this state again.
>> > > >
>> > > > Thanks!
>> > > >
>> > > > Blake
>> > > >
>> > > > [1] Corrupt partition directories in the broker data directories:
>> > > >
>> > > > 0 [main] INFO kafka.utils.Utils$ - The number of partitions for topic datapoints-write : 50
>> > > > 1 [main] INFO kafka.utils.Utils$ - The number of partitions for topic datapoints-increment : 50
>> > > > 2 [main] INFO kafka.utils.Utils$ - The number of partitions for topic series-index : 5
>> > > > 5 [main] INFO kafka.server.KafkaServer - Starting Kafka server...
>> > > > 17 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-18'
>> > > > 32 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-18/00000000095029068903.kafka in mutable mode, recovery false
>> > > > 46 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-15'
>> > > > 46 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-15/00000000000000000000.kafka in mutable mode, recovery false
>> > > > 47 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-25'
>> > > > 47 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-25/00000000000000000000.kafka in mutable mode, recovery false
>> > > > 47 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-27'
>> > > > 48 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-27/00000000093417873484.kafka in mutable mode, recovery false
>> > > > 48 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-9'
>> > > > 48 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-9/00000000092880448733.kafka in mutable mode, recovery false
>> > > > 49 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-8'
>> > > > 49 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-8/00000000000000000000.kafka in mutable mode, recovery false
>> > > > 49 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-29'
>> > > > 49 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-29/00000000008053064625.kafka in mutable mode, recovery false
>> > > > 50 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-34'
>> > > > 50 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-34/00000000000000000000.kafka in mutable mode, recovery false
>> > > > 50 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-19'
>> > > > 51 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-19/00000000000000000000.kafka in mutable mode, recovery false
>> > > > 51 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-16'
>> > > > 51 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-16/00000000000000000000.kafka in mutable mode, recovery false
>> > > > 51 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-27'
>> > > > 52 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-27/00000000000000000000.kafka in mutable mode, recovery false
>> > > > 52 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-34'
>> > > > 52 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-34/00000000092880904403.kafka in mutable mode, recovery false
>> > > > 53 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-21'
>> > > > 53 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-21/00000000095028969469.kafka in mutable mode, recovery false
>> > > > 53 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-48'
>> > > > 54 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-48/00000000094491048876.kafka in mutable mode, recovery false
>> > > > 54 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-40'
>> > > > 55 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-40/00000000000000000000.kafka in mutable mode, recovery false
>> > > > 55 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-6'
>> > > > 55 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-6/00000000000000000000.kafka in mutable mode, recovery false
>> > > > 56 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-25'
>> > > > 56 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-25/00000000093417394574.kafka in mutable mode, recovery false
>> > > > 56 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-33'
>> > > > 57 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-33/00000000093417473922.kafka in mutable mode, recovery false
>> > > > 57 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-35'
>> > > > 58 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-35/00000000092880891040.kafka in mutable mode, recovery false
>> > > > 58 [main] INFO kafka.log.LogManager - Loading log 'datapoints-wr'
>> > > > 58 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-wr/00000000000000000000.kafka in mutable mode, recovery false
>> > > > 60 [main] FATAL kafka.server.KafkaServerStartable - Fatal error during KafkaServerStable startup. Prepare to shutdown
>> > > > java.lang.NumberFormatException: For input string: "wr"
>> > > >         at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
>> > > >         at java.lang.Integer.parseInt(Integer.java:481)
>> > > >         at java.lang.Integer.parseInt(Integer.java:514)
>> > > >         at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:207)
>> > > >         at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
>> > > >         at kafka.utils.Utils$.getTopicPartition(Utils.scala:558)
>> > > >         at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:71)
>> > > >         at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:65)
>> > > >         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
>> > > >         at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
>> > > >         at kafka.log.LogManager.<init>(LogManager.scala:65)
>> > > >         at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
>> > > >         at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
>> > > >         at kafka.Kafka$.main(Kafka.scala:50)
>> > > >         at kafka.Kafka.main(Kafka.scala)
>> > > > 61 [main] INFO kafka.server.KafkaServer - Shutting down Kafka server
>> > > > 62 [main] INFO kafka.utils.KafkaScheduler - shutdown scheduler kafka-logcleaner-
>> > > > 62 [main] INFO kafka.server.KafkaServer - Kafka server shut down completed
>> > > >
>> > > > [blake@kafka03 kafka]$ ls
>> > > > dat                      datapoints-increment-40  datapoints-write-26
>> > > > datapoints-increment-0   datapoints-increment-41  datapoints-write-27
>> > > > datapoints-increment-1   datapoints-increment-42  datapoints-write-28
>> > > > datapoints-increment-10  datapoints-increment-43  datapoints-write-29
>> > > > datapoints-increment-11  datapoints-increment-44  datapoints-write-3
>> > > > datapoints-increment-12  datapoints-increment-45  datapoints-write-30
>> > > > datapoints-increment-13  datapoints-increment-46  datapoints-write-31
>> > > > datapoints-increment-14  datapoints-increment-47  datapoints-write-32
>> > > > datapoints-increment-15  datapoints-increment-48  datapoints-write-33
>> > > > datapoints-increment-16  datapoints-increment-49  datapoints-write-34
>> > > > datapoints-increment-17  datapoints-increment-5   datapoints-write-35
>> > > > datapoints-increment-18  datapoints-increment-6   datapoints-write-36
>> > > > datapoints-increment-19  datapoints-increment-7   datapoints-write-37
>> > > > datapoints-increment-2   datapoints-increment-8   datapoints-write-38
>> > > > datapoints-increment-20  datapoints-increment-9   datapoints-write-39
>> > > > datapoints-increment-21  datapoints-wr            datapoints-write-4
>> > > > datapoints-increment-22  datapoints-writ          datapoints-write-40
>> > > > datapoints-increment-23  datapoints-write-0       datapoints-write-41
>> > > > datapoints-increment-24  datapoints-write-1       datapoints-write-42
>> > > > datapoints-increment-25  datapoints-write-10      datapoints-write-43
>> > > > datapoints-increment-26  datapoints-write-11      datapoints-write-44
>> > > > datapoints-increment-27  datapoints-write-12      datapoints-write-45
>> > > > datapoints-increment-28  datapoints-write-13      datapoints-write-46
>> > > > datapoints-increment-29  datapoints-write-14      datapoints-write-47
>> > > > datapoints-increment-3   datapoints-write-15      datapoints-write-48
>> > > > datapoints-increment-30  datapoints-write-16      datapoints-write-49
>> > > > datapoints-increment-31  datapoints-write-17      datapoints-write-5
>> > > > datapoints-increment-32  datapoints-write-18      datapoints-write-6
>> > > > datapoints-increment-33  datapoints-write-19      datapoints-write-7
>> > > > datapoints-increment-34  datapoints-write-2       datapoints-write-8
>> > > > datapoints-increment-35  datapoints-write-20      datapoints-write-9
>> > > > datapoints-increment-36  datapoints-write-21      series-index-0
>> > > > datapoints-increment-37  datapoints-write-22      series-index-1
>> > > > datapoints-increment-38  datapoints-write-23      series-index-2
>> > > > datapoints-increment-39  datapoints-write-24      series-index-3
>> > > > datapoints-increment-4   datapoints-write-25      series-index-4
>> > > >
>> > > > [blake@kafka03 kafka]$ cd datapoints-wr
>> > > > [blake@kafka03 datapoints-wr]$ ls -lah
>> > > > total 8.0K
>> > > > drwxr-xr-x   2 kafka kafka 4.0K Jul 11 18:51 .
>> > > > drwxr-xr-x 110 kafka kafka 4.0K Jul 11 19:05 ..
>> > > > -rw-r--r--   1 kafka kafka    0 Jul 11 18:51 00000000000000000000.kafka
>> > > >
>> > > >
>> > > > [2] In our consumer logs:
>> > > >
>> > > > 2013-07-11 21:33:01,136 61867 [FetchRunnable-0] ERROR kafka.consumer.FetcherRunnable - error in FetcherRunnable
>> > > > kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 79 curr offset: 93706191248 init offset: 93706191165
>> > > >         at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>> > > >         at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
>> > > >         at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>> > > >         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>> > > >         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>> > > >         at kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64)
>> > > >         at kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59)
>> > > >         at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57)
>> > > >         at kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79)
>> > > >         at kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65)
>> > > >         at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>> > > >         at scala.collection.immutable.List.foreach(List.scala:76)
>> > > >         at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
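For reference, the recovery pass Jay describes (scan the last segment sequentially, CRC-check each message, truncate at the first bad one) can be pictured with the sketch below. Note the framing here is a simplified [4-byte length][4-byte CRC32][payload], NOT Kafka 0.7's actual on-disk message format; it just illustrates the idea, which is also roughly what we ended up doing by hand with the hex editor.

```python
import struct
import zlib

def append_record(f, payload):
    """Write one record in the simplified illustrative framing:
    4-byte big-endian length, 4-byte CRC32 of the payload, payload."""
    f.write(struct.pack(">II", len(payload), zlib.crc32(payload) & 0xFFFFFFFF))
    f.write(payload)

def recover_segment(path):
    """Walk a segment file record by record, validating each CRC, and
    truncate at the first corrupt or partial record. Returns the number
    of bytes dropped from the tail."""
    valid_through = 0  # end offset of the last fully-valid record
    with open(path, "r+b") as f:
        while True:
            header = f.read(8)
            if len(header) < 8:
                break  # partial header: unflushed tail, stop here
            length, crc = struct.unpack(">II", header)
            payload = f.read(length)
            if len(payload) < length or zlib.crc32(payload) & 0xFFFFFFFF != crc:
                break  # corrupt or truncated record
            valid_through = f.tell()
        size = f.seek(0, 2)  # seek to end to measure the file
        f.truncate(valid_through)
    return size - valid_through
```

The key property is the one Jay points out: this only helps for the unflushed tail of the last segment. Corruption earlier in flushed data (like our partition) is never rescanned, which is why the broken message sat there until a consumer's CRC check tripped over it.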