Interesting. Could you run DumpLogSegments with and without deep-iteration and send the output around offset 1327?
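A typical invocation against the segment you name below would be roughly the following (assuming the stock bin/ scripts; exact flag spellings can vary a bit between versions):

    bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
        --files /data/d3/kafka/log/mytopic-9/00000000000000000000.log

    bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration \
        --files /data/d3/kafka/log/mytopic-9/00000000000000000000.log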
Thanks,

Jun

On Tue, Aug 12, 2014 at 5:42 PM, Steve Miller <st...@idrathernotsay.com> wrote:

> [ "Aha!", you say, "now I know why this guy's been doing so much tshark stuff!" (-: ]
>
> Hi. I'm running into a strange situation in which more or less all of the topics on our Kafka server behave exactly as expected... but the data produced by one family of applications is causing fairly frequent topic corruption.
>
> When this happens, the results on the client side are all over the map: sometimes you get a ConsumerFetchSizeTooSmall exception, sometimes an exception for an unknown error type, sometimes an invalid-offset error.
>
> On the server side, I think something like this is the first sign of badness:
>
> [2014-08-11 21:03:28,121] ERROR [KafkaApi-1] Error processing ProducerRequest with correlation id 6750 from client test-producer on partition [mytopic,9] (kafka.server.KafkaApis)
> java.lang.ArrayIndexOutOfBoundsException
> [2014-08-11 21:03:28,121] INFO [KafkaApi-1] Send the close connection response due to error handling produce request [clientId = test-producer, correlationId = 6750, topicAndPartition = [mytopic,9]] with Ack=0 (kafka.server.KafkaApis)
>
> Shortly thereafter, you begin to see oddness facing the clients:
>
> [2014-08-11 21:17:58,132] ERROR [KafkaApi-1] Error when processing fetch request for partition [mytopic,9] offset 1327 from consumer with correlation id 87204 (kafka.server.KafkaApis)
> java.lang.IllegalStateException: Invalid message size: 0
>         at kafka.log.FileMessageSet.searchFor(FileMessageSet.scala:127)
>         at kafka.log.LogSegment.translateOffset(LogSegment.scala:100)
>         at kafka.log.LogSegment.read(LogSegment.scala:137)
>         at kafka.log.Log.read(Log.scala:386)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
>         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
>         at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:437)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:186)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
>         at java.lang.Thread.run(Unknown Source)
>
> If I run the DumpLogSegments tool on the particular topic and partition that's generating the errors, I can see there's corruption in the log:
>
> Non-secutive offsets in :/data/d3/kafka/log/mytopic-9/00000000000000000000.log
>   1327 is followed by 1327
>
> The only thing producing data to the corrupted topics was also the only thing with snappy compression turned on in the Java producer API (it's a Storm topology; we've had the same issue with one written in Scala and with one that produces very similar data but was written in Java).
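For reference, the knob in question in the old 0.8.x producer API is the compression.codec property. Here's a minimal sketch of a producer with it toggled, purely for illustration -- the broker list, topic, key/value, and class name below are placeholders, not taken from your setup:

    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class CompressionToggleSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("metadata.broker.list", "broker1:9092,broker2:9092"); // placeholder brokers
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("request.required.acks", "0");   // matches the Ack=0 seen in the broker log above
            // The suspect topology had compression enabled:
            // props.put("compression.codec", "snappy");
            // The workaround being tested is to turn it off:
            props.put("compression.codec", "none");

            Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
            producer.send(new KeyedMessage<String, String>("mytopic", "key", "value"));
            producer.close();
        }
    }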
> We turned that off, published to a different topic name (so it was created fresh), and had a couple of happy days where all was well. Then, once we'd decided all was well, we tried to go back to the original topic -- after verifying that all data had aged out of the logs for that topic -- and we started seeing errors again. So we switched to a different topic again, let it be created, and started seeing errors on that topic too.
>
> We have other producers, written in C, Java, and Python, that are working flawlessly, even though the size of the data they produce and the rate at which they produce it are much larger than what we're seeing with this one problematic producer. We also have producers written in other languages that produce at very low rates, so it's (probably) not the sort of thing where the issue is masked by more frequent data production.
>
> In any case, it looks like there's something the client can send that will corrupt the topic, which seems like something that shouldn't be possible. I know there's at least some error checking for bad protocol requests, because I hacked a Python client to produce some corrupt messages and saw an error response from the server.
>
> I'm happy to supply more data, but I'm not sure what would be useful. I'm also fine with continuing to dig into this on my own, but I'd reached the point where it'd be useful to know whether anyone had seen something like this before. I have a ton o' tcpdumps running and some tail -F greps on the logs, so that if we see that producer error again we can go find the corresponding tcpdump file and, hopefully, the smoking gun. (It turns out that the real-time tshark processing invocations I sent out earlier can get quite far behind; I had that running when the corruption occurred today, but the output processing was a full hour behind the current time -- the packet-writing part of tshark was far ahead of the packet-analyzing part!)
>
> Are there any particular log4j options I should turn on? Is there a way to enable trace logging for just a specific topic? Does trace logging print the contents of the message somewhere, not as something all nice and interpreted but as, say, a bag of hex digits? I might end up rebuilding Kafka and adding some very specialized logging just for this.
>
> Kafka 0.8.1.1, JRE 1.6.0-71, Storm 0.9.1, RHEL6, in likely order of importance. (-:  Also, here's the topic description:
>
> Topic:mytopic   PartitionCount:10   ReplicationFactor:1   Configs:
>     Topic: mytopic   Partition: 0   Leader: 0   Replicas: 0   Isr: 0
>     Topic: mytopic   Partition: 1   Leader: 1   Replicas: 1   Isr: 1
>     Topic: mytopic   Partition: 2   Leader: 0   Replicas: 0   Isr: 0
>     Topic: mytopic   Partition: 3   Leader: 1   Replicas: 1   Isr: 1
>     Topic: mytopic   Partition: 4   Leader: 0   Replicas: 0   Isr: 0
>     Topic: mytopic   Partition: 5   Leader: 1   Replicas: 1   Isr: 1
>     Topic: mytopic   Partition: 6   Leader: 0   Replicas: 0   Isr: 0
>     Topic: mytopic   Partition: 7   Leader: 1   Replicas: 1   Isr: 1
>     Topic: mytopic   Partition: 8   Leader: 0   Replicas: 0   Isr: 0
>     Topic: mytopic   Partition: 9   Leader: 1   Replicas: 1   Isr: 1
>
> (2 brokers, 1 ZK server, no obvious issues with delays or process restarts or disk errors or any of the other usual suspects. One partition per filesystem. But my gut says none of that's pertinent; it's a matter of which partition the producer happens to be publishing to when it sends garbage.)
>
> -Steve
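On the "bag of hex digits" question: you shouldn't need to rebuild Kafka just to see the raw bytes; a throwaway reader over the segment file will do. Here's a minimal sketch -- a hypothetical helper, not Kafka tooling -- where the starting byte position is a placeholder you'd take from the .index file or from the DumpLogSegments output:

    import java.io.IOException;
    import java.io.RandomAccessFile;

    public class SegmentHexDump {
        public static void main(String[] args) throws IOException {
            String path = "/data/d3/kafka/log/mytopic-9/00000000000000000000.log";
            long startPos = 0L;  // placeholder: byte position of the suspect entry
            int length = 256;    // how many bytes to dump

            RandomAccessFile f = new RandomAccessFile(path, "r");
            try {
                f.seek(startPos);
                byte[] buf = new byte[length];
                int n = f.read(buf);
                // Classic 16-bytes-per-row hex dump, each row prefixed with its file position.
                for (int i = 0; i < n; i++) {
                    if (i % 16 == 0) {
                        System.out.printf("%n%08x  ", startPos + i);
                    }
                    System.out.printf("%02x ", buf[i] & 0xff);
                }
                System.out.println();
            } finally {
                f.close();
            }
        }
    }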