What's the output of the following command?

    /opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log
Thanks,

Jun

On Wed, Aug 13, 2014 at 11:40 AM, Steve Miller <st...@idrathernotsay.com> wrote:
> Sure. I ran:
>
>     /opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log --deep-iteration
>
> and got (in addition to the same non-secutive offsets error):
>
> [ ... ]
>
> offset: 1320 position: 344293 isvalid: true payloadsize: 208 magic: 0 compresscodec: NoCompressionCodec crc: 1038804751
> offset: 1321 position: 344527 isvalid: true payloadsize: 194 magic: 0 compresscodec: NoCompressionCodec crc: 1211626571
> offset: 1322 position: 344747 isvalid: true payloadsize: 195 magic: 0 compresscodec: NoCompressionCodec crc: 228214666
> offset: 1323 position: 344968 isvalid: true payloadsize: 285 magic: 0 compresscodec: NoCompressionCodec crc: 2412118642
> offset: 1324 position: 345279 isvalid: true payloadsize: 267 magic: 0 compresscodec: NoCompressionCodec crc: 814469229
> offset: 1325 position: 345572 isvalid: true payloadsize: 267 magic: 0 compresscodec: NoCompressionCodec crc: 874964779
> offset: 1326 position: 345865 isvalid: true payloadsize: 143 magic: 0 compresscodec: NoCompressionCodec crc: 1448343333
> offset: 1327 position: 346034 isvalid: true payloadsize: 161 magic: 0 compresscodec: NoCompressionCodec crc: 3486482767
> offset: 1327 position: 346221 isvalid: true payloadsize: 194 magic: 0 compresscodec: NoCompressionCodec crc: 3322604516
> offset: 1328 position: 346441 isvalid: true payloadsize: 207 magic: 0 compresscodec: NoCompressionCodec crc: 3181460980
> offset: 1329 position: 346674 isvalid: true payloadsize: 164 magic: 0 compresscodec: NoCompressionCodec crc: 77979807
> offset: 1330 position: 346864 isvalid: true payloadsize: 208 magic: 0 compresscodec: NoCompressionCodec crc: 3051442612
> offset: 1331 position: 347098 isvalid: true payloadsize: 196 magic: 0 compresscodec: NoCompressionCodec crc: 1906163219
> offset: 1332 position: 347320 isvalid: true payloadsize: 196 magic: 0 compresscodec: NoCompressionCodec crc: 3849763639
> offset: 1333 position: 347542 isvalid: true payloadsize: 207 magic: 0 compresscodec: NoCompressionCodec crc: 3724257965
> offset: 1334 position: 347775 isvalid: true payloadsize: 194 magic: 0 compresscodec: NoCompressionCodec crc: 510173020
> offset: 1335 position: 347995 isvalid: true payloadsize: 357 magic: 0 compresscodec: NoCompressionCodec crc: 2043065154
> offset: 1336 position: 348378 isvalid: true payloadsize: 195 magic: 0 compresscodec: NoCompressionCodec crc: 435251578
> offset: 1337 position: 348599 isvalid: true payloadsize: 169 magic: 0 compresscodec: NoCompressionCodec crc: 1172187172
> offset: 1338 position: 348794 isvalid: true payloadsize: 312 magic: 0 compresscodec: NoCompressionCodec crc: 1324582122
> offset: 1339 position: 349132 isvalid: true payloadsize: 196 magic: 0 compresscodec: NoCompressionCodec crc: 3649742340
> offset: 1340 position: 349354 isvalid: true payloadsize: 288 magic: 0 compresscodec: NoCompressionCodec crc: 581177172
>
> (etc.)
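> (In case it's useful to anyone else staring at one of these dumps: a quick
> way to find the bad spot is to check that each offset is exactly the
> previous one plus one. Just a sketch, assuming the "offset: N position: ..."
> line format above, with a hypothetical dump.txt holding the tool's output:)
>
>     import scala.io.Source
>
>     // flag any record whose offset isn't exactly the previous offset + 1
>     var prev = -1L
>     for (line <- Source.fromFile("dump.txt").getLines()
>          if line.startsWith("offset: ")) {
>       val off = line.split(" ")(1).toLong
>       if (prev >= 0 && off != prev + 1)
>         println(s"non-consecutive: $prev -> $off")
>       prev = off
>     }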
>
> I also ran:
>
>     /opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.index --deep-iteration
>
> At first, I got the following:
>
> Dumping 00000000000000000000.index
> offset: 16 position: 4342
> offset: 32 position: 8555
> offset: 48 position: 12676
> offset: 63 position: 16824
> offset: 79 position: 21256
> offset: 96 position: 25599
> offset: 112 position: 29740
> offset: 126 position: 33981
> offset: 143 position: 38122
> offset: 160 position: 42364
> offset: 176 position: 46589
> offset: 192 position: 50755
> offset: 208 position: 54969
> offset: 223 position: 59207
> offset: 239 position: 63317
> offset: 255 position: 67547
> offset: 272 position: 71771
> offset: 289 position: 76012
> offset: 306 position: 80476
> offset: 323 position: 84602
> offset: 337 position: 88876
> offset: 354 position: 93153
> offset: 371 position: 97329
> offset: 387 position: 101496
> offset: 403 position: 105657
> offset: 419 position: 109848
> offset: 434 position: 113950
> offset: 451 position: 118223
> offset: 465 position: 122366
> offset: 482 position: 126463
> offset: 499 position: 130707
> offset: 517 position: 135044
> offset: 533 position: 139505
> offset: 549 position: 143637
> offset: 566 position: 147916
> offset: 582 position: 152223
> offset: 599 position: 156528
> offset: 613 position: 160694
> offset: 629 position: 164807
> offset: 644 position: 169020
> offset: 662 position: 173449
> offset: 679 position: 177721
> offset: 695 position: 182003
> offset: 711 position: 186374
> offset: 728 position: 190644
> offset: 746 position: 195036
> offset: 762 position: 199231
> offset: 778 position: 203581
> offset: 794 position: 208024
> offset: 810 position: 212192
> offset: 825 position: 216446
> offset: 841 position: 220564
> offset: 858 position: 224718
> offset: 875 position: 228823
> offset: 890 position: 232983
> offset: 907 position: 237116
> offset: 920 position: 241229
> offset: 936 position: 245504
> offset: 951 position: 249601
> offset: 969 position: 253908
> offset: 986 position: 258074
> offset: 1002 position: 262228
> offset: 1018 position: 266385
> offset: 1035 position: 270699
> offset: 1051 position: 274843
> offset: 1067 position: 278954
> offset: 1085 position: 283283
> offset: 1102 position: 287632
> offset: 1118 position: 291971
> offset: 1135 position: 296271
> offset: 1152 position: 300722
> offset: 1168 position: 304924
> offset: 1184 position: 309051
> offset: 1201 position: 313349
> offset: 1217 position: 317543
> offset: 1233 position: 321727
> offset: 1249 position: 325877
> offset: 1266 position: 330122
> offset: 1282 position: 334413
> offset: 1298 position: 338579
> offset: 1313 position: 342717
> Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
>         at java.nio.HeapByteBuffer.<init>(Unknown Source)
>         at java.nio.ByteBuffer.allocate(Unknown Source)
>         at kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:188)
>         at kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:165)
>         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
>         at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:38)
>         at scala.collection.IterableLike$class.head(IterableLike.scala:91)
>         at kafka.message.MessageSet.head(MessageSet.scala:67)
>         at kafka.tools.DumpLogSegments$$anonfun$kafka$tools$DumpLogSegments$$dumpIndex$1.apply$mcVI$sp(DumpLogSegments.scala:102)
>         at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>         at kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpIndex(DumpLogSegments.scala:99)
>         at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:68)
>         at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:61)
>         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>         at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105)
>         at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:61)
>         at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala)
>
> If I fiddled with KAFKA_HEAP_OPTS, I could change the error to:
>
> Exception in thread "main" java.util.NoSuchElementException
>         at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
>         at scala.collection.IterableLike$class.head(IterableLike.scala:91)
>         at kafka.message.MessageSet.head(MessageSet.scala:67)
>         at kafka.tools.DumpLogSegments$$anonfun$kafka$tools$DumpLogSegments$$dumpIndex$1.apply$mcVI$sp(DumpLogSegments.scala:102)
>         at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>         at kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpIndex(DumpLogSegments.scala:99)
>         at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:68)
>         at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:61)
>         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>         at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105)
>         at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:61)
>         at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala)
>
> (Bumping it to 1024M didn't do the trick; I still got the heap-space
> exception then. Going to 10240M gave it enough heap to fail this way (-: .)
>
> Also, BTW: there seems to be a bug in dumpIndex() in DumpLogSegments.scala:
> it's unhappy if the topic name has a dot in it. If you use DumpLogSegments
> on such a file (in the "real" log directory, of course), say with a topic
> named X.Y, while you're in the directory X.Y-9 looking at
> 00000000000000000000.index, you'll get an error like:
>
> Exception in thread "main" java.io.FileNotFoundException: /home/whatever/X.log (No such file or directory)
>         at java.io.FileInputStream.open(Native Method)
>         at java.io.FileInputStream.<init>(Unknown Source)
>         at kafka.utils.Utils$.openChannel(Utils.scala:157)
>         at kafka.log.FileMessageSet.<init>(FileMessageSet.scala:74)
>         at kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpIndex(DumpLogSegments.scala:97)
>         at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:68)
>         at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:61)
>         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>         at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105)
>         at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:61)
>         at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala)
>
> That's because both of these take everything before the first dot, so any
> other dot in the path truncates it:
>
>     val startOffset = file.getName().split("\\.")(0).toLong
>     val logFileName = file.getAbsolutePath.split("\\.")(0) + Log.LogFileSuffix
>
> (The workaround is easy: copy or move the files to a directory without dots
> in its name and you're OK. But I did want to point it out.)
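> (If it's useful: the fix is presumably to trim at the last dot rather than
> the first, so dots in the topic part of the path survive. A minimal,
> untested sketch of what I mean, reusing the two lines above:)
>
>     // take everything before the *last* dot, so a dotted topic/directory
>     // name can't truncate the path
>     def trimExtension(path: String): String =
>       path.substring(0, path.lastIndexOf('.'))
>
>     val startOffset = trimExtension(file.getName).toLong
>     val logFileName = trimExtension(file.getAbsolutePath) + Log.LogFileSuffix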
>
> Overnight, I did a ton of pcap captures, while having something watch the
> logging output for the same sort of ProducerRequest error I'd described
> below. It's a lot of data to sort through and I'm still poking at it, but
> at first glance, at least, I could see the ProduceRequest that produced a
> similar error... and either it wasn't messed up, or it's messed up in such
> a way that the Kafka protocol decoder doesn't see the issue.
>
> For example, here's an error:
>
> [2014-08-13 00:28:11,232] ERROR [KafkaApi-0] Error processing ProducerRequest with correlation id 3484669 from client test-producer on partition [mytopic,4] (kafka.server.KafkaApis)
> java.lang.ArrayIndexOutOfBoundsException
> [2014-08-13 00:28:11,233] INFO [KafkaApi-0] Send the close connection response due to error handling produce request [clientId = test-producer, correlationId = 3484669, topicAndPartition = [mytopic,4]] with Ack=0 (kafka.server.KafkaApis)
>
> and below is the ProduceRequest before the one that produced the error,
> then the one that produced the error, then the next one (note that the
> topic name has been changed, but it is 55 bytes long).
>
> One other interesting thing I see, though: for one of the times we saw this
> corruption, it looks like there was also some leader-election flappage
> going on (again, topic name changed to something innocuous). Here's the
> error:
>
> [2014-08-13 04:45:41,750] ERROR [KafkaApi-0] Error processing ProducerRequest with correlation id 5016862 from client test-producer on partition [mytopic,8] (kafka.server.KafkaApis)
> java.lang.ArrayIndexOutOfBoundsException
> [2014-08-13 04:45:41,750] INFO [KafkaApi-0] Send the close connection response due to error handling produce request [clientId = test-producer, correlationId = 5016862, topicAndPartition = [mytopic,8]] with Ack=0 (kafka.server.KafkaApis)
>
> and here's what I saw in state-change.log that was related to the same
> topic and partition:
>
> [2014-08-13 04:45:28,018] TRACE Controller 0 epoch 3 changed partition [mytopic,8] state from OnlinePartition to OfflinePartition (state.change.logger)
> [2014-08-13 04:45:28,021] TRACE Controller 0 epoch 3 started leader election for partition [mytopic,8] (state.change.logger)
> [2014-08-13 04:45:28,026] ERROR Controller 0 epoch 3 initiated state change for partition [mytopic,8] from OfflinePartition to OnlinePartition failed (state.change.logger)
> [2014-08-13 04:45:28,933] TRACE Controller 0 epoch 3 changed state of replica 1 for partition [mytopic,8] from OnlineReplica to OfflineReplica (state.change.logger)
> [2014-08-13 04:45:29,150] TRACE Controller 0 epoch 3 sending UpdateMetadata request (Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3) with correlationId 276 to broker 0 for partition [mytopic,8] (state.change.logger)
> [2014-08-13 04:45:29,155] TRACE Broker 0 cached leader info (LeaderAndIsrInfo:(Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3),ReplicationFactor:1),AllReplicas:1) for partition [mytopic,8] in response to UpdateMetadata request sent by controller 0 epoch 3 with correlation id 276 (state.change.logger)
> [2014-08-13 04:45:33,940] TRACE Controller 0 epoch 3 sending UpdateMetadata request (Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3) with correlationId 277 to broker 1 for partition [mytopic,8] (state.change.logger)
> [2014-08-13 04:45:33,940] TRACE Controller 0 epoch 3 sending UpdateMetadata request (Leader:0,ISR:0,LeaderEpoch:1,ControllerEpoch:3) with correlationId 277 to broker 1 for partition [mytopic,8] (state.change.logger)
> [2014-08-13 04:45:33,949] TRACE Controller 0 epoch 3 changed state of replica 1 for partition [mytopic,8] from OfflineReplica to OnlineReplica (state.change.logger)
> [2014-08-13 04:45:33,954] TRACE Controller 0 epoch 3 sending become-follower LeaderAndIsr request (Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3) with correlationId 278 to broker 1 for partition [mytopic,8] (state.change.logger)
> [2014-08-13 04:45:33,956] TRACE Controller 0 epoch 3 sending UpdateMetadata request (Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3) with correlationId 278 to broker 1 for partition [mytopic,8] (state.change.logger)
> [2014-08-13 04:45:33,959] TRACE Controller 0 epoch 3 sending UpdateMetadata request (Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3) with correlationId 278 to broker 0 for partition [mytopic,8] (state.change.logger)
> [2014-08-13 04:45:33,961] TRACE Controller 0 epoch 3 started leader election for partition [mytopic,8] (state.change.logger)
> [2014-08-13 04:45:33,963] TRACE Broker 0 cached leader info (LeaderAndIsrInfo:(Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3),ReplicationFactor:1),AllReplicas:1) for partition [mytopic,8] in response to UpdateMetadata request sent by controller 0 epoch 3 with correlation id 278 (state.change.logger)
> [2014-08-13 04:45:33,968] TRACE Controller 0 epoch 3 elected leader 1 for Offline partition [mytopic,8] (state.change.logger)
> [2014-08-13 04:45:33,969] TRACE Controller 0 epoch 3 changed partition [mytopic,8] from OfflinePartition to OnlinePartition with leader 1 (state.change.logger)
> [2014-08-13 04:45:39,245] TRACE Controller 0 epoch 3 sending become-leader LeaderAndIsr request (Leader:1,ISR:1,LeaderEpoch:90,ControllerEpoch:3) with correlationId 279 to broker 1 for partition [mytopic,8] (state.change.logger)
> [2014-08-13 04:45:39,248] TRACE Controller 0 epoch 3 sending UpdateMetadata request (Leader:1,ISR:1,LeaderEpoch:90,ControllerEpoch:3) with correlationId 279 to broker 1 for partition [mytopic,8] (state.change.logger)
> [2014-08-13 04:45:39,251] TRACE Controller 0 epoch 3 sending UpdateMetadata request (Leader:1,ISR:1,LeaderEpoch:90,ControllerEpoch:3) with correlationId 279 to broker 0 for partition [mytopic,8] (state.change.logger)
> [2014-08-13 04:45:39,255] TRACE Broker 0 cached leader info (LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:90,ControllerEpoch:3),ReplicationFactor:1),AllReplicas:1) for partition [mytopic,8] in response to UpdateMetadata request sent by controller 0 epoch 3 with correlation id 279 (state.change.logger)
>
> Strangely, though, I see another ProducerRequest error at around 09:08:11,
> but I don't see any obvious controller-related flappage at that time. So
> maybe correlation (leader election at nearly the same time we see the
> error) doesn't imply causation.
>
> I don't know. This is the only producer I can think of here that produces
> any significant number of messages (maybe a burst of 600ish every 30sec or
> so) and that does so one at a time rather than by batching them. Maybe
> there's an element here that's related to publishing lots of message sets
> with only one (small) message in them? (There's a sketch of the shape of
> that produce path below.)
>
> If there's anything else you want me to look at, and/or if you want me to
> reformat this somehow so it's more readable (I know it's a lot of stuff
> inline), please let me know. Thanks!
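> (For reference, the produce path is basically this shape, sketched from
> memory against the 0.8 Scala producer API; the broker list and payloads are
> stand-ins, not our real config. One small message per send(), acks=0, no
> batching:)
>
>     import java.util.Properties
>     import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
>
>     val props = new Properties()
>     props.put("metadata.broker.list", "broker1:9092,broker2:9092") // stand-in hosts
>     props.put("serializer.class", "kafka.serializer.DefaultEncoder")
>     props.put("request.required.acks", "0") // matches the Ack=0 in the logs
>     val producer = new Producer[Array[Byte], Array[Byte]](new ProducerConfig(props))
>
>     val payloads: Seq[Array[Byte]] = Seq() // stand-in for the ~600-message burst
>     // each record goes out as its own one-message message set
>     for (payload <- payloads)
>       producer.send(new KeyedMessage[Array[Byte], Array[Byte]]("mytopic", payload))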
> > -Steve > > =========== sample transaction data for one case of this sort of error > Frame 8451: 516 bytes on wire (4128 bits), 516 bytes captured (4128 bits) > Encapsulation type: Ethernet (1) > Arrival Time: Aug 13, 2014 00:28:11.232408000 UTC > [Time shift for this packet: 0.000000000 seconds] > Epoch Time: 1407889691.232408000 seconds > [Time delta from previous captured frame: 0.000479000 seconds] > [Time delta from previous displayed frame: 0.000479000 seconds] > [Time since reference or first frame: 191.232312000 seconds] > Frame Number: 8451 > Frame Length: 516 bytes (4128 bits) > Capture Length: 516 bytes (4128 bits) > [Frame is marked: False] > [Frame is ignored: False] > [Protocols in frame: eth:ethertype:ip:tcp:kafka] > [Number of per-protocol-data: 1] > [Kafka, key 0] > Ethernet II, Src: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0), Dst: > 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1) > Destination: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1) > Address: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1) > .... ..0. .... .... .... .... = LG bit: Globally unique address > (factory default) > .... ...0 .... .... .... .... = IG bit: Individual address > (unicast) > Source: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0) > Address: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0) > .... ..0. .... .... .... .... = LG bit: Globally unique address > (factory default) > .... ...0 .... .... .... .... = IG bit: Individual address > (unicast) > Type: IP (0x0800) > Internet Protocol Version 4, Src: 10.163.193.125 (10.163.193.125), Dst: > 10.163.193.121 (10.163.193.121) > Version: 4 > Header Length: 20 bytes > Differentiated Services Field: 0x00 (DSCP 0x00: Default; ECN: 0x00: > Not-ECT (Not ECN-Capable Transport)) > 0000 00.. = Differentiated Services Codepoint: Default (0x00) > .... ..00 = Explicit Congestion Notification: Not-ECT (Not > ECN-Capable Transport) (0x00) > Total Length: 502 > Identification: 0x2b7a (11130) > Flags: 0x02 (Don't Fragment) > 0... .... = Reserved bit: Not set > .1.. .... = Don't fragment: Set > ..0. .... = More fragments: Not set > Fragment offset: 0 > Time to live: 64 > Protocol: TCP (6) > Header checksum: 0x754b [validation disabled] > [Good: False] > [Bad: False] > Source: 10.163.193.125 (10.163.193.125) > Destination: 10.163.193.121 (10.163.193.121) > [Source GeoIP: Unknown] > [Destination GeoIP: Unknown] > Transmission Control Protocol, Src Port: 33857 (33857), Dst Port: 9092 > (9092), Seq: 3091465, Ack: 1, Len: 450 > Source Port: 33857 (33857) > Destination Port: 9092 (9092) > [Stream index: 14] > [TCP Segment Len: 450] > Sequence number: 3091465 (relative sequence number) > [Next sequence number: 3091915 (relative sequence number)] > Acknowledgment number: 1 (relative ack number) > Header Length: 32 bytes > .... 0000 0001 1000 = Flags: 0x018 (PSH, ACK) > 000. .... .... = Reserved: Not set > ...0 .... .... = Nonce: Not set > .... 0... .... = Congestion Window Reduced (CWR): Not set > .... .0.. .... = ECN-Echo: Not set > .... ..0. .... = Urgent: Not set > .... ...1 .... = Acknowledgment: Set > .... .... 1... = Push: Set > .... .... .0.. = Reset: Not set > .... .... ..0. = Syn: Not set > .... .... ...0 = Fin: Not set > Window size value: 107 > [Calculated window size: 13696] > [Window size scaling factor: 128] > Checksum: 0x10a1 [validation disabled] > [Good Checksum: False] > [Bad Checksum: False] > Urgent pointer: 0 > Options: (12 bytes), No-Operation (NOP), No-Operation (NOP), Timestamps > No-Operation (NOP) > Type: 1 > 0... .... = Copy on fragmentation: No > .00. .... 
= Class: Control (0) > ...0 0001 = Number: No-Operation (NOP) (1) > No-Operation (NOP) > Type: 1 > 0... .... = Copy on fragmentation: No > .00. .... = Class: Control (0) > ...0 0001 = Number: No-Operation (NOP) (1) > Timestamps: TSval 2762073799, TSecr 2668832110 > Kind: Time Stamp Option (8) > Length: 10 > Timestamp value: 2762073799 > Timestamp echo reply: 2668832110 > [SEQ/ACK analysis] > [iRTT: 0.000199000 seconds] > [Bytes in flight: 450] > Kafka > Length: 446 > API Key: Produce (0) > API Version: 0 > Correlation ID: 3484667 > String Length: 13 > Client ID: test-producer > Required Acks: 0 > Timeout: 10000 > Array Count: 1 > Produce Request Topic > String Length: 55 > Topic Name: mytopic > Array Count: 1 > Produce Request Partition > Partition ID: 4 > Message Set Size: 344 > Message Set > Offset: 0 > Message Size: 332 > Message > CRC32: 0x81b88c04 > Magic Byte: 0 > .... ..00 = Compression Codec: None (0) > Bytes Length: -1 > Key: <MISSING> > Bytes Length: 318 > Value: > 00b6c4d5be0a000a73646e73320008746b6f320012786e2d... > > Frame 8452: 465 bytes on wire (3720 bits), 465 bytes captured (3720 bits) > Encapsulation type: Ethernet (1) > Arrival Time: Aug 13, 2014 00:28:11.232797000 UTC > [Time shift for this packet: 0.000000000 seconds] > Epoch Time: 1407889691.232797000 seconds > [Time delta from previous captured frame: 0.000389000 seconds] > [Time delta from previous displayed frame: 0.000389000 seconds] > [Time since reference or first frame: 191.232701000 seconds] > Frame Number: 8452 > Frame Length: 465 bytes (3720 bits) > Capture Length: 465 bytes (3720 bits) > [Frame is marked: False] > [Frame is ignored: False] > [Protocols in frame: eth:ethertype:ip:tcp:kafka] > [Number of per-protocol-data: 1] > [Kafka, key 0] > Ethernet II, Src: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0), Dst: > 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1) > Destination: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1) > Address: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1) > .... ..0. .... .... .... .... = LG bit: Globally unique address > (factory default) > .... ...0 .... .... .... .... = IG bit: Individual address > (unicast) > Source: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0) > Address: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0) > .... ..0. .... .... .... .... = LG bit: Globally unique address > (factory default) > .... ...0 .... .... .... .... = IG bit: Individual address > (unicast) > Type: IP (0x0800) > Internet Protocol Version 4, Src: 10.163.193.125 (10.163.193.125), Dst: > 10.163.193.121 (10.163.193.121) > Version: 4 > Header Length: 20 bytes > Differentiated Services Field: 0x00 (DSCP 0x00: Default; ECN: 0x00: > Not-ECT (Not ECN-Capable Transport)) > 0000 00.. = Differentiated Services Codepoint: Default (0x00) > .... ..00 = Explicit Congestion Notification: Not-ECT (Not > ECN-Capable Transport) (0x00) > Total Length: 451 > Identification: 0x2b7b (11131) > Flags: 0x02 (Don't Fragment) > 0... .... = Reserved bit: Not set > .1.. .... = Don't fragment: Set > ..0. .... 
= More fragments: Not set > Fragment offset: 0 > Time to live: 64 > Protocol: TCP (6) > Header checksum: 0x757d [validation disabled] > [Good: False] > [Bad: False] > Source: 10.163.193.125 (10.163.193.125) > Destination: 10.163.193.121 (10.163.193.121) > [Source GeoIP: Unknown] > [Destination GeoIP: Unknown] > Transmission Control Protocol, Src Port: 33857 (33857), Dst Port: 9092 > (9092), Seq: 3091915, Ack: 1, Len: 399 > Source Port: 33857 (33857) > Destination Port: 9092 (9092) > [Stream index: 14] > [TCP Segment Len: 399] > Sequence number: 3091915 (relative sequence number) > [Next sequence number: 3092314 (relative sequence number)] > Acknowledgment number: 1 (relative ack number) > Header Length: 32 bytes > .... 0000 0001 1000 = Flags: 0x018 (PSH, ACK) > 000. .... .... = Reserved: Not set > ...0 .... .... = Nonce: Not set > .... 0... .... = Congestion Window Reduced (CWR): Not set > .... .0.. .... = ECN-Echo: Not set > .... ..0. .... = Urgent: Not set > .... ...1 .... = Acknowledgment: Set > .... .... 1... = Push: Set > .... .... .0.. = Reset: Not set > .... .... ..0. = Syn: Not set > .... .... ...0 = Fin: Not set > Window size value: 107 > [Calculated window size: 13696] > [Window size scaling factor: 128] > Checksum: 0x6f34 [validation disabled] > [Good Checksum: False] > [Bad Checksum: False] > Urgent pointer: 0 > Options: (12 bytes), No-Operation (NOP), No-Operation (NOP), Timestamps > No-Operation (NOP) > Type: 1 > 0... .... = Copy on fragmentation: No > .00. .... = Class: Control (0) > ...0 0001 = Number: No-Operation (NOP) (1) > No-Operation (NOP) > Type: 1 > 0... .... = Copy on fragmentation: No > .00. .... = Class: Control (0) > ...0 0001 = Number: No-Operation (NOP) (1) > Timestamps: TSval 2762073799, TSecr 2668832111 > Kind: Time Stamp Option (8) > Length: 10 > Timestamp value: 2762073799 > Timestamp echo reply: 2668832111 > [SEQ/ACK analysis] > [iRTT: 0.000199000 seconds] > [Bytes in flight: 399] > Kafka > Length: 395 > API Key: Produce (0) > API Version: 0 > Correlation ID: 3484669 > String Length: 13 > Client ID: test-producer > Required Acks: 0 > Timeout: 10000 > Array Count: 1 > Produce Request Topic > String Length: 55 > Topic Name: mytopic > Array Count: 1 > Produce Request Partition > Partition ID: 4 > Message Set Size: 293 > Message Set > Offset: 0 > Message Size: 281 > Message > CRC32: 0x80ce403c > Magic Byte: 0 > .... ..00 = Compression Codec: None (0) > Bytes Length: -1 > Key: <MISSING> > Bytes Length: 267 > Value: > 00b6c4d5be0a001061646e7363746c640008706172320006... > > Frame 8453: 354 bytes on wire (2832 bits), 354 bytes captured (2832 bits) > Encapsulation type: Ethernet (1) > Arrival Time: Aug 13, 2014 00:28:11.233223000 UTC > [Time shift for this packet: 0.000000000 seconds] > Epoch Time: 1407889691.233223000 seconds > [Time delta from previous captured frame: 0.000426000 seconds] > [Time delta from previous displayed frame: 0.000426000 seconds] > [Time since reference or first frame: 191.233127000 seconds] > Frame Number: 8453 > Frame Length: 354 bytes (2832 bits) > Capture Length: 354 bytes (2832 bits) > [Frame is marked: False] > [Frame is ignored: False] > [Protocols in frame: eth:ethertype:ip:tcp:kafka] > [Number of per-protocol-data: 1] > [Kafka, key 0] > Ethernet II, Src: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0), Dst: > 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1) > Destination: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1) > Address: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1) > .... ..0. .... .... .... .... = LG bit: Globally unique address > (factory default) > .... 
...0 .... .... .... .... = IG bit: Individual address > (unicast) > Source: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0) > Address: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0) > .... ..0. .... .... .... .... = LG bit: Globally unique address > (factory default) > .... ...0 .... .... .... .... = IG bit: Individual address > (unicast) > Type: IP (0x0800) > Internet Protocol Version 4, Src: 10.163.193.125 (10.163.193.125), Dst: > 10.163.193.121 (10.163.193.121) > Version: 4 > Header Length: 20 bytes > Differentiated Services Field: 0x00 (DSCP 0x00: Default; ECN: 0x00: > Not-ECT (Not ECN-Capable Transport)) > 0000 00.. = Differentiated Services Codepoint: Default (0x00) > .... ..00 = Explicit Congestion Notification: Not-ECT (Not > ECN-Capable Transport) (0x00) > Total Length: 340 > Identification: 0x2b7c (11132) > Flags: 0x02 (Don't Fragment) > 0... .... = Reserved bit: Not set > .1.. .... = Don't fragment: Set > ..0. .... = More fragments: Not set > Fragment offset: 0 > Time to live: 64 > Protocol: TCP (6) > Header checksum: 0x75eb [validation disabled] > [Good: False] > [Bad: False] > Source: 10.163.193.125 (10.163.193.125) > Destination: 10.163.193.121 (10.163.193.121) > [Source GeoIP: Unknown] > [Destination GeoIP: Unknown] > Transmission Control Protocol, Src Port: 33857 (33857), Dst Port: 9092 > (9092), Seq: 3092314, Ack: 1, Len: 288 > Source Port: 33857 (33857) > Destination Port: 9092 (9092) > [Stream index: 14] > [TCP Segment Len: 288] > Sequence number: 3092314 (relative sequence number) > [Next sequence number: 3092602 (relative sequence number)] > Acknowledgment number: 1 (relative ack number) > Header Length: 32 bytes > .... 0000 0001 1000 = Flags: 0x018 (PSH, ACK) > 000. .... .... = Reserved: Not set > ...0 .... .... = Nonce: Not set > .... 0... .... = Congestion Window Reduced (CWR): Not set > .... .0.. .... = ECN-Echo: Not set > .... ..0. .... = Urgent: Not set > .... ...1 .... = Acknowledgment: Set > .... .... 1... = Push: Set > .... .... .0.. = Reset: Not set > .... .... ..0. = Syn: Not set > .... .... ...0 = Fin: Not set > Window size value: 107 > [Calculated window size: 13696] > [Window size scaling factor: 128] > Checksum: 0xea49 [validation disabled] > [Good Checksum: False] > [Bad Checksum: False] > Urgent pointer: 0 > Options: (12 bytes), No-Operation (NOP), No-Operation (NOP), Timestamps > No-Operation (NOP) > Type: 1 > 0... .... = Copy on fragmentation: No > .00. .... = Class: Control (0) > ...0 0001 = Number: No-Operation (NOP) (1) > No-Operation (NOP) > Type: 1 > 0... .... = Copy on fragmentation: No > .00. .... = Class: Control (0) > ...0 0001 = Number: No-Operation (NOP) (1) > Timestamps: TSval 2762073800, TSecr 2668832112 > Kind: Time Stamp Option (8) > Length: 10 > Timestamp value: 2762073800 > Timestamp echo reply: 2668832112 > [SEQ/ACK analysis] > [iRTT: 0.000199000 seconds] > [Bytes in flight: 288] > Kafka > Length: 284 > API Key: Produce (0) > API Version: 0 > Correlation ID: 3484671 > String Length: 13 > Client ID: test-producer > Required Acks: 0 > Timeout: 10000 > Array Count: 1 > Produce Request Topic > String Length: 55 > Topic Name: mytopic > Array Count: 1 > Produce Request Partition > Partition ID: 4 > Message Set Size: 182 > Message Set > Offset: 0 > Message Size: 170 > Message > CRC32: 0x0ece510e > Magic Byte: 0 > .... ..00 = Compression Codec: None (0) > Bytes Length: -1 > Key: <MISSING> > Bytes Length: 156 > Value: > 00b6c4d5be0a0008726f6f740008686b6735000c246a726f... 
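> (One more offline sanity check that can be run against these captures: in
> the 0.8 wire format, the per-message CRC32 covers everything after the
> 4-byte CRC field itself (magic byte, attributes, key length, key, value
> length, value), so it can be recomputed from the raw bytes. A sketch, where
> messageBytes is a hypothetical array holding one message starting at its
> CRC field:)
>
>     import java.util.zip.CRC32
>
>     // recompute a Kafka 0.8 message CRC: checksum everything after the
>     // leading 4-byte CRC field
>     def messageCrc(messageBytes: Array[Byte]): Long = {
>       val crc = new CRC32
>       crc.update(messageBytes, 4, messageBytes.length - 4)
>       crc.getValue
>     }
>
>     // e.g. f"${messageCrc(messageBytes)}%08x" should match the 0x80ce403c in frame 8452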
>
> On Wed, Aug 13, 2014 at 08:15:21AM -0700, Jun Rao wrote:
> > Interesting, could you run DumpLogSegments with and w/o deep-iteration
> > and send the output around offset 1327?
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Aug 12, 2014 at 5:42 PM, Steve Miller <st...@idrathernotsay.com> wrote:
> > > [ "Aha!", you say, "now I know why this guy's been doing so much
> > > tshark stuff!" (-: ]
> > >
> > > Hi. I'm running into a strange situation: more or less all of the
> > > topics on our Kafka server behave exactly as expected... but one
> > > family of applications produces data that fairly frequently corrupts
> > > its topic.
> > >
> > > When this happens, the results on the client side are all over the
> > > map: sometimes you get a ConsumerFetchSizeTooSmall exception, or an
> > > exception for an unknown error type, or an invalid-offset error.
> > >
> > > On the server side, I think something like this is the first sign of
> > > badness:
> > >
> > > [2014-08-11 21:03:28,121] ERROR [KafkaApi-1] Error processing ProducerRequest with correlation id 6750 from client test-producer on partition [mytopic,9] (kafka.server.KafkaApis)
> > > java.lang.ArrayIndexOutOfBoundsException
> > > [2014-08-11 21:03:28,121] INFO [KafkaApi-1] Send the close connection response due to error handling produce request [clientId = test-producer, correlationId = 6750, topicAndPartition = [mytopic,9]] with Ack=0 (kafka.server.KafkaApis)
> > >
> > > Shortly thereafter, you begin to see oddness facing the clients:
> > >
> > > [2014-08-11 21:17:58,132] ERROR [KafkaApi-1] Error when processing fetch request for partition [mytopic,9] offset 1327 from consumer with correlation id 87204 (kafka.server.KafkaApis)
> > > java.lang.IllegalStateException: Invalid message size: 0
> > >         at kafka.log.FileMessageSet.searchFor(FileMessageSet.scala:127)
> > >         at kafka.log.LogSegment.translateOffset(LogSegment.scala:100)
> > >         at kafka.log.LogSegment.read(LogSegment.scala:137)
> > >         at kafka.log.Log.read(Log.scala:386)
> > >         at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
> > >         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
> > >         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
> > >         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> > >         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> > >         at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> > >         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > >         at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> > >         at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> > >         at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
> > >         at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:437)
> > >         at kafka.server.KafkaApis.handle(KafkaApis.scala:186)
> > >         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> > >         at java.lang.Thread.run(Unknown Source)
> > >
> > > If I run the DumpLogSegments tool on the particular topic and
> > > partition that's generating the errors, I can see there's corruption
> > > in the log:
> > >
> > > Non-secutive offsets in :/data/d3/kafka/log/mytopic-9/00000000000000000000.log
> > >   1327 is followed by 1327
> > >
> > > The only thing producing data to corrupted topics was also the only
> > > thing where snappy compression was turned on in the Java API being
> > > used by the producer (it's a Storm topology; we've had the same issue
> > > with one in Scala and with one that produces very similar data but
> > > was written in Java). We turned that off, published to a different
> > > topic name (so it was created fresh), and had a couple of happy days
> > > where all was well. Then we decided that all was well, so we tried to
> > > go back to the original topic -- after we'd verified that all data
> > > had aged out of the logs for that topic. And we started seeing errors
> > > again. So we switched to a different topic again, let it be created,
> > > and also started seeing errors on that topic.
> > >
> > > We have other producers, written in C and Java and python, which are
> > > working flawlessly, even though the size of the data they produce and
> > > the rate at which they produce it are much larger than what we're
> > > seeing with this one problematic producer. We also have producers
> > > written in other languages that produce at very low rates, so it's
> > > (probably) not the sort of thing where the issue is masked by more
> > > frequent data production.
> > >
> > > But in any case it looks like there's something the client can send
> > > that will corrupt the topic, which seems like something that
> > > shouldn't be able to happen. I know there's at least some error
> > > checking for bad protocol requests, as I hacked a python client to
> > > produce some corrupt messages and saw an error response from the
> > > server.
> > >
> > > I'm happy to supply more data, but I'm not sure what would be useful.
> > > I'm also fine with continuing to dig into this on my own, but I'd
> > > reached a point where it'd be useful to know if anyone had seen
> > > something like this before. I have a ton o' tcpdumps running and some
> > > tail -F greps running on the logs, so that if we see that producer
> > > error again we can go find the corresponding tcpdump file and
> > > hopefully find the smoking gun. (It turns out that the real-time
> > > tshark processing invocations I sent out earlier can get quite far
> > > behind; I had that running when the corruption occurred today, but
> > > the output processing was a full hour behind the current time: the
> > > packet-writing part of tshark was far ahead of the packet-analyzing
> > > part!)
> > >
> > > Are there any particular log4j options I should turn on? Is there a
> > > way to just enable trace logging for a specific topic? Does trace
> > > logging print the contents of the message somewhere, not as something
> > > all nice and interpreted but as, say, a bag of hex digits? I might
> > > end up rebuilding Kafka and adding some very specialized logging just
> > > for this.
> > >
> > > Kafka 0.8.1.1, JRE 1.6.0-71, Storm 0.9.1, RHEL6, in likely order of
> > > importance. (-: Also, here's the topic description:
> > >
> > > Topic:mytopic   PartitionCount:10   ReplicationFactor:1   Configs:
> > >     Topic: mytopic   Partition: 0   Leader: 0   Replicas: 0   Isr: 0
> > >     Topic: mytopic   Partition: 1   Leader: 1   Replicas: 1   Isr: 1
> > >     Topic: mytopic   Partition: 2   Leader: 0   Replicas: 0   Isr: 0
> > >     Topic: mytopic   Partition: 3   Leader: 1   Replicas: 1   Isr: 1
> > >     Topic: mytopic   Partition: 4   Leader: 0   Replicas: 0   Isr: 0
> > >     Topic: mytopic   Partition: 5   Leader: 1   Replicas: 1   Isr: 1
> > >     Topic: mytopic   Partition: 6   Leader: 0   Replicas: 0   Isr: 0
> > >     Topic: mytopic   Partition: 7   Leader: 1   Replicas: 1   Isr: 1
> > >     Topic: mytopic   Partition: 8   Leader: 0   Replicas: 0   Isr: 0
> > >     Topic: mytopic   Partition: 9   Leader: 1   Replicas: 1   Isr: 1
> > >
> > > (2 brokers, 1 ZK server, no obvious issues with delays or process
> > > restarts or disk errors or any of the other usual suspects. One
> > > partition per filesystem. But my gut says none of that's pertinent;
> > > it's a matter of which partition the producer happens to be
> > > publishing to when it sends garbage.)
> > >
> > > -Steve