[ "Aha!", you say, "now I know why this guy's been doing so much tshark stuff!" 
(-: ] 

   Hi.  I'm running into a strange situation in which more or less all of the 
topics on our Kafka server behave exactly as expected... but one family of 
applications is producing data that causes fairly frequent topic corruption.

   When this happens, the client-side symptoms are all over the map: sometimes 
you get a ConsumerFetchSizeTooSmall exception, sometimes an exception for an 
unknown error type, sometimes an invalid-offset error.

   On the server side, I think something like this is the first sign of badness:

[2014-08-11 21:03:28,121] ERROR [KafkaApi-1] Error processing ProducerRequest 
with correlation id 6750 from client test-producer on partition [mytopic,9] 
(kafka.server.KafkaApis)
java.lang.ArrayIndexOutOfBoundsException
[2014-08-11 21:03:28,121] INFO [KafkaApi-1] Send the close connection response 
due to error handling produce request [clientId = test-producer, correlationId 
= 6750, topicAndPartition = [mytopic,9]] with Ack=0 (kafka.server.KafkaApis)

Shortly thereafter, you begin to see oddness on the client-facing side:

[2014-08-11 21:17:58,132] ERROR [KafkaApi-1] Error when processing fetch 
request for partition [mytopic,9] offset 1327 from consumer with correlation id 
87204 (kafka.server.KafkaApis)
java.lang.IllegalStateException: Invalid message size: 0
        at kafka.log.FileMessageSet.searchFor(FileMessageSet.scala:127)
        at kafka.log.LogSegment.translateOffset(LogSegment.scala:100)
        at kafka.log.LogSegment.read(LogSegment.scala:137)
        at kafka.log.Log.read(Log.scala:386)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
        at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:437)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:186)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
        at java.lang.Thread.run(Unknown Source)

If I go run the DumpLogSegments tool on the particular topic and partition 
that's generating the errors, I can see there's corruption in the log:

Non-secutive offsets in :/data/d3/kafka/log/mytopic-9/00000000000000000000.log
  1327 is followed by 1327
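
For anyone who wants to poke at the same thing, here's roughly the check I 
believe DumpLogSegments is doing.  My (possibly wrong) reading of the 0.8 
on-disk format is that each entry in a segment file is an 8-byte offset, a 
4-byte message size, and then that many message bytes; a stand-alone scanner 
along those lines flags the same two problems the broker complains about 
(zero-size messages and repeated offsets).  Sketch only; the path handling and 
the format assumption are mine, not lifted from the Kafka source:

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;

// Quick-and-dirty scanner for a Kafka 0.8 log segment.  ASSUMES the layout is
// [int64 offset][int32 size][size bytes of message], which is my reading of
// FileMessageSet -- treat it as a sketch, not gospel.
public class SegmentScan {
    public static void main(String[] args) throws Exception {
        File segment = new File(args[0]);  // e.g. .../mytopic-9/00000000000000000000.log
        DataInputStream in = new DataInputStream(
                new BufferedInputStream(new FileInputStream(segment)));
        long prevOffset = -1L;
        long filePos = 0L;
        try {
            while (true) {
                long offset = in.readLong();  // 8-byte logical offset
                int size = in.readInt();      // 4-byte message size
                if (size <= 0) {
                    System.out.println("invalid message size " + size
                            + " at file position " + filePos);
                    break;
                }
                if (offset <= prevOffset) {
                    System.out.println("non-consecutive offsets: " + prevOffset
                            + " is followed by " + offset);
                }
                in.readFully(new byte[size]); // skip crc/magic/attrs/key/value
                prevOffset = offset;
                filePos += 12 + size;
            }
        } catch (EOFException endOfSegment) {
            // ran off the end of the file; normal termination
        } finally {
            in.close();
        }
    }
}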

The only thing producing data to the corrupted topics was also the only 
producer with snappy compression turned on in the Java API it uses (it's a 
Storm topology; we've had the same issue with one written in Scala and with 
one that produces very similar data but was written in Java).  We turned 
snappy off, published to a different topic name (so it was created fresh), and 
had a couple of happy days where all was well.  Then we decided things were 
fine and tried to go back to the original topic -- after we'd verified that 
all data had aged out of the logs for that topic.  And we started seeing 
errors again.  So we switched to a different topic once more, let it be 
created, and started seeing errors on that topic too.
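
For reference, the snappy knob in question is just the standard one in the old 
Java producer API; in the topology it amounts to something like this (a 
paraphrase with placeholder broker names and payload, not the actual code):

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// Roughly what the problematic topology's producer setup looks like
// (paraphrased; broker list and payload are placeholders).
public class SnappyProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "0");   // matches the Ack=0 in the broker log above
        props.put("compression.codec", "snappy");  // the setting we turned off

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("mytopic", "some payload"));
        producer.close();
    }
}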

We have other producers, written in C, Java, and python, which are working 
flawlessly, even though the size of the data they produce and the rate at 
which they produce it are much larger than what we're seeing with this one 
problematic producer.  We also have producers written in other languages that 
publish at very low rates with no trouble, so it's (probably) not the sort of 
thing where the issue is simply masked by more frequent data production.

But in any case it looks like there's something the client can send that will 
corrupt the topic, which seems like something that shouldn't be able to happen. 
 I know there's at least some error checking for bad protocol requests, as I 
hacked a python client to produce some corrupt messages and saw an error 
response from the server.
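
(The hacking was done in python, corrupting messages inside an otherwise 
well-formed produce request, but even the crudest version of the experiment is 
easy to run.  Assuming only that 0.8 requests are 4-byte-size-prefixed frames, 
which is my understanding of the wire protocol, something like the following 
shows whether the broker answers or just drops the connection:)

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.net.Socket;

// Crude probe: send a size-prefixed blob of nonsense to a broker and see how
// it reacts.  Host, port, and the junk bytes are placeholders.
public class GarbageProbe {
    public static void main(String[] args) throws Exception {
        Socket sock = new Socket("broker1", 9092);
        DataOutputStream out = new DataOutputStream(sock.getOutputStream());
        DataInputStream in = new DataInputStream(sock.getInputStream());

        byte[] junk = new byte[] { 0x00, 0x63, (byte) 0xde, (byte) 0xad,
                                   (byte) 0xbe, (byte) 0xef, 0x00, 0x00 };
        out.writeInt(junk.length);  // 4-byte size prefix
        out.write(junk);            // bogus api key / version / body
        out.flush();

        try {
            int respSize = in.readInt();  // did we get a response frame at all?
            System.out.println("broker answered with " + respSize + " bytes");
        } catch (EOFException e) {
            System.out.println("broker closed the connection");
        } finally {
            sock.close();
        }
    }
}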

I'm happy to supply more data, but I'm not sure what would be useful.  I'm 
also fine with continuing to dig into this on my own, but I've reached a point 
where it'd be useful to know whether anyone has seen something like this 
before.  I have a ton o' tcpdumps running and some tail -F greps running on 
the logs, so that if we see that producer error again we can go find the 
corresponding tcpdump file and, hopefully, the smoking gun.  (It turns out 
that the real-time tshark processing invocations I sent out earlier can get 
quite far behind; I had that running when the corruption occurred today, but 
the output processing was a full hour behind the current time: the 
packet-writing part of tshark had gotten far ahead of the packet-analyzing 
part!)

Are there any particular log4j options I should turn on?  Is there a way to 
just enable trace logging for a specific topic?  Does trace logging print the 
contents of the message somewhere, not as something all nice and interpreted 
but as, say, a bag of hex digits?  I might end up rebuilding kafka and adding 
some very specialized logging just for this.
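
The closest knob I've found on my own is the request logger that ships in 
config/log4j.properties.  My guess, and it is only a guess, is that cranking 
it up to TRACE logs the fully-interpreted contents of each request rather than 
the raw bytes I'm really after:

# in config/log4j.properties -- raising the request logger from the shipped
# default; TRACE appears to log full request contents, DEBUG only a summary
log4j.logger.kafka.request.logger=TRACE, requestAppender
log4j.additivity.kafka.request.logger=false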

Kafka 0.8.1.1, JRE 1.6.0-71, Storm 0.9.1, RHEL6, in likely order of importance. 
(-:  Also, here's the topic description:

Topic:mytopic   PartitionCount:10       ReplicationFactor:1     Configs:
        Topic: mytopic  Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: mytopic  Partition: 1    Leader: 1       Replicas: 1     Isr: 1
        Topic: mytopic  Partition: 2    Leader: 0       Replicas: 0     Isr: 0
        Topic: mytopic  Partition: 3    Leader: 1       Replicas: 1     Isr: 1
        Topic: mytopic  Partition: 4    Leader: 0       Replicas: 0     Isr: 0
        Topic: mytopic  Partition: 5    Leader: 1       Replicas: 1     Isr: 1
        Topic: mytopic  Partition: 6    Leader: 0       Replicas: 0     Isr: 0
        Topic: mytopic  Partition: 7    Leader: 1       Replicas: 1     Isr: 1
        Topic: mytopic  Partition: 8    Leader: 0       Replicas: 0     Isr: 0
        Topic: mytopic  Partition: 9    Leader: 1       Replicas: 1     Isr: 1

(2 brokers, 1 ZK server, no obvious issues with delays or process restarts or 
disk errors or any of the other usual suspects.  One partition per filesystem.  
But my gut says none of that's pertinent; it's a matter of which partition the 
producer happens to be publishing to when it sends garbage.)


        -Steve
