Interesting, could you run DumpLogSegments with and w/o deep-iteration and
send the output around offset 1327?
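
Something like the following should work (pointing it at the segment that
covers offset 1327, which from your output looks like
/data/d3/kafka/log/mytopic-9/00000000000000000000.log; flags as I remember
them in 0.8.1):

  bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
    --files /data/d3/kafka/log/mytopic-9/00000000000000000000.log

  bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration \
    --files /data/d3/kafka/log/mytopic-9/00000000000000000000.log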

Thanks,

Jun


On Tue, Aug 12, 2014 at 5:42 PM, Steve Miller <st...@idrathernotsay.com>
wrote:

> [ "Aha!", you say, "now I know why this guy's been doing so much tshark
> stuff!" (-: ]
>
>    Hi.  I'm running into a strange situation in which more or less all of
> the topics on our Kafka server behave exactly as expected... but one family
> of applications produces data that fairly frequently corrupts its topics.
>
>    When this happens, the symptoms on the client side are all over the
> map: sometimes you get a ConsumerFetchSizeTooSmall exception, sometimes an
> exception for an unknown error type, sometimes an invalid-offset error.
>
>    On the server side, I think something like this is the first sign of
> badness:
>
> [2014-08-11 21:03:28,121] ERROR [KafkaApi-1] Error processing
> ProducerRequest with correlation id 6750 from client test-producer on
> partition [mytopic,9] (kafka.server.KafkaApis)
> java.lang.ArrayIndexOutOfBoundsException
> [2014-08-11 21:03:28,121] INFO [KafkaApi-1] Send the close connection
> response due to error handling produce request [clientId = test-producer,
> correlationId = 6750, topicAndPartition = [mytopic,9]] with Ack=0
> (kafka.server.KafkaApis)
>
> Shortly thereafter, you begin to see oddness facing the clients:
>
> [2014-08-11 21:17:58,132] ERROR [KafkaApi-1] Error when processing fetch
> request for partition [mytopic,9] offset 1327 from consumer with
> correlation id 87204 (kafka.server.KafkaApis)
> java.lang.IllegalStateException: Invalid message size: 0
>         at kafka.log.FileMessageSet.searchFor(FileMessageSet.scala:127)
>         at kafka.log.LogSegment.translateOffset(LogSegment.scala:100)
>         at kafka.log.LogSegment.read(LogSegment.scala:137)
>         at kafka.log.Log.read(Log.scala:386)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
>         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
>         at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:437)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:186)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
>         at java.lang.Thread.run(Unknown Source)
>
> If I go run the DumpLogSegments tool on the particular topic and partition
> that's generating the errors, I can see there's corruption in the log:
>
> Non-secutive offsets in :/data/d3/kafka/log/mytopic-9/00000000000000000000.log
>   1327 is followed by 1327
>
> The only producer writing to the corrupted topics was also the only one
> with snappy compression turned on in the Java API (it's a Storm topology;
> we've had the same issue with one written in Scala and with one that
> produces very similar data but was written in Java).  We turned snappy off,
> published to a different topic name (so it was created fresh), and had a
> couple of happy days where all was well.  Then we tried to go back to the
> original topic -- after we'd verified that all data had aged out of the
> logs for that topic -- and started seeing errors again.  So we switched to
> yet another topic, let it be created, and started seeing errors on that
> topic too.
>
> We have other producers, written in C, Java, and Python, which are working
> flawlessly, even though both the size of the data they produce and the rate
> at which they produce it are much larger than what we're seeing with this
> one problematic producer.  We also have producers written in other
> languages that produce at very low rates, so it's (probably) not the sort
> of thing where the issue is masked by more frequent data production.
>
> But in any case it looks like there's something the client can send that
> will corrupt the topic, which seems like something that shouldn't be
> possible.  I know there's at least some error checking for bad protocol
> requests, as I hacked a Python client to produce some corrupt messages and
> saw an error response from the server.
>
> I'm happy to supply more data but I'm not sure what would be useful.  I'm
> also fine with continuing to dig into this on my own but I'd reached a
> point where it'd be useful to know if anyone had seen something like this
> before.  I have a ton o' tcpdumps running and some tail -F greps running on
> the logs so that if we see that producer error again we can go find the
> corresponding tcpdump file and hopefully find the smoking gun.  (It turns
> out that the real-time tshark processing invocations I sent out earlier can
> get quite far behind; I had that running when the corruption occurred
> today, but the output processing was a full hour behind the current time;
> the packet-writing part of tshark was far ahead of the packet-analyzing
> part!)
>
> Are there any particular log4j options I should turn on?  Is there a way
> to just enable trace logging for a specific topic?  Does trace logging
> print the contents of the message somewhere, not as something all nice and
> interpreted but as, say, a bag of hex digits?  I might end up rebuilding
> kafka and adding some very specialized logging just for this.
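>
> (For what it's worth, what I had in mind was cranking the request loggers
> in log4j.properties up to TRACE -- going from the stock config that ships
> with 0.8.1, something like:
>
>   log4j.logger.kafka.request.logger=TRACE, requestAppender
>   log4j.logger.kafka.network.RequestChannel$=TRACE, requestAppender
>
> though I haven't confirmed that gets me raw message bytes rather than just
> request metadata, hence the thought of adding my own logging.)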
>
> Kafka 0.8.1.1, JRE 1.6.0-71, Storm 0.9.1, RHEL6, in likely order of
> importance. (-:  Also, here's the topic description:
>
> Topic:mytopic   PartitionCount:10       ReplicationFactor:1     Configs:
>         Topic: mytopic  Partition: 0    Leader: 0       Replicas: 0     Isr: 0
>         Topic: mytopic  Partition: 1    Leader: 1       Replicas: 1     Isr: 1
>         Topic: mytopic  Partition: 2    Leader: 0       Replicas: 0     Isr: 0
>         Topic: mytopic  Partition: 3    Leader: 1       Replicas: 1     Isr: 1
>         Topic: mytopic  Partition: 4    Leader: 0       Replicas: 0     Isr: 0
>         Topic: mytopic  Partition: 5    Leader: 1       Replicas: 1     Isr: 1
>         Topic: mytopic  Partition: 6    Leader: 0       Replicas: 0     Isr: 0
>         Topic: mytopic  Partition: 7    Leader: 1       Replicas: 1     Isr: 1
>         Topic: mytopic  Partition: 8    Leader: 0       Replicas: 0     Isr: 0
>         Topic: mytopic  Partition: 9    Leader: 1       Replicas: 1     Isr: 1
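>
> (That's just the output of the stock topic tool, i.e. something along the
> lines of
>
>   bin/kafka-topics.sh --describe --zookeeper <our ZK host>:2181 --topic mytopic
>
> in case the exact invocation matters.)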
>
> (2 brokers, 1 ZK server, no obvious issues with delays or process restarts
> or disk errors or any of the other usual suspects.  One partition per
> filesystem.  But my gut says none of that's pertinent; it's a matter of
> which partition the producer happens to be publishing to when it sends
> garbage.)
>
>
>         -Steve
>
