Thanks for the response, Jay.

1. Do you think the corruption of the partition directory names is
happening at the Kafka level or at the filesystem level? If it's at the
filesystem level, perhaps we can investigate changing FS options or
selecting a different filesystem for the log files. Is there a
recommended filesystem or set of filesystem options?
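
Looking at the stack trace in [1] below, the crash appears to come from
kafka.utils.Utils.getTopicPartition treating everything after the last
'-' in a directory name as the partition number, so 'datapoints-wr'
dies in Integer.parseInt("wr"). Even if giving an error is the best the
broker can do, skipping or quarantining unparseable directories would at
least let it start. A rough Python illustration of that parse and a
defensive skip (not Kafka's actual code, just to show what I mean):

    import logging
    import os

    def parse_topic_partition(dirname):
        # Mirrors what Utils.getTopicPartition appears to do: the topic
        # and the partition number are separated by the last '-'.
        topic, _, part = dirname.rpartition("-")
        if not topic or not part.isdigit():
            return None              # e.g. 'datapoints-wr' -> suffix "wr"
        return topic, int(part)

    # Defensive variant: warn and skip malformed entries rather than
    # throwing and aborting broker startup.
    for d in sorted(os.listdir("/var/kafka")):
        if parse_topic_partition(d) is None:
            logging.warning("skipping malformed log dir %r", d)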

2. I'm a little confused about the terminology, but from what I observed
the disk corruption was in the last segment for the partition. At the
time I investigated, the last segment file for the bad partition was
~300MB, and the corrupt message started around the 288MB byte offset
(our segment files seem to get rotated at 512MB, which I believe is the
default). When the server came back up, the corrupt last segment file
was still being written to beyond the corrupt message. Is this the
behavior you're describing?
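
For reference, here is roughly the check we ended up doing by hand with
hexdump, written out as a Python sketch. It assumes the 0.7 on-disk
framing as we understand it: each entry is a 4-byte big-endian length
followed by a message (a magic byte, an attributes byte when magic is 1,
a 4-byte CRC32 of the payload, then the payload). Treat it as an
illustration of the frame walk, not a supported tool:

    import binascii
    import struct

    def first_bad_frame(path):
        # Walk a 0.7 segment frame by frame; return the file offset of
        # the first frame that is truncated or whose stored CRC32 does
        # not match its payload. Returns None if every frame checks out.
        with open(path, "rb") as f:
            pos = 0
            while True:
                header = f.read(4)
                if len(header) < 4:
                    return None                   # clean end of file
                (size,) = struct.unpack(">i", header)
                if size <= 0:
                    return pos                    # nonsense length
                message = f.read(size)
                if len(message) < size:
                    return pos                    # truncated frame
                magic = message[0]
                crc_off = 1 if magic == 0 else 2  # magic 1 adds attributes
                if len(message) < crc_off + 4:
                    return pos                    # too short to hold a CRC
                (stored,) = struct.unpack(">I", message[crc_off:crc_off + 4])
                if binascii.crc32(message[crc_off + 4:]) & 0xFFFFFFFF != stored:
                    return pos                    # corrupt message starts here
                pos += 4 + size

    # Example usage against one of the segments above; in 0.7 the byte
    # offset in the consumer exception should equal the segment's base
    # offset (its file name) plus this file position.
    print(first_bad_frame("/var/kafka/datapoints-write-9/00000000092880448733.kafka"))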

Blake


On Tue, Jul 16, 2013 at 3:29 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

> 1. The corrupt directory names are not something we can really handle.
> The names have to work with zookeeper, so if weird characters get
> inserted I think the best we can do is give an error. I believe we have
> fixed the 0 byte file problem in 0.8.
> 2. The assumption for log recovery is that data that has been flushed
> to disk is stable, but data which has not been flushed can be in any
> state (garbage bytes, missing, partial writes, etc). Recovery runs only
> on the last segment of unflushed data (e.g. the file with the highest
> number) and only in the case of an unclean shutdown. This is basically
> an optimization--if we recover the whole log on startup, startup can
> take hours for large logs (e.g. 5TB at say 10MB/sec). From what you are
> saying it sounds like you had disk corruption prior to the last
> segment--in other words, flushed data became corrupt. This is not
> handled in 0.7. In 0.8 you would have the option of just deleting the
> problematic data and restoring from replicas.
>
> -Jay
>
>
> On Tue, Jul 16, 2013 at 1:10 PM, Blake Smith <blake.sm...@tempo-db.com> wrote:
>
> > Hi everyone,
> >
> > Last week, one of our production Kafka 0.7.1 servers had a hardware
> > failure that resulted in an unclean restart. When the server came back
> > up 5 minutes later, there were two topic corruption problems that we
> > had to handle to get the pipeline working again.
> >
> > 1. The kafka log directory had malformed partition directory names
> > [1]. The partition directories were shortened names of our topic
> > naming scheme. Inside some of these directories was a 0 byte .kafka
> > file. This prevented the kafka server process from starting. To
> > resolve this, we manually removed the malformed topic directories and
> > the empty log files.
> >
> > Is this a known issue? If so, has it been addressed in 0.8? This
> > seems like a bug.
> >
> > 2. One of the partitions had a corrupt log message (crc32 check
> > failure) that prevented consumers from advancing on that partition
> > [2]. To resolve this issue we had to hexdump the log file, seek to
> > the byte offset provided by the consumer exception, and remove the
> > corrupt message at the frame boundary. After removing the corrupt
> > message, we created a task to parse the log file and put it back into
> > our producer / consumer pipeline. The InvalidMessageException never
> > bubbled up to our consumer thread, preventing us from handling this
> > error without manual manipulation of the log files (our consumer code
> > was catching InvalidMessageException, but it never reached that
> > point).
> >
> > Is a corrupt log file a state that Kafka is supposed to handle, or is
> > our consumer code supposed to handle it? I'm confused about how we
> > can avoid manually cleaning up log files with a hex editor if we get
> > into this state again.
> >
> > Thanks!
> >
> > Blake
> >
> > [1] Corrupt partition directories in the broker data directories:
> >
> > 0    [main] INFO  kafka.utils.Utils$  - The number of partitions for topic  datapoints-write : 50
> > 1    [main] INFO  kafka.utils.Utils$  - The number of partitions for topic  datapoints-increment : 50
> > 2    [main] INFO  kafka.utils.Utils$  - The number of partitions for topic  series-index : 5
> > 5    [main] INFO  kafka.server.KafkaServer  - Starting Kafka server...
> > 17   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-write-18'
> > 32   [main] INFO  kafka.log.Log  - Loading the last segment /var/kafka/datapoints-write-18/00000000095029068903.kafka in mutable mode, recovery false
> > 46   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-increment-15'
> > 46   [main] INFO  kafka.log.Log  - Loading the last segment /var/kafka/datapoints-increment-15/00000000000000000000.kafka in mutable mode, recovery false
> > 47   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-increment-25'
> > 47   [main] INFO  kafka.log.Log  - Loading the last segment /var/kafka/datapoints-increment-25/00000000000000000000.kafka in mutable mode, recovery false
> > 47   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-write-27'
> > 48   [main] INFO  kafka.log.Log  - Loading the last segment /var/kafka/datapoints-write-27/00000000093417873484.kafka in mutable mode, recovery false
> > 48   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-write-9'
> > 48   [main] INFO  kafka.log.Log  - Loading the last segment /var/kafka/datapoints-write-9/00000000092880448733.kafka in mutable mode, recovery false
> > 49   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-increment-8'
> > 49   [main] INFO  kafka.log.Log  - Loading the last segment /var/kafka/datapoints-increment-8/00000000000000000000.kafka in mutable mode, recovery false
> > 49   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-increment-29'
> > 49   [main] INFO  kafka.log.Log  - Loading the last segment /var/kafka/datapoints-increment-29/00000000008053064625.kafka in mutable mode, recovery false
> > 50   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-increment-34'
> > 50   [main] INFO  kafka.log.Log  - Loading the last segment /var/kafka/datapoints-increment-34/00000000000000000000.kafka in mutable mode, recovery false
> > 50   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-increment-19'
> > 51   [main] INFO  kafka.log.Log  - Loading the last segment /var/kafka/datapoints-increment-19/00000000000000000000.kafka in mutable mode, recovery false
> > 51   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-increment-16'
> > 51   [main] INFO  kafka.log.Log  - Loading the last segment /var/kafka/datapoints-increment-16/00000000000000000000.kafka in mutable mode, recovery false
> > 51   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-increment-27'
> > 52   [main] INFO  kafka.log.Log  - Loading the last segment /var/kafka/datapoints-increment-27/00000000000000000000.kafka in mutable mode, recovery false
> > 52   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-write-34'
> > 52   [main] INFO  kafka.log.Log  - Loading the last segment /var/kafka/datapoints-write-34/00000000092880904403.kafka in mutable mode, recovery false
> > 53   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-write-21'
> > 53   [main] INFO  kafka.log.Log  - Loading the last segment /var/kafka/datapoints-write-21/00000000095028969469.kafka in mutable mode, recovery false
> > 53   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-write-48'
> > 54   [main] INFO  kafka.log.Log  - Loading the last segment /var/kafka/datapoints-write-48/00000000094491048876.kafka in mutable mode, recovery false
> > 54   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-increment-40'
> > 55   [main] INFO  kafka.log.Log  - Loading the last segment /var/kafka/datapoints-increment-40/00000000000000000000.kafka in mutable mode, recovery false
> > 55   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-increment-6'
> > 55   [main] INFO  kafka.log.Log  - Loading the last segment /var/kafka/datapoints-increment-6/00000000000000000000.kafka in mutable mode, recovery false
> > 56   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-write-25'
> > 56   [main] INFO  kafka.log.Log  - Loading the last segment /var/kafka/datapoints-write-25/00000000093417394574.kafka in mutable mode, recovery false
> > 56   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-write-33'
> > 57   [main] INFO  kafka.log.Log  - Loading the last segment /var/kafka/datapoints-write-33/00000000093417473922.kafka in mutable mode, recovery false
> > 57   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-write-35'
> > 58   [main] INFO  kafka.log.Log  - Loading the last segment /var/kafka/datapoints-write-35/00000000092880891040.kafka in mutable mode, recovery false
> > 58   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-wr'
> > 58   [main] INFO  kafka.log.Log  - Loading the last segment /var/kafka/datapoints-wr/00000000000000000000.kafka in mutable mode, recovery false
> > 60   [main] FATAL kafka.server.KafkaServerStartable  - Fatal error during KafkaServerStable startup. Prepare to shutdown
> > java.lang.NumberFormatException: For input string: "wr"
> >         at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
> >         at java.lang.Integer.parseInt(Integer.java:481)
> >         at java.lang.Integer.parseInt(Integer.java:514)
> >         at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:207)
> >         at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
> >         at kafka.utils.Utils$.getTopicPartition(Utils.scala:558)
> >         at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:71)
> >         at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:65)
> >         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> >         at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> >         at kafka.log.LogManager.<init>(LogManager.scala:65)
> >         at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
> >         at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
> >         at kafka.Kafka$.main(Kafka.scala:50)
> >         at kafka.Kafka.main(Kafka.scala)
> > 61   [main] INFO  kafka.server.KafkaServer  - Shutting down Kafka server
> > 62   [main] INFO  kafka.utils.KafkaScheduler  - shutdown scheduler kafka-logcleaner-
> > 62   [main] INFO  kafka.server.KafkaServer  - Kafka server shut down completed
> >
> > [blake@kafka03 kafka]$ ls
> > dat                      datapoints-increment-40  datapoints-write-26
> > datapoints-increment-0   datapoints-increment-41  datapoints-write-27
> > datapoints-increment-1   datapoints-increment-42  datapoints-write-28
> > datapoints-increment-10  datapoints-increment-43  datapoints-write-29
> > datapoints-increment-11  datapoints-increment-44  datapoints-write-3
> > datapoints-increment-12  datapoints-increment-45  datapoints-write-30
> > datapoints-increment-13  datapoints-increment-46  datapoints-write-31
> > datapoints-increment-14  datapoints-increment-47  datapoints-write-32
> > datapoints-increment-15  datapoints-increment-48  datapoints-write-33
> > datapoints-increment-16  datapoints-increment-49  datapoints-write-34
> > datapoints-increment-17  datapoints-increment-5   datapoints-write-35
> > datapoints-increment-18  datapoints-increment-6   datapoints-write-36
> > datapoints-increment-19  datapoints-increment-7   datapoints-write-37
> > datapoints-increment-2   datapoints-increment-8   datapoints-write-38
> > datapoints-increment-20  datapoints-increment-9   datapoints-write-39
> > datapoints-increment-21  datapoints-wr            datapoints-write-4
> > datapoints-increment-22  datapoints-writ          datapoints-write-40
> > datapoints-increment-23  datapoints-write-0       datapoints-write-41
> > datapoints-increment-24  datapoints-write-1       datapoints-write-42
> > datapoints-increment-25  datapoints-write-10      datapoints-write-43
> > datapoints-increment-26  datapoints-write-11      datapoints-write-44
> > datapoints-increment-27  datapoints-write-12      datapoints-write-45
> > datapoints-increment-28  datapoints-write-13      datapoints-write-46
> > datapoints-increment-29  datapoints-write-14      datapoints-write-47
> > datapoints-increment-3   datapoints-write-15      datapoints-write-48
> > datapoints-increment-30  datapoints-write-16      datapoints-write-49
> > datapoints-increment-31  datapoints-write-17      datapoints-write-5
> > datapoints-increment-32  datapoints-write-18      datapoints-write-6
> > datapoints-increment-33  datapoints-write-19      datapoints-write-7
> > datapoints-increment-34  datapoints-write-2       datapoints-write-8
> > datapoints-increment-35  datapoints-write-20      datapoints-write-9
> > datapoints-increment-36  datapoints-write-21      series-index-0
> > datapoints-increment-37  datapoints-write-22      series-index-1
> > datapoints-increment-38  datapoints-write-23      series-index-2
> > datapoints-increment-39  datapoints-write-24      series-index-3
> > datapoints-increment-4   datapoints-write-25      series-index-4
> > [blake@kafka03 kafka]$ cd datapoints-wr
> > [blake@kafka03 datapoints-wr]$ ls -lah
> > total 8.0K
> > drwxr-xr-x   2 kafka kafka 4.0K Jul 11 18:51 .
> > drwxr-xr-x 110 kafka kafka 4.0K Jul 11 19:05 ..
> > -rw-r--r--   1 kafka kafka    0 Jul 11 18:51 00000000000000000000.kafka
> >
> >
> >
> > [2] In our consumer logs:
> >
> > 2013-07-11 21:33:01,136 61867 [FetchRunnable-0] ERROR kafka.consumer.FetcherRunnable  - error in FetcherRunnable
> > kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 79 curr offset: 93706191248 init offset: 93706191165
> >         at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> >         at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
> >         at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> >         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> >         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> >         at kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64)
> >         at kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59)
> >         at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57)
> >         at kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79)
> >         at kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65)
> >         at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
> >         at scala.collection.immutable.List.foreach(List.scala:76)
> >         at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
>
