Hi everyone,

Last week, one of our production Kafka 0.7.1 servers had a hardware failure
that resulted in an unclean restart. When the server came back up 5 minutes
later, there were two topic corruption problems we had to handle before the
pipeline would work again.

1. The Kafka log directory contained malformed partition directory names [1]:
the directories were named with truncated versions of our topic names. Some of
these directories contained a single 0-byte .kafka file. This prevented the
Kafka server process from starting. To resolve it, we manually removed the
malformed topic directories and their empty log files.
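For anyone hitting the same thing, the detection step can be sketched roughly
like this (a shell sketch of our own; find_malformed is a hypothetical helper,
not a Kafka tool). It relies on the fact that a valid 0.7 partition directory
name ends in "-<partition number>", which is exactly what the
NumberFormatException in [1] fails to parse:

```shell
# Sketch (paths illustrative): list partition directories under a 0.7
# broker data dir whose names do not end in "-<number>". The broker
# parses everything after the last '-' as the partition id, so these
# are exactly the directories that trigger the NumberFormatException.
find_malformed() {
  datadir=$1
  for d in "$datadir"/*/; do
    name=$(basename "${d%/}")
    suffix=${name##*-}
    case $suffix in
      ''|*[!0-9]*) echo "$name" ;;  # non-numeric suffix: malformed
    esac
  done
}
```

We listed the candidates first and only removed them by hand after confirming
each contained nothing but an empty .kafka segment.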

Is this a known issue? If so, has it been addressed in 0.8? This seems like
a bug.

2. One of the partitions had a corrupt log message (crc32 check failure) that
prevented consumers from advancing on that partition [2]. To resolve this, we
hexdumped the log file, seeked to the byte offset reported in the consumer
exception, and removed the corrupt message at the frame boundary. After
removing the corrupt message, we created a task to re-parse the log file and
feed its messages back into our producer / consumer pipeline. The
InvalidMessageException never bubbled up to our consumer thread, so we could
not handle the error without manually manipulating the log files (our
consumer code catches InvalidMessageException, but execution never reached
that point).
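For reference, the splice itself can be sketched like this (our own helper;
the function name is hypothetical). In 0.7 a fetch offset is a byte offset
into the log, so the position within a segment file is the offset minus the
segment's base offset (the number in the file name). From the exception in
[2], the corrupt frame is 83 bytes: a 4-byte size field plus the 79-byte
message (93706191248 - 93706191165 = 83).

```shell
# Sketch: remove the byte range [bad_start, bad_end) from a segment
# file, writing the spliced result to a new file. bad_start/bad_end are
# positions within the file (fetch offset minus segment base offset).
splice_out() {
  src=$1; dst=$2; bad_start=$3; bad_end=$4
  head -c "$bad_start" "$src" > "$dst"          # bytes before the frame
  tail -c "+$((bad_end + 1))" "$src" >> "$dst"  # bytes after the frame
}
```

Note that removing bytes shifts every subsequent byte offset in the file,
which is part of why we re-fed the remaining messages through the pipeline
rather than letting consumers continue in place.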

Is a corrupt log file a state that Kafka is supposed to handle, or is our
consumer code supposed to handle it? I'm confused about how we can avoid
manually cleaning up log files with a hex editor if we get into this state
again.

Thanks!

Blake

[1] Corrupt partition directories in the broker data directories:

0    [main] INFO  kafka.utils.Utils$  - The number of partitions for
topic  datapoints-write : 50
1    [main] INFO  kafka.utils.Utils$  - The number of partitions for
topic  datapoints-increment : 50
2    [main] INFO  kafka.utils.Utils$  - The number of partitions for
topic  series-index : 5
5    [main] INFO  kafka.server.KafkaServer  - Starting Kafka server...
17   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-write-18'
32   [main] INFO  kafka.log.Log  - Loading the last segment
/var/kafka/datapoints-write-18/00000000095029068903.kafka in mutable
mode, recovery false
46   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-increment-15'
46   [main] INFO  kafka.log.Log  - Loading the last segment
/var/kafka/datapoints-increment-15/00000000000000000000.kafka in
mutable mode, recovery false
47   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-increment-25'
47   [main] INFO  kafka.log.Log  - Loading the last segment
/var/kafka/datapoints-increment-25/00000000000000000000.kafka in
mutable mode, recovery false
47   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-write-27'
48   [main] INFO  kafka.log.Log  - Loading the last segment
/var/kafka/datapoints-write-27/00000000093417873484.kafka in mutable
mode, recovery false
48   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-write-9'
48   [main] INFO  kafka.log.Log  - Loading the last segment
/var/kafka/datapoints-write-9/00000000092880448733.kafka in mutable
mode, recovery false
49   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-increment-8'
49   [main] INFO  kafka.log.Log  - Loading the last segment
/var/kafka/datapoints-increment-8/00000000000000000000.kafka in
mutable mode, recovery false
49   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-increment-29'
49   [main] INFO  kafka.log.Log  - Loading the last segment
/var/kafka/datapoints-increment-29/00000000008053064625.kafka in
mutable mode, recovery false
50   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-increment-34'
50   [main] INFO  kafka.log.Log  - Loading the last segment
/var/kafka/datapoints-increment-34/00000000000000000000.kafka in
mutable mode, recovery false
50   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-increment-19'
51   [main] INFO  kafka.log.Log  - Loading the last segment
/var/kafka/datapoints-increment-19/00000000000000000000.kafka in
mutable mode, recovery false
51   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-increment-16'
51   [main] INFO  kafka.log.Log  - Loading the last segment
/var/kafka/datapoints-increment-16/00000000000000000000.kafka in
mutable mode, recovery false
51   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-increment-27'
52   [main] INFO  kafka.log.Log  - Loading the last segment
/var/kafka/datapoints-increment-27/00000000000000000000.kafka in
mutable mode, recovery false
52   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-write-34'
52   [main] INFO  kafka.log.Log  - Loading the last segment
/var/kafka/datapoints-write-34/00000000092880904403.kafka in mutable
mode, recovery false
53   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-write-21'
53   [main] INFO  kafka.log.Log  - Loading the last segment
/var/kafka/datapoints-write-21/00000000095028969469.kafka in mutable
mode, recovery false
53   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-write-48'
54   [main] INFO  kafka.log.Log  - Loading the last segment
/var/kafka/datapoints-write-48/00000000094491048876.kafka in mutable
mode, recovery false
54   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-increment-40'
55   [main] INFO  kafka.log.Log  - Loading the last segment
/var/kafka/datapoints-increment-40/00000000000000000000.kafka in
mutable mode, recovery false
55   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-increment-6'
55   [main] INFO  kafka.log.Log  - Loading the last segment
/var/kafka/datapoints-increment-6/00000000000000000000.kafka in
mutable mode, recovery false
56   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-write-25'
56   [main] INFO  kafka.log.Log  - Loading the last segment
/var/kafka/datapoints-write-25/00000000093417394574.kafka in mutable
mode, recovery false
56   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-write-33'
57   [main] INFO  kafka.log.Log  - Loading the last segment
/var/kafka/datapoints-write-33/00000000093417473922.kafka in mutable
mode, recovery false
57   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-write-35'
58   [main] INFO  kafka.log.Log  - Loading the last segment
/var/kafka/datapoints-write-35/00000000092880891040.kafka in mutable
mode, recovery false
58   [main] INFO  kafka.log.LogManager  - Loading log 'datapoints-wr'
58   [main] INFO  kafka.log.Log  - Loading the last segment
/var/kafka/datapoints-wr/00000000000000000000.kafka in mutable mode,
recovery false
60   [main] FATAL kafka.server.KafkaServerStartable  - Fatal error
during KafkaServerStable startup. Prepare to shutdown
java.lang.NumberFormatException: For input string: "wr"
        at 
java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
        at java.lang.Integer.parseInt(Integer.java:481)
        at java.lang.Integer.parseInt(Integer.java:514)
        at 
scala.collection.immutable.StringLike$class.toInt(StringLike.scala:207)
        at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
        at kafka.utils.Utils$.getTopicPartition(Utils.scala:558)
        at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:71)
        at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:65)
        at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
        at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
        at kafka.log.LogManager.<init>(LogManager.scala:65)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
        at 
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
        at kafka.Kafka$.main(Kafka.scala:50)
        at kafka.Kafka.main(Kafka.scala)
61   [main] INFO  kafka.server.KafkaServer  - Shutting down Kafka server
62   [main] INFO  kafka.utils.KafkaScheduler  - shutdown scheduler
kafka-logcleaner-
62   [main] INFO  kafka.server.KafkaServer  - Kafka server shut down completed

[blake@kafka03 kafka]$ ls
dat                      datapoints-increment-40  datapoints-write-26
datapoints-increment-0   datapoints-increment-41  datapoints-write-27
datapoints-increment-1   datapoints-increment-42  datapoints-write-28
datapoints-increment-10  datapoints-increment-43  datapoints-write-29
datapoints-increment-11  datapoints-increment-44  datapoints-write-3
datapoints-increment-12  datapoints-increment-45  datapoints-write-30
datapoints-increment-13  datapoints-increment-46  datapoints-write-31
datapoints-increment-14  datapoints-increment-47  datapoints-write-32
datapoints-increment-15  datapoints-increment-48  datapoints-write-33
datapoints-increment-16  datapoints-increment-49  datapoints-write-34
datapoints-increment-17  datapoints-increment-5   datapoints-write-35
datapoints-increment-18  datapoints-increment-6   datapoints-write-36
datapoints-increment-19  datapoints-increment-7   datapoints-write-37
datapoints-increment-2   datapoints-increment-8   datapoints-write-38
datapoints-increment-20  datapoints-increment-9   datapoints-write-39
datapoints-increment-21  datapoints-wr            datapoints-write-4
datapoints-increment-22  datapoints-writ          datapoints-write-40
datapoints-increment-23  datapoints-write-0       datapoints-write-41
datapoints-increment-24  datapoints-write-1       datapoints-write-42
datapoints-increment-25  datapoints-write-10      datapoints-write-43
datapoints-increment-26  datapoints-write-11      datapoints-write-44
datapoints-increment-27  datapoints-write-12      datapoints-write-45
datapoints-increment-28  datapoints-write-13      datapoints-write-46
datapoints-increment-29  datapoints-write-14      datapoints-write-47
datapoints-increment-3   datapoints-write-15      datapoints-write-48
datapoints-increment-30  datapoints-write-16      datapoints-write-49
datapoints-increment-31  datapoints-write-17      datapoints-write-5
datapoints-increment-32  datapoints-write-18      datapoints-write-6
datapoints-increment-33  datapoints-write-19      datapoints-write-7
datapoints-increment-34  datapoints-write-2       datapoints-write-8
datapoints-increment-35  datapoints-write-20      datapoints-write-9
datapoints-increment-36  datapoints-write-21      series-index-0
datapoints-increment-37  datapoints-write-22      series-index-1
datapoints-increment-38  datapoints-write-23      series-index-2
datapoints-increment-39  datapoints-write-24      series-index-3
datapoints-increment-4   datapoints-write-25      series-index-4
[blake@kafka03 kafka]$ cd datapoints-wr
[blake@kafka03 datapoints-wr]$ ls -lah
total 8.0K
drwxr-xr-x   2 kafka kafka 4.0K Jul 11 18:51 .
drwxr-xr-x 110 kafka kafka 4.0K Jul 11 19:05 ..
-rw-r--r--   1 kafka kafka    0 Jul 11 18:51 00000000000000000000.kafka



[2] In our consumer logs:

2013-07-11 21:33:01,136 61867 [FetchRunnable-0] ERROR
kafka.consumer.FetcherRunnable  - error in FetcherRunnable
kafka.message.InvalidMessageException: message is invalid, compression
codec: NoCompressionCodec size: 79 curr offset: 93706191248 init
offset: 93706191165
        at 
kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
        at 
kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
        at 
kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
        at 
kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
        at 
kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64)
        at 
kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59)
        at 
kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57)
        at 
kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79)
        at 
kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65)
        at 
scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
        at scala.collection.immutable.List.foreach(List.scala:76)
        at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)