Sure. I ran:

    /opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log --deep-iteration

and got (in addition to the same "Non-secutive offsets" error):

[ ... ]
offset: 1320 position: 344293 isvalid: true payloadsize: 208 magic: 0 compresscodec: NoCompressionCodec crc: 1038804751
offset: 1321 position: 344527 isvalid: true payloadsize: 194 magic: 0 compresscodec: NoCompressionCodec crc: 1211626571
offset: 1322 position: 344747 isvalid: true payloadsize: 195 magic: 0 compresscodec: NoCompressionCodec crc: 228214666
offset: 1323 position: 344968 isvalid: true payloadsize: 285 magic: 0 compresscodec: NoCompressionCodec crc: 2412118642
offset: 1324 position: 345279 isvalid: true payloadsize: 267 magic: 0 compresscodec: NoCompressionCodec crc: 814469229
offset: 1325 position: 345572 isvalid: true payloadsize: 267 magic: 0 compresscodec: NoCompressionCodec crc: 874964779
offset: 1326 position: 345865 isvalid: true payloadsize: 143 magic: 0 compresscodec: NoCompressionCodec crc: 1448343333
offset: 1327 position: 346034 isvalid: true payloadsize: 161 magic: 0 compresscodec: NoCompressionCodec crc: 3486482767
offset: 1327 position: 346221 isvalid: true payloadsize: 194 magic: 0 compresscodec: NoCompressionCodec crc: 3322604516
offset: 1328 position: 346441 isvalid: true payloadsize: 207 magic: 0 compresscodec: NoCompressionCodec crc: 3181460980
offset: 1329 position: 346674 isvalid: true payloadsize: 164 magic: 0 compresscodec: NoCompressionCodec crc: 77979807
offset: 1330 position: 346864 isvalid: true payloadsize: 208 magic: 0 compresscodec: NoCompressionCodec crc: 3051442612
offset: 1331 position: 347098 isvalid: true payloadsize: 196 magic: 0 compresscodec: NoCompressionCodec crc: 1906163219
offset: 1332 position: 347320 isvalid: true payloadsize: 196 magic: 0 compresscodec: NoCompressionCodec crc: 3849763639
offset: 1333 position: 347542 isvalid: true payloadsize: 207 magic: 0 compresscodec: NoCompressionCodec crc: 3724257965
offset: 1334 position: 347775 isvalid: true payloadsize: 194 magic: 0 compresscodec: NoCompressionCodec crc: 510173020
offset: 1335 position: 347995 isvalid: true payloadsize: 357 magic: 0 compresscodec: NoCompressionCodec crc: 2043065154
offset: 1336 position: 348378 isvalid: true payloadsize: 195 magic: 0 compresscodec: NoCompressionCodec crc: 435251578
offset: 1337 position: 348599 isvalid: true payloadsize: 169 magic: 0 compresscodec: NoCompressionCodec crc: 1172187172
offset: 1338 position: 348794 isvalid: true payloadsize: 312 magic: 0 compresscodec: NoCompressionCodec crc: 1324582122
offset: 1339 position: 349132 isvalid: true payloadsize: 196 magic: 0 compresscodec: NoCompressionCodec crc: 3649742340
offset: 1340 position: 349354 isvalid: true payloadsize: 288 magic: 0 compresscodec: NoCompressionCodec crc: 581177172
(etc.)
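Independent of DumpLogSegments (which, as described below, runs out of heap on the index file), a small standalone scanner can walk the .log segment directly and flag the same kinds of problems. Below is a rough, untested sketch in Scala; it assumes the 0.8-era on-disk layout for magic-0 messages (8-byte offset, 4-byte size, then crc / magic byte / attributes / key length / key / payload length / payload, all big-endian), and the default path is just a placeholder:

    import java.io.{BufferedInputStream, DataInputStream, EOFException, FileInputStream}
    import java.util.zip.CRC32

    // Rough scanner for a 0.8 (magic 0) Kafka .log segment.  Each entry is assumed to be:
    //   [8-byte offset][4-byte message size][4-byte crc][1-byte magic][1-byte attributes]
    //   [4-byte key length][key bytes][4-byte payload length][payload bytes]
    // with the CRC covering everything after the crc field itself.
    object SegmentScan {
      def main(args: Array[String]): Unit = {
        val path = if (args.nonEmpty) args(0) else "00000000000000000000.log"  // placeholder
        val in = new DataInputStream(new BufferedInputStream(new FileInputStream(path)))
        var position = 0L       // byte position of the entry currently being read
        var lastOffset = -1L
        try {
          while (true) {
            val offset = in.readLong()
            val size = in.readInt()
            if (size <= 0)
              sys.error(s"position $position offset $offset: bad message size $size")
            val msg = new Array[Byte](size)
            in.readFully(msg)
            // Stored CRC is the first 4 bytes of the message, big-endian.
            val stored = ((msg(0) & 0xffL) << 24) | ((msg(1) & 0xffL) << 16) |
                         ((msg(2) & 0xffL) << 8) | (msg(3) & 0xffL)
            val crc = new CRC32
            crc.update(msg, 4, size - 4)
            if (offset <= lastOffset)
              println(s"position $position: offset $offset follows $lastOffset (non-consecutive)")
            if (crc.getValue != stored)
              println(s"position $position offset $offset: CRC mismatch")
            lastOffset = offset
            position += 12 + size
          }
        } catch {
          case _: EOFException => println(s"stopped at EOF, $position bytes into the file")
        } finally {
          in.close()
        }
      }
    }

Run against the same segment, this should confirm whether the duplicated offset 1327 above really is on disk, and whether anything near it has a zero or garbage size, without needing a huge heap.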
I also ran:

    /opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.index --deep-iteration

At first, I got the following:

Dumping 00000000000000000000.index
offset: 16 position: 4342
offset: 32 position: 8555
offset: 48 position: 12676
offset: 63 position: 16824
offset: 79 position: 21256
offset: 96 position: 25599
offset: 112 position: 29740
offset: 126 position: 33981
offset: 143 position: 38122
offset: 160 position: 42364
offset: 176 position: 46589
offset: 192 position: 50755
offset: 208 position: 54969
offset: 223 position: 59207
offset: 239 position: 63317
offset: 255 position: 67547
offset: 272 position: 71771
offset: 289 position: 76012
offset: 306 position: 80476
offset: 323 position: 84602
offset: 337 position: 88876
offset: 354 position: 93153
offset: 371 position: 97329
offset: 387 position: 101496
offset: 403 position: 105657
offset: 419 position: 109848
offset: 434 position: 113950
offset: 451 position: 118223
offset: 465 position: 122366
offset: 482 position: 126463
offset: 499 position: 130707
offset: 517 position: 135044
offset: 533 position: 139505
offset: 549 position: 143637
offset: 566 position: 147916
offset: 582 position: 152223
offset: 599 position: 156528
offset: 613 position: 160694
offset: 629 position: 164807
offset: 644 position: 169020
offset: 662 position: 173449
offset: 679 position: 177721
offset: 695 position: 182003
offset: 711 position: 186374
offset: 728 position: 190644
offset: 746 position: 195036
offset: 762 position: 199231
offset: 778 position: 203581
offset: 794 position: 208024
offset: 810 position: 212192
offset: 825 position: 216446
offset: 841 position: 220564
offset: 858 position: 224718
offset: 875 position: 228823
offset: 890 position: 232983
offset: 907 position: 237116
offset: 920 position: 241229
offset: 936 position: 245504
offset: 951 position: 249601
offset: 969 position: 253908
offset: 986 position: 258074
offset: 1002 position: 262228
offset: 1018 position: 266385
offset: 1035 position: 270699
offset: 1051 position: 274843
offset: 1067 position: 278954
offset: 1085 position: 283283
offset: 1102 position: 287632
offset: 1118 position: 291971
offset: 1135 position: 296271
offset: 1152 position: 300722
offset: 1168 position: 304924
offset: 1184 position: 309051
offset: 1201 position: 313349
offset: 1217 position: 317543
offset: 1233 position: 321727
offset: 1249 position: 325877
offset: 1266 position: 330122
offset: 1282 position: 334413
offset: 1298 position: 338579
offset: 1313 position: 342717
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(Unknown Source)
    at java.nio.ByteBuffer.allocate(Unknown Source)
    at kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:188)
    at kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:165)
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
    at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:38)
    at scala.collection.IterableLike$class.head(IterableLike.scala:91)
    at kafka.message.MessageSet.head(MessageSet.scala:67)
    at kafka.tools.DumpLogSegments$$anonfun$kafka$tools$DumpLogSegments$$dumpIndex$1.apply$mcVI$sp(DumpLogSegments.scala:102)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpIndex(DumpLogSegments.scala:99)
    at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:68)
    at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:61)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105)
    at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:61)
    at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala)

If I fiddled with KAFKA_HEAP_OPTS, I could change the error to:

Exception in thread "main" java.util.NoSuchElementException
    at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
    at scala.collection.IterableLike$class.head(IterableLike.scala:91)
    at kafka.message.MessageSet.head(MessageSet.scala:67)
    at kafka.tools.DumpLogSegments$$anonfun$kafka$tools$DumpLogSegments$$dumpIndex$1.apply$mcVI$sp(DumpLogSegments.scala:102)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpIndex(DumpLogSegments.scala:99)
    at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:68)
    at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:61)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105)
    at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:61)
    at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala)

(Bumping it to 1024M didn't do the trick; I still got the heap-space exception. Going to 10240M gave it enough heap to fail this way instead. (-: )

Also, BTW: there seems to be a bug in dumpIndex() in DumpLogSegments.scala, in that it is unhappy if the topic name has a dot in it. If you try to use DumpLogSegments to look at such a file while it's in the "real" log directory: say your topic is named X.Y, you're in the directory X.Y-9, and you're looking at 00000000000000000000.index. You'll get an error like:

Exception in thread "main" java.io.FileNotFoundException: /home/whatever/X.log (No such file or directory)
    at java.io.FileInputStream.open(Native Method)
    at java.io.FileInputStream.<init>(Unknown Source)
    at kafka.utils.Utils$.openChannel(Utils.scala:157)
    at kafka.log.FileMessageSet.<init>(FileMessageSet.scala:74)
    at kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpIndex(DumpLogSegments.scala:97)
    at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:68)
    at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:61)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105)
    at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:61)
    at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala)

because these lines both assume that there's only one dot on which to split:

    val startOffset = file.getName().split("\\.")(0).toLong
    val logFileName = file.getAbsolutePath.split("\\.")(0) + Log.LogFileSuffix

(It's not bad to work around: copy or move the files to a directory without dots in its path and you're OK. But I did want to point it out.)
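To make that concrete, here's a tiny self-contained Scala sketch showing what the first-dot split does to a dotted path, along with one possible fix: trim the known ".index" / ".log" suffixes (kafka.log.Log defines IndexFileSuffix and LogFileSuffix for these) instead of splitting on the first dot. The path is just the X.Y-9 example from above, and the "fix" is only a sketch, not a tested patch:

    import java.io.File

    // Illustrates the dumpIndex() filename-parsing problem and a possible fix:
    // splitting on the first "." breaks when the path (or topic) contains dots,
    // while trimming the known ".index" suffix does not.
    object IndexNameParse {
      val IndexFileSuffix = ".index"   // mirrors kafka.log.Log.IndexFileSuffix
      val LogFileSuffix = ".log"       // mirrors kafka.log.Log.LogFileSuffix

      def main(args: Array[String]): Unit = {
        val file = new File("/home/whatever/X.Y-9/00000000000000000000.index")

        // What DumpLogSegments does today: first-dot split.
        val brokenLog = file.getAbsolutePath.split("\\.")(0) + LogFileSuffix
        println(brokenLog)                 // /home/whatever/X.log -> FileNotFoundException

        // Possible fix: trim the known suffix instead of splitting on dots.
        val startOffset = file.getName.stripSuffix(IndexFileSuffix).toLong
        val fixedLog = file.getAbsolutePath.stripSuffix(IndexFileSuffix) + LogFileSuffix
        println(s"$startOffset $fixedLog") // 0 /home/whatever/X.Y-9/00000000000000000000.log
      }
    }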
Overnight, I did a ton of pcap captures while having something watch the logging output for the same sort of ProducerRequest error I'd described below. It's a lot of data to sort through and I'm still poking at it, but at first glance at least, I could see the ProduceRequest that produced a similar error, and either it wasn't messed up, or it's messed up in a way that the Kafka protocol decoder doesn't see the issue. For example, here's an error:

[2014-08-13 00:28:11,232] ERROR [KafkaApi-0] Error processing ProducerRequest with correlation id 3484669 from client test-producer on partition [mytopic,4] (kafka.server.KafkaApis)
java.lang.ArrayIndexOutOfBoundsException
[2014-08-13 00:28:11,233] INFO [KafkaApi-0] Send the close connection response due to error handling produce request [clientId = test-producer, correlationId = 3484669, topicAndPartition = [mytopic,4]] with Ack=0 (kafka.server.KafkaApis)

and below is the ProduceRequest before the one that produced the error, then the one that produced the error, then the next one (note that the topic name has been changed, but it is 55 bytes long).

One other interesting thing I see, though: for one of the times we saw this corruption, it looks like there was also some leader-election flappage going on (again, topic name changed to something innocuous). Here's the error:

[2014-08-13 04:45:41,750] ERROR [KafkaApi-0] Error processing ProducerRequest with correlation id 5016862 from client test-producer on partition [mytopic,8] (kafka.server.KafkaApis)
java.lang.ArrayIndexOutOfBoundsException
[2014-08-13 04:45:41,750] INFO [KafkaApi-0] Send the close connection response due to error handling produce request [clientId = test-producer, correlationId = 5016862, topicAndPartition = [mytopic,8]] with Ack=0 (kafka.server.KafkaApis)

and here's what I saw in state-change.log that was related to the same topic and partition:

[2014-08-13 04:45:28,018] TRACE Controller 0 epoch 3 changed partition [mytopic,8] state from OnlinePartition to OfflinePartition (state.change.logger)
[2014-08-13 04:45:28,021] TRACE Controller 0 epoch 3 started leader election for partition [mytopic,8] (state.change.logger)
[2014-08-13 04:45:28,026] ERROR Controller 0 epoch 3 initiated state change for partition [mytopic,8] from OfflinePartition to OnlinePartition failed (state.change.logger)
[2014-08-13 04:45:28,933] TRACE Controller 0 epoch 3 changed state of replica 1 for partition [mytopic,8] from OnlineReplica to OfflineReplica (state.change.logger)
[2014-08-13 04:45:29,150] TRACE Controller 0 epoch 3 sending UpdateMetadata request (Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3) with correlationId 276 to broker 0 for partition [mytopic,8] (state.change.logger)
[2014-08-13 04:45:29,155] TRACE Broker 0 cached leader info (LeaderAndIsrInfo:(Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3),ReplicationFactor:1),AllReplicas:1) for partition [mytopic,8] in response to UpdateMetadata request sent by controller 0 epoch 3 with correlation id 276 (state.change.logger)
[2014-08-13 04:45:33,940] TRACE Controller 0 epoch 3 sending UpdateMetadata request (Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3) with correlationId 277 to broker 1 for partition [mytopic,8] (state.change.logger)
[2014-08-13 04:45:33,940] TRACE Controller 0 epoch 3 sending UpdateMetadata request (Leader:0,ISR:0,LeaderEpoch:1,ControllerEpoch:3) with correlationId 277 to broker 1 for partition [mytopic,8] (state.change.logger)
[2014-08-13 04:45:33,949] TRACE Controller 0 epoch 3 changed state of replica 1 for partition [mytopic,8] from OfflineReplica to OnlineReplica (state.change.logger)
[2014-08-13 04:45:33,954] TRACE Controller 0 epoch 3 sending become-follower LeaderAndIsr request (Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3) with correlationId 278 to broker 1 for partition [mytopic,8] (state.change.logger)
[2014-08-13 04:45:33,956] TRACE Controller 0 epoch 3 sending UpdateMetadata request (Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3) with correlationId 278 to broker 1 for partition [mytopic,8] (state.change.logger)
[2014-08-13 04:45:33,959] TRACE Controller 0 epoch 3 sending UpdateMetadata request (Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3) with correlationId 278 to broker 0 for partition [mytopic,8] (state.change.logger)
[2014-08-13 04:45:33,961] TRACE Controller 0 epoch 3 started leader election for partition [mytopic,8] (state.change.logger)
[2014-08-13 04:45:33,963] TRACE Broker 0 cached leader info (LeaderAndIsrInfo:(Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3),ReplicationFactor:1),AllReplicas:1) for partition [mytopic,8] in response to UpdateMetadata request sent by controller 0 epoch 3 with correlation id 278 (state.change.logger)
[2014-08-13 04:45:33,968] TRACE Controller 0 epoch 3 elected leader 1 for Offline partition [mytopic,8] (state.change.logger)
[2014-08-13 04:45:33,969] TRACE Controller 0 epoch 3 changed partition [mytopic,8] from OfflinePartition to OnlinePartition with leader 1 (state.change.logger)
[2014-08-13 04:45:39,245] TRACE Controller 0 epoch 3 sending become-leader LeaderAndIsr request (Leader:1,ISR:1,LeaderEpoch:90,ControllerEpoch:3) with correlationId 279 to broker 1 for partition [mytopic,8] (state.change.logger)
[2014-08-13 04:45:39,248] TRACE Controller 0 epoch 3 sending UpdateMetadata request (Leader:1,ISR:1,LeaderEpoch:90,ControllerEpoch:3) with correlationId 279 to broker 1 for partition [mytopic,8] (state.change.logger)
[2014-08-13 04:45:39,251] TRACE Controller 0 epoch 3 sending UpdateMetadata request (Leader:1,ISR:1,LeaderEpoch:90,ControllerEpoch:3) with correlationId 279 to broker 0 for partition [mytopic,8] (state.change.logger)
[2014-08-13 04:45:39,255] TRACE Broker 0 cached leader info (LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:90,ControllerEpoch:3),ReplicationFactor:1),AllReplicas:1) for partition [mytopic,8] in response to UpdateMetadata request sent by controller 0 epoch 3 with correlation id 279 (state.change.logger)

Strangely, though, I see another ProducerRequest error at around 09:08:11, but I don't see any obvious controller-related flappage at that time. So maybe correlation (leader election at nearly the same time as the error) doesn't imply causation. I don't know.

This is the only producer I can think of here that produces any significant number of messages (maybe a burst of 600ish every 30 seconds or so) and that does so one at a time rather than by batching them. Maybe there's an element here that's related to publishing lots of message sets with only one (small) message in them?
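In case it helps to test that theory, here's roughly the kind of standalone producer that could mimic the pattern outside of Storm: single small uncompressed messages, sync sends with acks=0 (matching the captured requests), in bursts of about 600 every 30 seconds. It's only a sketch against the 0.8 Scala producer API; the broker list, topic name, message size, and counts are placeholders:

    import java.util.Properties
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

    // Bursts of single, small, uncompressed messages with acks=0, roughly matching
    // the problematic producer: ~600 one-message ProduceRequests every 30 seconds.
    object OneAtATimeProducer {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("metadata.broker.list", "broker1:9092,broker2:9092") // placeholder
        props.put("serializer.class", "kafka.serializer.StringEncoder")
        props.put("producer.type", "sync")       // one request per send()
        props.put("request.required.acks", "0")  // same as the captured requests
        props.put("compression.codec", "none")

        val producer = new Producer[String, String](new ProducerConfig(props))
        try {
          while (true) {
            for (i <- 0 until 600) {
              // ~200-byte values, in the ballpark of the payload sizes in the dump above;
              // the real topic name is longer (55 bytes in the captures).
              producer.send(new KeyedMessage[String, String]("mytopic", "x" * 200 + i))
            }
            Thread.sleep(30000)
          }
        } finally {
          producer.close()
        }
      }
    }

If that reproduced the corruption, it would point at the one-small-message-per-request pattern itself (and with acks=0 the producer never hears about the broker-side failure) rather than at anything Storm-specific.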
If there's anything else you want me to look at, and/or if you want me to reformat this somehow so it's more readable (I know it's a lot of stuff inline), please let me know. Thanks!
-Steve =========== sample transaction data for one case of this sort of error Frame 8451: 516 bytes on wire (4128 bits), 516 bytes captured (4128 bits) Encapsulation type: Ethernet (1) Arrival Time: Aug 13, 2014 00:28:11.232408000 UTC [Time shift for this packet: 0.000000000 seconds] Epoch Time: 1407889691.232408000 seconds [Time delta from previous captured frame: 0.000479000 seconds] [Time delta from previous displayed frame: 0.000479000 seconds] [Time since reference or first frame: 191.232312000 seconds] Frame Number: 8451 Frame Length: 516 bytes (4128 bits) Capture Length: 516 bytes (4128 bits) [Frame is marked: False] [Frame is ignored: False] [Protocols in frame: eth:ethertype:ip:tcp:kafka] [Number of per-protocol-data: 1] [Kafka, key 0] Ethernet II, Src: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0), Dst: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1) Destination: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1) Address: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1) .... ..0. .... .... .... .... = LG bit: Globally unique address (factory default) .... ...0 .... .... .... .... = IG bit: Individual address (unicast) Source: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0) Address: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0) .... ..0. .... .... .... .... = LG bit: Globally unique address (factory default) .... ...0 .... .... .... .... = IG bit: Individual address (unicast) Type: IP (0x0800) Internet Protocol Version 4, Src: 10.163.193.125 (10.163.193.125), Dst: 10.163.193.121 (10.163.193.121) Version: 4 Header Length: 20 bytes Differentiated Services Field: 0x00 (DSCP 0x00: Default; ECN: 0x00: Not-ECT (Not ECN-Capable Transport)) 0000 00.. = Differentiated Services Codepoint: Default (0x00) .... ..00 = Explicit Congestion Notification: Not-ECT (Not ECN-Capable Transport) (0x00) Total Length: 502 Identification: 0x2b7a (11130) Flags: 0x02 (Don't Fragment) 0... .... = Reserved bit: Not set .1.. .... = Don't fragment: Set ..0. .... = More fragments: Not set Fragment offset: 0 Time to live: 64 Protocol: TCP (6) Header checksum: 0x754b [validation disabled] [Good: False] [Bad: False] Source: 10.163.193.125 (10.163.193.125) Destination: 10.163.193.121 (10.163.193.121) [Source GeoIP: Unknown] [Destination GeoIP: Unknown] Transmission Control Protocol, Src Port: 33857 (33857), Dst Port: 9092 (9092), Seq: 3091465, Ack: 1, Len: 450 Source Port: 33857 (33857) Destination Port: 9092 (9092) [Stream index: 14] [TCP Segment Len: 450] Sequence number: 3091465 (relative sequence number) [Next sequence number: 3091915 (relative sequence number)] Acknowledgment number: 1 (relative ack number) Header Length: 32 bytes .... 0000 0001 1000 = Flags: 0x018 (PSH, ACK) 000. .... .... = Reserved: Not set ...0 .... .... = Nonce: Not set .... 0... .... = Congestion Window Reduced (CWR): Not set .... .0.. .... = ECN-Echo: Not set .... ..0. .... = Urgent: Not set .... ...1 .... = Acknowledgment: Set .... .... 1... = Push: Set .... .... .0.. = Reset: Not set .... .... ..0. = Syn: Not set .... .... ...0 = Fin: Not set Window size value: 107 [Calculated window size: 13696] [Window size scaling factor: 128] Checksum: 0x10a1 [validation disabled] [Good Checksum: False] [Bad Checksum: False] Urgent pointer: 0 Options: (12 bytes), No-Operation (NOP), No-Operation (NOP), Timestamps No-Operation (NOP) Type: 1 0... .... = Copy on fragmentation: No .00. .... = Class: Control (0) ...0 0001 = Number: No-Operation (NOP) (1) No-Operation (NOP) Type: 1 0... .... = Copy on fragmentation: No .00. .... 
= Class: Control (0) ...0 0001 = Number: No-Operation (NOP) (1) Timestamps: TSval 2762073799, TSecr 2668832110 Kind: Time Stamp Option (8) Length: 10 Timestamp value: 2762073799 Timestamp echo reply: 2668832110 [SEQ/ACK analysis] [iRTT: 0.000199000 seconds] [Bytes in flight: 450] Kafka Length: 446 API Key: Produce (0) API Version: 0 Correlation ID: 3484667 String Length: 13 Client ID: test-producer Required Acks: 0 Timeout: 10000 Array Count: 1 Produce Request Topic String Length: 55 Topic Name: mytopic Array Count: 1 Produce Request Partition Partition ID: 4 Message Set Size: 344 Message Set Offset: 0 Message Size: 332 Message CRC32: 0x81b88c04 Magic Byte: 0 .... ..00 = Compression Codec: None (0) Bytes Length: -1 Key: <MISSING> Bytes Length: 318 Value: 00b6c4d5be0a000a73646e73320008746b6f320012786e2d... Frame 8452: 465 bytes on wire (3720 bits), 465 bytes captured (3720 bits) Encapsulation type: Ethernet (1) Arrival Time: Aug 13, 2014 00:28:11.232797000 UTC [Time shift for this packet: 0.000000000 seconds] Epoch Time: 1407889691.232797000 seconds [Time delta from previous captured frame: 0.000389000 seconds] [Time delta from previous displayed frame: 0.000389000 seconds] [Time since reference or first frame: 191.232701000 seconds] Frame Number: 8452 Frame Length: 465 bytes (3720 bits) Capture Length: 465 bytes (3720 bits) [Frame is marked: False] [Frame is ignored: False] [Protocols in frame: eth:ethertype:ip:tcp:kafka] [Number of per-protocol-data: 1] [Kafka, key 0] Ethernet II, Src: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0), Dst: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1) Destination: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1) Address: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1) .... ..0. .... .... .... .... = LG bit: Globally unique address (factory default) .... ...0 .... .... .... .... = IG bit: Individual address (unicast) Source: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0) Address: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0) .... ..0. .... .... .... .... = LG bit: Globally unique address (factory default) .... ...0 .... .... .... .... = IG bit: Individual address (unicast) Type: IP (0x0800) Internet Protocol Version 4, Src: 10.163.193.125 (10.163.193.125), Dst: 10.163.193.121 (10.163.193.121) Version: 4 Header Length: 20 bytes Differentiated Services Field: 0x00 (DSCP 0x00: Default; ECN: 0x00: Not-ECT (Not ECN-Capable Transport)) 0000 00.. = Differentiated Services Codepoint: Default (0x00) .... ..00 = Explicit Congestion Notification: Not-ECT (Not ECN-Capable Transport) (0x00) Total Length: 451 Identification: 0x2b7b (11131) Flags: 0x02 (Don't Fragment) 0... .... = Reserved bit: Not set .1.. .... = Don't fragment: Set ..0. .... = More fragments: Not set Fragment offset: 0 Time to live: 64 Protocol: TCP (6) Header checksum: 0x757d [validation disabled] [Good: False] [Bad: False] Source: 10.163.193.125 (10.163.193.125) Destination: 10.163.193.121 (10.163.193.121) [Source GeoIP: Unknown] [Destination GeoIP: Unknown] Transmission Control Protocol, Src Port: 33857 (33857), Dst Port: 9092 (9092), Seq: 3091915, Ack: 1, Len: 399 Source Port: 33857 (33857) Destination Port: 9092 (9092) [Stream index: 14] [TCP Segment Len: 399] Sequence number: 3091915 (relative sequence number) [Next sequence number: 3092314 (relative sequence number)] Acknowledgment number: 1 (relative ack number) Header Length: 32 bytes .... 0000 0001 1000 = Flags: 0x018 (PSH, ACK) 000. .... .... = Reserved: Not set ...0 .... .... = Nonce: Not set .... 0... .... = Congestion Window Reduced (CWR): Not set .... .0.. .... = ECN-Echo: Not set .... ..0. 
.... = Urgent: Not set .... ...1 .... = Acknowledgment: Set .... .... 1... = Push: Set .... .... .0.. = Reset: Not set .... .... ..0. = Syn: Not set .... .... ...0 = Fin: Not set Window size value: 107 [Calculated window size: 13696] [Window size scaling factor: 128] Checksum: 0x6f34 [validation disabled] [Good Checksum: False] [Bad Checksum: False] Urgent pointer: 0 Options: (12 bytes), No-Operation (NOP), No-Operation (NOP), Timestamps No-Operation (NOP) Type: 1 0... .... = Copy on fragmentation: No .00. .... = Class: Control (0) ...0 0001 = Number: No-Operation (NOP) (1) No-Operation (NOP) Type: 1 0... .... = Copy on fragmentation: No .00. .... = Class: Control (0) ...0 0001 = Number: No-Operation (NOP) (1) Timestamps: TSval 2762073799, TSecr 2668832111 Kind: Time Stamp Option (8) Length: 10 Timestamp value: 2762073799 Timestamp echo reply: 2668832111 [SEQ/ACK analysis] [iRTT: 0.000199000 seconds] [Bytes in flight: 399] Kafka Length: 395 API Key: Produce (0) API Version: 0 Correlation ID: 3484669 String Length: 13 Client ID: test-producer Required Acks: 0 Timeout: 10000 Array Count: 1 Produce Request Topic String Length: 55 Topic Name: mytopic Array Count: 1 Produce Request Partition Partition ID: 4 Message Set Size: 293 Message Set Offset: 0 Message Size: 281 Message CRC32: 0x80ce403c Magic Byte: 0 .... ..00 = Compression Codec: None (0) Bytes Length: -1 Key: <MISSING> Bytes Length: 267 Value: 00b6c4d5be0a001061646e7363746c640008706172320006... Frame 8453: 354 bytes on wire (2832 bits), 354 bytes captured (2832 bits) Encapsulation type: Ethernet (1) Arrival Time: Aug 13, 2014 00:28:11.233223000 UTC [Time shift for this packet: 0.000000000 seconds] Epoch Time: 1407889691.233223000 seconds [Time delta from previous captured frame: 0.000426000 seconds] [Time delta from previous displayed frame: 0.000426000 seconds] [Time since reference or first frame: 191.233127000 seconds] Frame Number: 8453 Frame Length: 354 bytes (2832 bits) Capture Length: 354 bytes (2832 bits) [Frame is marked: False] [Frame is ignored: False] [Protocols in frame: eth:ethertype:ip:tcp:kafka] [Number of per-protocol-data: 1] [Kafka, key 0] Ethernet II, Src: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0), Dst: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1) Destination: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1) Address: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1) .... ..0. .... .... .... .... = LG bit: Globally unique address (factory default) .... ...0 .... .... .... .... = IG bit: Individual address (unicast) Source: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0) Address: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0) .... ..0. .... .... .... .... = LG bit: Globally unique address (factory default) .... ...0 .... .... .... .... = IG bit: Individual address (unicast) Type: IP (0x0800) Internet Protocol Version 4, Src: 10.163.193.125 (10.163.193.125), Dst: 10.163.193.121 (10.163.193.121) Version: 4 Header Length: 20 bytes Differentiated Services Field: 0x00 (DSCP 0x00: Default; ECN: 0x00: Not-ECT (Not ECN-Capable Transport)) 0000 00.. = Differentiated Services Codepoint: Default (0x00) .... ..00 = Explicit Congestion Notification: Not-ECT (Not ECN-Capable Transport) (0x00) Total Length: 340 Identification: 0x2b7c (11132) Flags: 0x02 (Don't Fragment) 0... .... = Reserved bit: Not set .1.. .... = Don't fragment: Set ..0. .... 
= More fragments: Not set Fragment offset: 0 Time to live: 64 Protocol: TCP (6) Header checksum: 0x75eb [validation disabled] [Good: False] [Bad: False] Source: 10.163.193.125 (10.163.193.125) Destination: 10.163.193.121 (10.163.193.121) [Source GeoIP: Unknown] [Destination GeoIP: Unknown] Transmission Control Protocol, Src Port: 33857 (33857), Dst Port: 9092 (9092), Seq: 3092314, Ack: 1, Len: 288 Source Port: 33857 (33857) Destination Port: 9092 (9092) [Stream index: 14] [TCP Segment Len: 288] Sequence number: 3092314 (relative sequence number) [Next sequence number: 3092602 (relative sequence number)] Acknowledgment number: 1 (relative ack number) Header Length: 32 bytes .... 0000 0001 1000 = Flags: 0x018 (PSH, ACK) 000. .... .... = Reserved: Not set ...0 .... .... = Nonce: Not set .... 0... .... = Congestion Window Reduced (CWR): Not set .... .0.. .... = ECN-Echo: Not set .... ..0. .... = Urgent: Not set .... ...1 .... = Acknowledgment: Set .... .... 1... = Push: Set .... .... .0.. = Reset: Not set .... .... ..0. = Syn: Not set .... .... ...0 = Fin: Not set Window size value: 107 [Calculated window size: 13696] [Window size scaling factor: 128] Checksum: 0xea49 [validation disabled] [Good Checksum: False] [Bad Checksum: False] Urgent pointer: 0 Options: (12 bytes), No-Operation (NOP), No-Operation (NOP), Timestamps No-Operation (NOP) Type: 1 0... .... = Copy on fragmentation: No .00. .... = Class: Control (0) ...0 0001 = Number: No-Operation (NOP) (1) No-Operation (NOP) Type: 1 0... .... = Copy on fragmentation: No .00. .... = Class: Control (0) ...0 0001 = Number: No-Operation (NOP) (1) Timestamps: TSval 2762073800, TSecr 2668832112 Kind: Time Stamp Option (8) Length: 10 Timestamp value: 2762073800 Timestamp echo reply: 2668832112 [SEQ/ACK analysis] [iRTT: 0.000199000 seconds] [Bytes in flight: 288] Kafka Length: 284 API Key: Produce (0) API Version: 0 Correlation ID: 3484671 String Length: 13 Client ID: test-producer Required Acks: 0 Timeout: 10000 Array Count: 1 Produce Request Topic String Length: 55 Topic Name: mytopic Array Count: 1 Produce Request Partition Partition ID: 4 Message Set Size: 182 Message Set Offset: 0 Message Size: 170 Message CRC32: 0x0ece510e Magic Byte: 0 .... ..00 = Compression Codec: None (0) Bytes Length: -1 Key: <MISSING> Bytes Length: 156 Value: 00b6c4d5be0a0008726f6f740008686b6735000c246a726f... On Wed, Aug 13, 2014 at 08:15:21AM -0700, Jun Rao wrote: > Interesting, could you run DumpLogSegments with and w/o deep-iteration and > send the output around offset 1327? > > Thanks, > > Jun > > > On Tue, Aug 12, 2014 at 5:42 PM, Steve Miller <st...@idrathernotsay.com> > wrote: > > > [ "Aha!", you say, "now I know why this guy's been doing so much tshark > > stuff!" (-: ] > > > > Hi. I'm running into a strange situation, in which more or less all of > > the topics on our Kafka server behave exactly as expected... but the data > > produced by one family of applications is producing fairly frequent topic > > corruption. > > > > When this happens, on the client side, the results are all over the > > place: sometimes you get a ConsumerFetchSizeTooSmall exception, or an > > exception for an unknown error type, or an invalid-offset error, it's all > > over the map. 
> > > > On the server side, I think something like this is the first sign of > > badness: > > > > [2014-08-11 21:03:28,121] ERROR [KafkaApi-1] Error processing > > ProducerRequest with correlation id 6750 from client test-producer on > > partition [mytopic,9] (kafka.server.KafkaApis) > > java.lang.ArrayIndexOutOfBoundsException > > [2014-08-11 21:03:28,121] INFO [KafkaApi-1] Send the close connection > > response due to error handling produce request [clientId = test-producer, > > correlationId = 6750, topicAndPartition = [mytopic,9]] with Ack=0 > > (kafka.server.KafkaApis) > > > > shortly thereafter, you begin to see oddness facing the clients: > > > > [2014-08-11 21:17:58,132] ERROR [KafkaApi-1] Error when processing fetch > > request for partition [mytopic,9] offset 1327 from consumer with > > correlation id 87204 (kafka.server.KafkaApis) > > java.lang.IllegalStateException: Invalid message size: 0 > > at kafka.log.FileMessageSet.searchFor(FileMessageSet.scala:127) > > at kafka.log.LogSegment.translateOffset(LogSegment.scala:100) > > at kafka.log.LogSegment.read(LogSegment.scala:137) > > at kafka.log.Log.read(Log.scala:386) > > at > > kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530) > > at > > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476) > > at > > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471) > > at > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > > at > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > > at > > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224) > > at > > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403) > > at > > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > > at scala.collection.AbstractTraversable.map(Traversable.scala:105) > > at > > kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471) > > at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:437) > > at kafka.server.KafkaApis.handle(KafkaApis.scala:186) > > at > > kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42) > > at java.lang.Thread.run(Unknown Source) > > > > If I go run the DumpLogSegments tool on the particular topic and partition > > that's generating the errors, I can see there's corruption in the log: > > > > Non-secutive offsets in > > :/data/d3/kafka/log/mytopic-9/00000000000000000000.log > > 1327 is followed by 1327 > > > > The only thing producing data to corrupted topics was also the only thing > > where snappy compression was turned on in the Java API being used by the > > producer (it's a Storm topology; we've had the same issue with one in Scala > > and with one that produces very similar data, but that was written in > > Java). We turned that off, published to a different topic name (so it was > > created fresh), and had a couple of happy days where all was well. Then we > > decided that all was well so we tried to go back to the original topic -- > > after we'd verified that all data had aged out of the logs for that topic. > > And we started seeing errors again. So we switched to a different topic > > again, let it be created, and also started seeing errors on that topic. 
> > > > We have other producers, written in C and Java and python, which are > > working flawlessly, even though the size of the data they produce and the > > rate at which they produce it is much larger than what we're seeing with > > this one problematic producer. We also have producers written in other > > languages that produce at very low rates, so it's (probably) not the sort > > of thing where the issue is masked by more frequent data production. > > > > But in any case it looks like there's something the client can send that > > will corrupt the topic, which seems like something that shouldn't be able > > to happen. I know there's at least some error checking for bad protocol > > requests, as I hacked a python client to produce some corrupt messages and > > saw an error response from the server. > > > > I'm happy to supply more data but I'm not sure what would be useful. I'm > > also fine with continuing to dig into this on my own but I'd reached a > > point where it'd be useful to know if anyone had seen something like this > > before. I have a ton o' tcpdumps running and some tail -F greps running on > > the logs so that if we see that producer error again we can go find the > > corresponding tcpdump file and hopefully find the smoking gun. (It turns > > out that the real-time tshark processing invocations I sent out earlier can > > get quite far behind; I had that running when the corruption occurred > > today, but the output processing was a full hour behind the current time, > > the packet-writing part of tshark was far ahead of the packet-analyzing > > part!) > > > > Are there any particular log4j options I should turn on? Is there a way > > to just enable trace logging for a specific topic? Does trace logging > > print the contents of the message somewhere, not as something all nice and > > interpreted but as, say, a bag of hex digits? I might end up rebuilding > > kafka and adding some very specialized logging just for this. > > > > Kafka 0.8.1.1, JRE 1.6.0-71, Storm 0.9.1, RHEL6, in likely order of > > importance. (-: Also, here's the topic description: > > > > Topic:mytopic PartitionCount:10 ReplicationFactor:1 Configs: > > Topic: mytopic Partition: 0 Leader: 0 Replicas: 0 > > Isr: 0 > > Topic: mytopic Partition: 1 Leader: 1 Replicas: 1 > > Isr: 1 > > Topic: mytopic Partition: 2 Leader: 0 Replicas: 0 > > Isr: 0 > > Topic: mytopic Partition: 3 Leader: 1 Replicas: 1 > > Isr: 1 > > Topic: mytopic Partition: 4 Leader: 0 Replicas: 0 > > Isr: 0 > > Topic: mytopic Partition: 5 Leader: 1 Replicas: 1 > > Isr: 1 > > Topic: mytopic Partition: 6 Leader: 0 Replicas: 0 > > Isr: 0 > > Topic: mytopic Partition: 7 Leader: 1 Replicas: 1 > > Isr: 1 > > Topic: mytopic Partition: 8 Leader: 0 Replicas: 0 > > Isr: 0 > > Topic: mytopic Partition: 9 Leader: 1 Replicas: 1 > > Isr: 1 > > > > (2 brokers, 1 ZK server, no obvious issues with delays or process restarts > > or disk errors or any of the other usual suspects. One partition per > > filesystem. But my gut says none of that's pertinent, it's a matter of > > which partition the producer happens to be publishing to when it sends > > garbage.) > > > > > > -Steve > >