1. The corrupt directory names are not something we can really handle. The
names have to work with ZooKeeper, so if weird characters get inserted I
think the best we can do is give a clear error. I believe we have fixed the
0-byte file problem in 0.8.
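
This failure mode is visible in the stack trace below: the broker splits each
directory name on its last '-' and parses the tail as a partition number, so a
mangled name like "datapoints-wr" dies in Integer.parseInt. Here is a minimal
sketch of that parse with a descriptive error in place of the raw
NumberFormatException; illustrative only, TopicPartitionName is not the
broker's actual code:

import scala.util.{Failure, Success, Try}

// Illustrative sketch only: mirrors the shape of the parse that fails in
// kafka.utils.Utils.getTopicPartition, not the broker's actual code.
// "datapoints-write-18" splits into ("datapoints-write", 18); a mangled
// name like "datapoints-wr" leaves "wr", which is not a number.
object TopicPartitionName {
  def parse(dirName: String): (String, Int) = {
    val idx = dirName.lastIndexOf('-')
    require(idx > 0, s"'$dirName' is not of the form <topic>-<partition>")
    val topic = dirName.substring(0, idx)
    val partitionPart = dirName.substring(idx + 1)
    Try(partitionPart.toInt) match {
      case Success(p) => (topic, p)
      case Failure(_) => sys.error(
        s"Corrupt log directory '$dirName': '$partitionPart' is not a partition id")
    }
  }
}
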
2. The assumption for log recovery is that data that has been flushed to
disk is stable, but data which has not been flushed can be in any state
(garbage bytes, missing, partial writes, etc.). Recovery runs only on the
last segment of unflushed data (i.e. the file with the highest number) and
only in the case of an unclean shutdown. This is basically an
optimization: if we recovered the entire log on startup, startup could take
many hours for large logs (e.g. a 5TB log at, say, 10MB/sec works out to
roughly 500,000 seconds). From what you are saying, it sounds like you had
disk corruption prior to the last segment; in other words, flushed data
became corrupt. This is not handled in 0.7. In 0.8 you would have the
option of just deleting the problematic data and restoring from replicas.
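
To make the recovery pass concrete, here is a minimal sketch, assuming a
simplified framing of [4-byte length][4-byte crc][payload] per record (the
real on-disk format carries more fields): scan the last segment from the
beginning, verify each record's CRC, and truncate the file at the first
record that is partial or fails the check.

import java.io.RandomAccessFile
import java.util.zip.CRC32

// A minimal sketch, not Kafka's implementation. Assumes a hypothetical
// framing of [4-byte length][4-byte crc][payload] per record; scans from
// the start of the segment and truncates at the first bad record.
object SegmentRecovery {
  def recover(path: String): Long = {
    val file = new RandomAccessFile(path, "rw")
    try {
      var validBytes = 0L
      var done = false
      while (!done && validBytes + 8 <= file.length()) {
        file.seek(validBytes)
        val length = file.readInt()
        val storedCrc = file.readInt()
        if (length < 0 || validBytes + 8 + length > file.length()) {
          done = true // partial write or garbage length: stop here
        } else {
          val payload = new Array[Byte](length)
          file.readFully(payload)
          val crc = new CRC32
          crc.update(payload)
          if (crc.getValue.toInt != storedCrc) done = true // corrupt record
          else validBytes += 8 + length
        }
      }
      file.setLength(validBytes) // drop everything past the last valid record
      validBytes
    } finally file.close()
  }
}

Since flushed segments are assumed stable, only the newest segment ever gets
this scan, which is why corruption in an older segment goes unnoticed until a
consumer hits it.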

-Jay


On Tue, Jul 16, 2013 at 1:10 PM, Blake Smith <blake.sm...@tempo-db.com> wrote:

> Hi everyone,
>
> Last week, one of our production Kafka 0.7.1 servers had a hardware failure
> that resulted in an unclean restart. When the server came back up 5 minutes
> later, there were two topic corruption problems that we had to handle to
> get the pipeline working again.
>
> 1. The Kafka log directory had malformed partition directory names [1]. The
> malformed names were truncated versions of our topic names, and inside some
> of these directories was a 0-byte .kafka file. This prevented the Kafka
> server process from starting. To resolve it, we manually removed the
> malformed topic directories and their empty log files.
>
> Is this a known issue? If so, has it been addressed in 0.8? This seems like
> a bug.
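
Until that is fixed, one mitigation is a pre-start sanity check over the data
directory. A hedged sketch, assuming partition directories end in a numeric
id and segment files use the .kafka suffix (DataDirCheck is hypothetical, not
a shipped Kafka tool):

import java.io.File

// Hypothetical pre-start check, not a Kafka tool: flag data-dir entries
// whose names lack a numeric partition suffix (e.g. "datapoints-wr", "dat")
// or whose segments are all 0-byte .kafka files, for inspection before startup.
object DataDirCheck {
  def suspects(dataDir: String): Seq[File] = {
    val dirs = Option(new File(dataDir).listFiles()).getOrElse(Array.empty[File])
    dirs.toSeq.filter(_.isDirectory).filter { d =>
      val badName = !d.getName.split('-').last.forall(_.isDigit)
      val segments = Option(d.listFiles()).getOrElse(Array.empty[File])
        .filter(_.getName.endsWith(".kafka"))
      val emptyOnly = segments.nonEmpty && segments.forall(_.length == 0L)
      badName || emptyOnly
    }
  }

  def main(args: Array[String]): Unit =
    suspects(args(0)).foreach(d => println(s"suspect: ${d.getPath}"))
}
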
>
> 2. One of the partitions had a corrupt log message (a crc32 check failure)
> that prevented consumers from advancing on that partition [2]. To resolve
> this we had to hexdump the log file, seek to the byte offset given in the
> consumer exception, and remove the corrupt message at its frame boundary.
> After removing the corrupt message, we created a task to parse the log file
> and feed it back into our producer / consumer pipeline. The
> InvalidMessageException never bubbled up to our consumer thread, so we
> could not handle this error without manual manipulation of the log files
> (our consumer code was catching InvalidMessageException, but execution
> never reached that handler).
>
> Is a corrupt log file a state that Kafka is supposed to handle, or is our
> consumer code supposed to handle it? I'm confused about how we can avoid
> manually cleaning up log files with a hex editor if we get into this state
> again.
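
For what it's worth, the hex-editor step can be scripted. In 0.7 the offsets
in the exception are byte positions in the partition log: init offset
93706191165 to curr offset 93706191248 spans 83 bytes, i.e. the reported
79-byte message plus what looks like a 4-byte size prefix. A hypothetical
splice sketch that copies a segment file while skipping that byte range
(SpliceOut is not a Kafka tool; adjust from/until by the segment's base
offset, taken from the file name, before trusting anything like this):

import java.io.{FileInputStream, FileOutputStream}

// Hypothetical splice tool, not part of Kafka: copy `in` to `out`, skipping
// bytes [from, until). With 0.7's byte-based offsets, from/until would be
// the exception's init/curr offsets minus the segment file's base offset.
object SpliceOut {
  def splice(in: String, out: String, from: Long, until: Long): Unit = {
    val src = new FileInputStream(in)
    val dst = new FileOutputStream(out)
    try {
      val buf = new Array[Byte](8192)
      var pos = 0L
      var n = src.read(buf)
      while (n != -1) {
        var i = 0
        while (i < n) {
          if (pos < from || pos >= until) dst.write(buf(i)) // keep this byte
          pos += 1
          i += 1
        }
        n = src.read(buf)
      }
    } finally { src.close(); dst.close() }
  }
}
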
>
> Thanks!
>
> Blake
>
> [1] Corrupt partition directories in the broker data directories:
>
> 0    [main] INFO  kafka.utils.Utils$  - The number of partitions for
> topic  datapoints-write : 50
> 1    [main] INFO  kafka.utils.Utils$  - The number of partitions for
> topic  datapoints-increment : 50
> 2    [main] INFO  kafka.utils.Utils$  - The number of partitions for
> topic  series-index : 5
> 5    [main] INFO  kafka.server.KafkaServer  - Starting Kafka server...
> 17   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-write-18'
> 32   [main] INFO  kafka.log.Log  - Loading the last segment
> /var/kafka/datapoints-write-18/00000000095029068903.kafka in mutable
> mode, recovery false
> 46   [main] INFO  kafka.log.LogManager  - Loading log
> 'datapoints-increment-15'
> 46   [main] INFO  kafka.log.Log  - Loading the last segment
> /var/kafka/datapoints-increment-15/00000000000000000000.kafka in
> mutable mode, recovery false
> 47   [main] INFO  kafka.log.LogManager  - Loading log
> 'datapoints-increment-25'
> 47   [main] INFO  kafka.log.Log  - Loading the last segment
> /var/kafka/datapoints-increment-25/00000000000000000000.kafka in
> mutable mode, recovery false
> 47   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-write-27'
> 48   [main] INFO  kafka.log.Log  - Loading the last segment
> /var/kafka/datapoints-write-27/00000000093417873484.kafka in mutable
> mode, recovery false
> 48   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-write-9'
> 48   [main] INFO  kafka.log.Log  - Loading the last segment
> /var/kafka/datapoints-write-9/00000000092880448733.kafka in mutable
> mode, recovery false
> 49   [main] INFO  kafka.log.LogManager  - Loading log
> 'datapoints-increment-8'
> 49   [main] INFO  kafka.log.Log  - Loading the last segment
> /var/kafka/datapoints-increment-8/00000000000000000000.kafka in
> mutable mode, recovery false
> 49   [main] INFO  kafka.log.LogManager  - Loading log
> 'datapoints-increment-29'
> 49   [main] INFO  kafka.log.Log  - Loading the last segment
> /var/kafka/datapoints-increment-29/00000000008053064625.kafka in
> mutable mode, recovery false
> 50   [main] INFO  kafka.log.LogManager  - Loading log
> 'datapoints-increment-34'
> 50   [main] INFO  kafka.log.Log  - Loading the last segment
> /var/kafka/datapoints-increment-34/00000000000000000000.kafka in
> mutable mode, recovery false
> 50   [main] INFO  kafka.log.LogManager  - Loading log
> 'datapoints-increment-19'
> 51   [main] INFO  kafka.log.Log  - Loading the last segment
> /var/kafka/datapoints-increment-19/00000000000000000000.kafka in
> mutable mode, recovery false
> 51   [main] INFO  kafka.log.LogManager  - Loading log
> 'datapoints-increment-16'
> 51   [main] INFO  kafka.log.Log  - Loading the last segment
> /var/kafka/datapoints-increment-16/00000000000000000000.kafka in
> mutable mode, recovery false
> 51   [main] INFO  kafka.log.LogManager  - Loading log
> 'datapoints-increment-27'
> 52   [main] INFO  kafka.log.Log  - Loading the last segment
> /var/kafka/datapoints-increment-27/00000000000000000000.kafka in
> mutable mode, recovery false
> 52   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-write-34'
> 52   [main] INFO  kafka.log.Log  - Loading the last segment
> /var/kafka/datapoints-write-34/00000000092880904403.kafka in mutable
> mode, recovery false
> 53   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-write-21'
> 53   [main] INFO  kafka.log.Log  - Loading the last segment
> /var/kafka/datapoints-write-21/00000000095028969469.kafka in mutable
> mode, recovery false
> 53   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-write-48'
> 54   [main] INFO  kafka.log.Log  - Loading the last segment
> /var/kafka/datapoints-write-48/00000000094491048876.kafka in mutable
> mode, recovery false
> 54   [main] INFO  kafka.log.LogManager  - Loading log
> 'datapoints-increment-40'
> 55   [main] INFO  kafka.log.Log  - Loading the last segment
> /var/kafka/datapoints-increment-40/00000000000000000000.kafka in
> mutable mode, recovery false
> 55   [main] INFO  kafka.log.LogManager  - Loading log
> 'datapoints-increment-6'
> 55   [main] INFO  kafka.log.Log  - Loading the last segment
> /var/kafka/datapoints-increment-6/00000000000000000000.kafka in
> mutable mode, recovery false
> 56   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-write-25'
> 56   [main] INFO  kafka.log.Log  - Loading the last segment
> /var/kafka/datapoints-write-25/00000000093417394574.kafka in mutable
> mode, recovery false
> 56   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-write-33'
> 57   [main] INFO  kafka.log.Log  - Loading the last segment
> /var/kafka/datapoints-write-33/00000000093417473922.kafka in mutable
> mode, recovery false
> 57   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-write-35'
> 58   [main] INFO  kafka.log.Log  - Loading the last segment
> /var/kafka/datapoints-write-35/00000000092880891040.kafka in mutable
> mode, recovery false
> 58   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-wr'
> 58   [main] INFO  kafka.log.Log  - Loading the last segment
> /var/kafka/datapoints-wr/00000000000000000000.kafka in mutable mode,
> recovery false
> 60   [main] FATAL kafka.server.KafkaServerStartable  - Fatal error
> during KafkaServerStable startup. Prepare to shutdown
> java.lang.NumberFormatException: For input string: "wr"
>         at
> java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
>         at java.lang.Integer.parseInt(Integer.java:481)
>         at java.lang.Integer.parseInt(Integer.java:514)
>         at
> scala.collection.immutable.StringLike$class.toInt(StringLike.scala:207)
>         at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
>         at kafka.utils.Utils$.getTopicPartition(Utils.scala:558)
>         at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:71)
>         at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:65)
>         at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
>         at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
>         at kafka.log.LogManager.<init>(LogManager.scala:65)
>         at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
>         at
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
>         at kafka.Kafka$.main(Kafka.scala:50)
>         at kafka.Kafka.main(Kafka.scala)
> 61   [main] INFO  kafka.server.KafkaServer  - Shutting down Kafka server
> 62   [main] INFO  kafka.utils.KafkaScheduler  - shutdown scheduler
> kafka-logcleaner-
> 62   [main] INFO  kafka.server.KafkaServer  - Kafka server shut down
> completed
>
> [blake@kafka03 kafka]$ ls
> dat                      datapoints-increment-40  datapoints-write-26
> datapoints-increment-0   datapoints-increment-41  datapoints-write-27
> datapoints-increment-1   datapoints-increment-42  datapoints-write-28
> datapoints-increment-10  datapoints-increment-43  datapoints-write-29
> datapoints-increment-11  datapoints-increment-44  datapoints-write-3
> datapoints-increment-12  datapoints-increment-45  datapoints-write-30
> datapoints-increment-13  datapoints-increment-46  datapoints-write-31
> datapoints-increment-14  datapoints-increment-47  datapoints-write-32
> datapoints-increment-15  datapoints-increment-48  datapoints-write-33
> datapoints-increment-16  datapoints-increment-49  datapoints-write-34
> datapoints-increment-17  datapoints-increment-5   datapoints-write-35
> datapoints-increment-18  datapoints-increment-6   datapoints-write-36
> datapoints-increment-19  datapoints-increment-7   datapoints-write-37
> datapoints-increment-2   datapoints-increment-8   datapoints-write-38
> datapoints-increment-20  datapoints-increment-9   datapoints-write-39
> datapoints-increment-21  datapoints-wr            datapoints-write-4
> datapoints-increment-22  datapoints-writ          datapoints-write-40
> datapoints-increment-23  datapoints-write-0       datapoints-write-41
> datapoints-increment-24  datapoints-write-1       datapoints-write-42
> datapoints-increment-25  datapoints-write-10      datapoints-write-43
> datapoints-increment-26  datapoints-write-11      datapoints-write-44
> datapoints-increment-27  datapoints-write-12      datapoints-write-45
> datapoints-increment-28  datapoints-write-13      datapoints-write-46
> datapoints-increment-29  datapoints-write-14      datapoints-write-47
> datapoints-increment-3   datapoints-write-15      datapoints-write-48
> datapoints-increment-30  datapoints-write-16      datapoints-write-49
> datapoints-increment-31  datapoints-write-17      datapoints-write-5
> datapoints-increment-32  datapoints-write-18      datapoints-write-6
> datapoints-increment-33  datapoints-write-19      datapoints-write-7
> datapoints-increment-34  datapoints-write-2       datapoints-write-8
> datapoints-increment-35  datapoints-write-20      datapoints-write-9
> datapoints-increment-36  datapoints-write-21      series-index-0
> datapoints-increment-37  datapoints-write-22      series-index-1
> datapoints-increment-38  datapoints-write-23      series-index-2
> datapoints-increment-39  datapoints-write-24      series-index-3
> datapoints-increment-4   datapoints-write-25      series-index-4
> [blake@kafka03 kafka]$ cd datapoints-wr
> [blake@kafka03 datapoints-wr]$ ls -lah
> total 8.0K
> drwxr-xr-x   2 kafka kafka 4.0K Jul 11 18:51 .
> drwxr-xr-x 110 kafka kafka 4.0K Jul 11 19:05 ..
> -rw-r--r--   1 kafka kafka    0 Jul 11 18:51 00000000000000000000.kafka
>
>
>
> [2] In our consumer logs:
>
> 2013-07-11 21:33:01,136 61867 [FetchRunnable-0] ERROR
> kafka.consumer.FetcherRunnable  - error in FetcherRunnable
> kafka.message.InvalidMessageException: message is invalid, compression
> codec: NoCompressionCodec size: 79 curr offset: 93706191248 init
> offset: 93706191165
>         at
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>         at
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
>         at
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>         at
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>         at
> kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64)
>         at
> kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59)
>         at
> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57)
>         at
> kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79)
>         at
> kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65)
>         at
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>         at scala.collection.immutable.List.foreach(List.scala:76)
>         at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
>
