What's the output of the following command?

        /opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log

Thanks,

Jun


On Wed, Aug 13, 2014 at 11:40 AM, Steve Miller <st...@idrathernotsay.com>
wrote:

>    Sure.  I ran:
>
>         /opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log --deep-iteration
>
> and got (in addition to the same "Non-secutive offsets" error):
>
> [ ... ]
>
> offset: 1320 position: 344293 isvalid: true payloadsize: 208 magic: 0 compresscodec: NoCompressionCodec crc: 1038804751
> offset: 1321 position: 344527 isvalid: true payloadsize: 194 magic: 0 compresscodec: NoCompressionCodec crc: 1211626571
> offset: 1322 position: 344747 isvalid: true payloadsize: 195 magic: 0 compresscodec: NoCompressionCodec crc: 228214666
> offset: 1323 position: 344968 isvalid: true payloadsize: 285 magic: 0 compresscodec: NoCompressionCodec crc: 2412118642
> offset: 1324 position: 345279 isvalid: true payloadsize: 267 magic: 0 compresscodec: NoCompressionCodec crc: 814469229
> offset: 1325 position: 345572 isvalid: true payloadsize: 267 magic: 0 compresscodec: NoCompressionCodec crc: 874964779
> offset: 1326 position: 345865 isvalid: true payloadsize: 143 magic: 0 compresscodec: NoCompressionCodec crc: 1448343333
> offset: 1327 position: 346034 isvalid: true payloadsize: 161 magic: 0 compresscodec: NoCompressionCodec crc: 3486482767
> offset: 1327 position: 346221 isvalid: true payloadsize: 194 magic: 0 compresscodec: NoCompressionCodec crc: 3322604516
> offset: 1328 position: 346441 isvalid: true payloadsize: 207 magic: 0 compresscodec: NoCompressionCodec crc: 3181460980
> offset: 1329 position: 346674 isvalid: true payloadsize: 164 magic: 0 compresscodec: NoCompressionCodec crc: 77979807
> offset: 1330 position: 346864 isvalid: true payloadsize: 208 magic: 0 compresscodec: NoCompressionCodec crc: 3051442612
> offset: 1331 position: 347098 isvalid: true payloadsize: 196 magic: 0 compresscodec: NoCompressionCodec crc: 1906163219
> offset: 1332 position: 347320 isvalid: true payloadsize: 196 magic: 0 compresscodec: NoCompressionCodec crc: 3849763639
> offset: 1333 position: 347542 isvalid: true payloadsize: 207 magic: 0 compresscodec: NoCompressionCodec crc: 3724257965
> offset: 1334 position: 347775 isvalid: true payloadsize: 194 magic: 0 compresscodec: NoCompressionCodec crc: 510173020
> offset: 1335 position: 347995 isvalid: true payloadsize: 357 magic: 0 compresscodec: NoCompressionCodec crc: 2043065154
> offset: 1336 position: 348378 isvalid: true payloadsize: 195 magic: 0 compresscodec: NoCompressionCodec crc: 435251578
> offset: 1337 position: 348599 isvalid: true payloadsize: 169 magic: 0 compresscodec: NoCompressionCodec crc: 1172187172
> offset: 1338 position: 348794 isvalid: true payloadsize: 312 magic: 0 compresscodec: NoCompressionCodec crc: 1324582122
> offset: 1339 position: 349132 isvalid: true payloadsize: 196 magic: 0 compresscodec: NoCompressionCodec crc: 3649742340
> offset: 1340 position: 349354 isvalid: true payloadsize: 288 magic: 0 compresscodec: NoCompressionCodec crc: 581177172
>
> (etc.)
>
> I also ran:
>
>          /opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.index --deep-iteration
>
> At first, I got the following:
>
> Dumping 00000000000000000000.index
> offset: 16 position: 4342
> offset: 32 position: 8555
> offset: 48 position: 12676
> offset: 63 position: 16824
> offset: 79 position: 21256
> offset: 96 position: 25599
> offset: 112 position: 29740
> offset: 126 position: 33981
> offset: 143 position: 38122
> offset: 160 position: 42364
> offset: 176 position: 46589
> offset: 192 position: 50755
> offset: 208 position: 54969
> offset: 223 position: 59207
> offset: 239 position: 63317
> offset: 255 position: 67547
> offset: 272 position: 71771
> offset: 289 position: 76012
> offset: 306 position: 80476
> offset: 323 position: 84602
> offset: 337 position: 88876
> offset: 354 position: 93153
> offset: 371 position: 97329
> offset: 387 position: 101496
> offset: 403 position: 105657
> offset: 419 position: 109848
> offset: 434 position: 113950
> offset: 451 position: 118223
> offset: 465 position: 122366
> offset: 482 position: 126463
> offset: 499 position: 130707
> offset: 517 position: 135044
> offset: 533 position: 139505
> offset: 549 position: 143637
> offset: 566 position: 147916
> offset: 582 position: 152223
> offset: 599 position: 156528
> offset: 613 position: 160694
> offset: 629 position: 164807
> offset: 644 position: 169020
> offset: 662 position: 173449
> offset: 679 position: 177721
> offset: 695 position: 182003
> offset: 711 position: 186374
> offset: 728 position: 190644
> offset: 746 position: 195036
> offset: 762 position: 199231
> offset: 778 position: 203581
> offset: 794 position: 208024
> offset: 810 position: 212192
> offset: 825 position: 216446
> offset: 841 position: 220564
> offset: 858 position: 224718
> offset: 875 position: 228823
> offset: 890 position: 232983
> offset: 907 position: 237116
> offset: 920 position: 241229
> offset: 936 position: 245504
> offset: 951 position: 249601
> offset: 969 position: 253908
> offset: 986 position: 258074
> offset: 1002 position: 262228
> offset: 1018 position: 266385
> offset: 1035 position: 270699
> offset: 1051 position: 274843
> offset: 1067 position: 278954
> offset: 1085 position: 283283
> offset: 1102 position: 287632
> offset: 1118 position: 291971
> offset: 1135 position: 296271
> offset: 1152 position: 300722
> offset: 1168 position: 304924
> offset: 1184 position: 309051
> offset: 1201 position: 313349
> offset: 1217 position: 317543
> offset: 1233 position: 321727
> offset: 1249 position: 325877
> offset: 1266 position: 330122
> offset: 1282 position: 334413
> offset: 1298 position: 338579
> offset: 1313 position: 342717
> Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
>         at java.nio.HeapByteBuffer.<init>(Unknown Source)
>         at java.nio.ByteBuffer.allocate(Unknown Source)
>         at kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:188)
>         at kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:165)
>         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
>         at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:38)
>         at scala.collection.IterableLike$class.head(IterableLike.scala:91)
>         at kafka.message.MessageSet.head(MessageSet.scala:67)
>         at kafka.tools.DumpLogSegments$$anonfun$kafka$tools$DumpLogSegments$$dumpIndex$1.apply$mcVI$sp(DumpLogSegments.scala:102)
>         at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>         at kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpIndex(DumpLogSegments.scala:99)
>         at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:68)
>         at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:61)
>         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>         at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105)
>         at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:61)
>         at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala)
>
> If I fiddled with KAFKA_HEAP_OPTS, I could change the error to:
>
> Exception in thread "main" java.util.NoSuchElementException
>         at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
>         at scala.collection.IterableLike$class.head(IterableLike.scala:91)
>         at kafka.message.MessageSet.head(MessageSet.scala:67)
>         at kafka.tools.DumpLogSegments$$anonfun$kafka$tools$DumpLogSegments$$dumpIndex$1.apply$mcVI$sp(DumpLogSegments.scala:102)
>         at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>         at kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpIndex(DumpLogSegments.scala:99)
>         at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:68)
>         at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:61)
>         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>         at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105)
>         at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:61)
>         at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala)
>
> (Bumping it to 1024M didn't do the trick; I still got the heap-space
> exception then, but going to 10240M gave it enough heap to fail this way
> (-: ).
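>
> For reference, the override looked something like the line below (as far
> as I know, kafka-run-class.sh only applies its small default -Xmx when
> KAFKA_HEAP_OPTS is unset, so setting the variable is enough):
>
>         KAFKA_HEAP_OPTS="-Xmx10240M" /opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.index --deep-iteration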
>
>    Also BTW: there seems to be a bug in dumpIndex() in
> DumpLogSegments.scala, as it is unhappy if the topic name has a dot in it.
> If you try to use DumpLogSegments to look at such a file in the "real" log
> directory (say your topic is named X.Y, you're in the directory X.Y-9, and
> you're looking at 00000000000000000000.index), you'll get an error like:
>
> Exception in thread "main" java.io.FileNotFoundException: /home/whatever/X.log (No such file or directory)
>         at java.io.FileInputStream.open(Native Method)
>         at java.io.FileInputStream.<init>(Unknown Source)
>         at kafka.utils.Utils$.openChannel(Utils.scala:157)
>         at kafka.log.FileMessageSet.<init>(FileMessageSet.scala:74)
>         at kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpIndex(DumpLogSegments.scala:97)
>         at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:68)
>         at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:61)
>         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>         at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105)
>         at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:61)
>         at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala)
>
>
> because these both assume that there's only one dot on which to split:
>
>     val startOffset = file.getName().split("\\.")(0).toLong
>     val logFileName = file.getAbsolutePath.split("\\.")(0) + Log.LogFileSuffix
>
> (it's easy enough to work around: copy or move the files to a directory
> whose path has no dots in it, and you're OK, but I did want to point it
> out).
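>
> For what it's worth, here's an untested sketch of the sort of fix I mean,
> splitting on the last dot rather than the first:
>
>     // untested sketch: strip only the trailing ".index"/".log" extension,
>     // so dots in the topic (and thus the directory) name survive
>     val name = file.getName()
>     val startOffset = name.substring(0, name.lastIndexOf('.')).toLong
>     val path = file.getAbsolutePath
>     val logFileName = path.substring(0, path.lastIndexOf('.')) + Log.LogFileSuffix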
>
>    Overnight, I did a ton of pcap captures, while having something watch
> the logging output looking for the same sort of ProducerRequest error I'd
> described below.  It's a lot of data to sort through and I'm still poking
> at it, but at first glance at least I could see the ProduceRequest that
> produced a similar error... and either it wasn't messed up, or it's messed
> up in such a way that the Kafka protocol decoder doesn't see the issue.
>
>    For example, here's an error:
>
> [2014-08-13 00:28:11,232] ERROR [KafkaApi-0] Error processing
> ProducerRequest with correlation id 3484669 from client test-producer on
> partition [mytopic,4] (kafka.server.KafkaApis)
> java.lang.ArrayIndexOutOfBoundsException
> [2014-08-13 00:28:11,233] INFO [KafkaApi-0] Send the close connection
> response due to error handling produce request [clientId = test-producer,
> correlationId = 3484669, topicAndPartition = [mytopic,4]] with Ack=0
> (kafka.server.KafkaApis)
>
> and below is the ProduceRequest before the one that produced the error,
> then the one that produced the error, then the next one (note that the
> topic name has been changed but it is 55 bytes long).
>
>    One other interesting thing I see, though: for one of the times we saw
> this corruption, it looks like there was also some leader-election flappage
> going on (again, topic name changed to something innocuous).  Here's the
> error:
>
> [2014-08-13 04:45:41,750] ERROR [KafkaApi-0] Error processing
> ProducerRequest with correlation id 5016862 from client test-producer on
> partition [mytopic,8] (kafka.server.KafkaApis)
> java.lang.ArrayIndexOutOfBoundsException
> [2014-08-13 04:45:41,750] INFO [KafkaApi-0] Send the close connection
> response due to error handling produce request [clientId = test-producer,
> correlationId = 5016862, topicAndPartition = [mytopic,8]] with Ack=0
> (kafka.server.KafkaApis)
>
> and here's what I saw in state-change.log that was related to the same
> topic and partition:
>
> [2014-08-13 04:45:28,018] TRACE Controller 0 epoch 3 changed partition
> [mytopic,8] state from OnlinePartition to OfflinePartition
> (state.change.logger)
> [2014-08-13 04:45:28,021] TRACE Controller 0 epoch 3 started leader
> election for partition [mytopic,8] (state.change.logger)
> [2014-08-13 04:45:28,026] ERROR Controller 0 epoch 3 initiated state
> change for partition [mytopic,8] from OfflinePartition to OnlinePartition
> failed (state.change.logger)
> [2014-08-13 04:45:28,933] TRACE Controller 0 epoch 3 changed state of
> replica 1 for partition [mytopic,8] from OnlineReplica to OfflineReplica
> (state.change.logger)
> [2014-08-13 04:45:29,150] TRACE Controller 0 epoch 3 sending
> UpdateMetadata request (Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3)
> with correlationId 276 to broker 0 for partition [mytopic,8]
> (state.change.logger)
> [2014-08-13 04:45:29,155] TRACE Broker 0 cached leader info
> (LeaderAndIsrInfo:(Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3),ReplicationFactor:1),AllReplicas:1)
> for partition [mytopic,8] in response to UpdateMetadata request sent by
> controller 0 epoch 3 with correlation id 276 (state.change.logger)
> [2014-08-13 04:45:33,940] TRACE Controller 0 epoch 3 sending
> UpdateMetadata request (Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3)
> with correlationId 277 to broker 1 for partition [mytopic,8]
> (state.change.logger)
> [2014-08-13 04:45:33,940] TRACE Controller 0 epoch 3 sending
> UpdateMetadata request (Leader:0,ISR:0,LeaderEpoch:1,ControllerEpoch:3)
> with correlationId 277 to broker 1 for partition [mytopic,8]
> (state.change.logger)
> [2014-08-13 04:45:33,949] TRACE Controller 0 epoch 3 changed state of
> replica 1 for partition [mytopic,8] from OfflineReplica to OnlineReplica
> (state.change.logger)
> [2014-08-13 04:45:33,954] TRACE Controller 0 epoch 3 sending
> become-follower LeaderAndIsr request
> (Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3) with correlationId 278 to
> broker 1 for partition [mytopic,8] (state.change.logger)
> [2014-08-13 04:45:33,956] TRACE Controller 0 epoch 3 sending
> UpdateMetadata request (Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3)
> with correlationId 278 to broker 1 for partition [mytopic,8]
> (state.change.logger)
> [2014-08-13 04:45:33,959] TRACE Controller 0 epoch 3 sending
> UpdateMetadata request (Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3)
> with correlationId 278 to broker 0 for partition [mytopic,8]
> (state.change.logger)
> [2014-08-13 04:45:33,961] TRACE Controller 0 epoch 3 started leader
> election for partition [mytopic,8] (state.change.logger)
> [2014-08-13 04:45:33,963] TRACE Broker 0 cached leader info
> (LeaderAndIsrInfo:(Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3),ReplicationFactor:1),AllReplicas:1)
> for partition [mytopic,8] in response to UpdateMetadata request sent by
> controller 0 epoch 3 with correlation id 278 (state.change.logger)
> [2014-08-13 04:45:33,968] TRACE Controller 0 epoch 3 elected leader 1 for
> Offline partition [mytopic,8] (state.change.logger)
> [2014-08-13 04:45:33,969] TRACE Controller 0 epoch 3 changed partition
> [mytopic,8] from OfflinePartition to OnlinePartition with leader 1
> (state.change.logger)
> [2014-08-13 04:45:39,245] TRACE Controller 0 epoch 3 sending become-leader
> LeaderAndIsr request (Leader:1,ISR:1,LeaderEpoch:90,ControllerEpoch:3) with
> correlationId 279 to broker 1 for partition [mytopic,8]
> (state.change.logger)
> [2014-08-13 04:45:39,248] TRACE Controller 0 epoch 3 sending
> UpdateMetadata request (Leader:1,ISR:1,LeaderEpoch:90,ControllerEpoch:3)
> with correlationId 279 to broker 1 for partition [mytopic,8]
> (state.change.logger)
> [2014-08-13 04:45:39,251] TRACE Controller 0 epoch 3 sending
> UpdateMetadata request (Leader:1,ISR:1,LeaderEpoch:90,ControllerEpoch:3)
> with correlationId 279 to broker 0 for partition [mytopic,8]
> (state.change.logger)
> [2014-08-13 04:45:39,255] TRACE Broker 0 cached leader info
> (LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:90,ControllerEpoch:3),ReplicationFactor:1),AllReplicas:1)
> for partition [mytopic,8] in response to UpdateMetadata request sent by
> controller 0 epoch 3 with correlation id 279 (state.change.logger)
>
>    Strangely, though, I see another ProducerRequest error at around
> 09:08:11, but I don't see any obvious controller-related flappage at that
> time.  So maybe correlation (leader election at nearly the same time we see
> the error) doesn't imply causation.
>
>    I don't know.  This is the only producer that I can think of here that
> produces any significant number of messages (maybe a burst of 600ish, every
> 30sec or so) and that does so one at a time rather than by batching them.
>  Maybe there's an element here that's related to publishing lots of message
> sets with only one (small) message in them?
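>
> (If it helps picture the pattern: the real producer is a Storm topology
> using the Java API, but it's doing roughly what this sketch of the old
> 0.8 Scala producer API does; the broker address, topic, and payloads here
> are placeholders:
>
>     import java.util.Properties
>     import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
>
>     val props = new Properties()
>     props.put("metadata.broker.list", "broker1:9092")  // placeholder
>     props.put("serializer.class", "kafka.serializer.StringEncoder")
>     props.put("request.required.acks", "0")  // matches the Ack=0 in the logs
>     props.put("producer.type", "sync")       // no client-side batching
>     val producer = new Producer[String, String](new ProducerConfig(props))
>     for (i <- 1 to 600)  // a burst of ~600 sends, one small message apiece
>       producer.send(new KeyedMessage[String, String]("mytopic", "msg-" + i))
>     producer.close()
>
> so each ProduceRequest carries a message set with exactly one message in
> it, which matches the captures below.)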
>
>    If there's anything else you want me to look at, and/or if you want me
> to reformat this somehow so it's more readable (I know it's a lot of stuff
> inline), please let me know.  Thanks!
>
>         -Steve
>
> =========== sample transaction data for one case of this sort of error
> Frame 8451: 516 bytes on wire (4128 bits), 516 bytes captured (4128 bits)
>     Encapsulation type: Ethernet (1)
>     Arrival Time: Aug 13, 2014 00:28:11.232408000 UTC
>     [Time shift for this packet: 0.000000000 seconds]
>     Epoch Time: 1407889691.232408000 seconds
>     [Time delta from previous captured frame: 0.000479000 seconds]
>     [Time delta from previous displayed frame: 0.000479000 seconds]
>     [Time since reference or first frame: 191.232312000 seconds]
>     Frame Number: 8451
>     Frame Length: 516 bytes (4128 bits)
>     Capture Length: 516 bytes (4128 bits)
>     [Frame is marked: False]
>     [Frame is ignored: False]
>     [Protocols in frame: eth:ethertype:ip:tcp:kafka]
>     [Number of per-protocol-data: 1]
>     [Kafka, key 0]
> Ethernet II, Src: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0), Dst:
> 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1)
>     Destination: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1)
>         Address: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1)
>         .... ..0. .... .... .... .... = LG bit: Globally unique address
> (factory default)
>         .... ...0 .... .... .... .... = IG bit: Individual address
> (unicast)
>     Source: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0)
>         Address: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0)
>         .... ..0. .... .... .... .... = LG bit: Globally unique address
> (factory default)
>         .... ...0 .... .... .... .... = IG bit: Individual address
> (unicast)
>     Type: IP (0x0800)
> Internet Protocol Version 4, Src: 10.163.193.125 (10.163.193.125), Dst:
> 10.163.193.121 (10.163.193.121)
>     Version: 4
>     Header Length: 20 bytes
>     Differentiated Services Field: 0x00 (DSCP 0x00: Default; ECN: 0x00:
> Not-ECT (Not ECN-Capable Transport))
>         0000 00.. = Differentiated Services Codepoint: Default (0x00)
>         .... ..00 = Explicit Congestion Notification: Not-ECT (Not
> ECN-Capable Transport) (0x00)
>     Total Length: 502
>     Identification: 0x2b7a (11130)
>     Flags: 0x02 (Don't Fragment)
>         0... .... = Reserved bit: Not set
>         .1.. .... = Don't fragment: Set
>         ..0. .... = More fragments: Not set
>     Fragment offset: 0
>     Time to live: 64
>     Protocol: TCP (6)
>     Header checksum: 0x754b [validation disabled]
>         [Good: False]
>         [Bad: False]
>     Source: 10.163.193.125 (10.163.193.125)
>     Destination: 10.163.193.121 (10.163.193.121)
>     [Source GeoIP: Unknown]
>     [Destination GeoIP: Unknown]
> Transmission Control Protocol, Src Port: 33857 (33857), Dst Port: 9092
> (9092), Seq: 3091465, Ack: 1, Len: 450
>     Source Port: 33857 (33857)
>     Destination Port: 9092 (9092)
>     [Stream index: 14]
>     [TCP Segment Len: 450]
>     Sequence number: 3091465    (relative sequence number)
>     [Next sequence number: 3091915    (relative sequence number)]
>     Acknowledgment number: 1    (relative ack number)
>     Header Length: 32 bytes
>     .... 0000 0001 1000 = Flags: 0x018 (PSH, ACK)
>         000. .... .... = Reserved: Not set
>         ...0 .... .... = Nonce: Not set
>         .... 0... .... = Congestion Window Reduced (CWR): Not set
>         .... .0.. .... = ECN-Echo: Not set
>         .... ..0. .... = Urgent: Not set
>         .... ...1 .... = Acknowledgment: Set
>         .... .... 1... = Push: Set
>         .... .... .0.. = Reset: Not set
>         .... .... ..0. = Syn: Not set
>         .... .... ...0 = Fin: Not set
>     Window size value: 107
>     [Calculated window size: 13696]
>     [Window size scaling factor: 128]
>     Checksum: 0x10a1 [validation disabled]
>         [Good Checksum: False]
>         [Bad Checksum: False]
>     Urgent pointer: 0
>     Options: (12 bytes), No-Operation (NOP), No-Operation (NOP), Timestamps
>         No-Operation (NOP)
>             Type: 1
>                 0... .... = Copy on fragmentation: No
>                 .00. .... = Class: Control (0)
>                 ...0 0001 = Number: No-Operation (NOP) (1)
>         No-Operation (NOP)
>             Type: 1
>                 0... .... = Copy on fragmentation: No
>                 .00. .... = Class: Control (0)
>                 ...0 0001 = Number: No-Operation (NOP) (1)
>         Timestamps: TSval 2762073799, TSecr 2668832110
>             Kind: Time Stamp Option (8)
>             Length: 10
>             Timestamp value: 2762073799
>             Timestamp echo reply: 2668832110
>     [SEQ/ACK analysis]
>         [iRTT: 0.000199000 seconds]
>         [Bytes in flight: 450]
> Kafka
>     Length: 446
>     API Key: Produce (0)
>     API Version: 0
>     Correlation ID: 3484667
>     String Length: 13
>     Client ID: test-producer
>     Required Acks: 0
>     Timeout: 10000
>     Array Count: 1
>     Produce Request Topic
>         String Length: 55
>         Topic Name: mytopic
>         Array Count: 1
>         Produce Request Partition
>             Partition ID: 4
>             Message Set Size: 344
>             Message Set
>                 Offset: 0
>                 Message Size: 332
>                 Message
>                     CRC32: 0x81b88c04
>                     Magic Byte: 0
>                     .... ..00 = Compression Codec: None (0)
>                     Bytes Length: -1
>                     Key: <MISSING>
>                     Bytes Length: 318
>                     Value:
> 00b6c4d5be0a000a73646e73320008746b6f320012786e2d...
>
> Frame 8452: 465 bytes on wire (3720 bits), 465 bytes captured (3720 bits)
>     Encapsulation type: Ethernet (1)
>     Arrival Time: Aug 13, 2014 00:28:11.232797000 UTC
>     [Time shift for this packet: 0.000000000 seconds]
>     Epoch Time: 1407889691.232797000 seconds
>     [Time delta from previous captured frame: 0.000389000 seconds]
>     [Time delta from previous displayed frame: 0.000389000 seconds]
>     [Time since reference or first frame: 191.232701000 seconds]
>     Frame Number: 8452
>     Frame Length: 465 bytes (3720 bits)
>     Capture Length: 465 bytes (3720 bits)
>     [Frame is marked: False]
>     [Frame is ignored: False]
>     [Protocols in frame: eth:ethertype:ip:tcp:kafka]
>     [Number of per-protocol-data: 1]
>     [Kafka, key 0]
> Ethernet II, Src: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0), Dst:
> 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1)
>     Destination: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1)
>         Address: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1)
>         .... ..0. .... .... .... .... = LG bit: Globally unique address
> (factory default)
>         .... ...0 .... .... .... .... = IG bit: Individual address
> (unicast)
>     Source: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0)
>         Address: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0)
>         .... ..0. .... .... .... .... = LG bit: Globally unique address
> (factory default)
>         .... ...0 .... .... .... .... = IG bit: Individual address
> (unicast)
>     Type: IP (0x0800)
> Internet Protocol Version 4, Src: 10.163.193.125 (10.163.193.125), Dst:
> 10.163.193.121 (10.163.193.121)
>     Version: 4
>     Header Length: 20 bytes
>     Differentiated Services Field: 0x00 (DSCP 0x00: Default; ECN: 0x00:
> Not-ECT (Not ECN-Capable Transport))
>         0000 00.. = Differentiated Services Codepoint: Default (0x00)
>         .... ..00 = Explicit Congestion Notification: Not-ECT (Not
> ECN-Capable Transport) (0x00)
>     Total Length: 451
>     Identification: 0x2b7b (11131)
>     Flags: 0x02 (Don't Fragment)
>         0... .... = Reserved bit: Not set
>         .1.. .... = Don't fragment: Set
>         ..0. .... = More fragments: Not set
>     Fragment offset: 0
>     Time to live: 64
>     Protocol: TCP (6)
>     Header checksum: 0x757d [validation disabled]
>         [Good: False]
>         [Bad: False]
>     Source: 10.163.193.125 (10.163.193.125)
>     Destination: 10.163.193.121 (10.163.193.121)
>     [Source GeoIP: Unknown]
>     [Destination GeoIP: Unknown]
> Transmission Control Protocol, Src Port: 33857 (33857), Dst Port: 9092
> (9092), Seq: 3091915, Ack: 1, Len: 399
>     Source Port: 33857 (33857)
>     Destination Port: 9092 (9092)
>     [Stream index: 14]
>     [TCP Segment Len: 399]
>     Sequence number: 3091915    (relative sequence number)
>     [Next sequence number: 3092314    (relative sequence number)]
>     Acknowledgment number: 1    (relative ack number)
>     Header Length: 32 bytes
>     .... 0000 0001 1000 = Flags: 0x018 (PSH, ACK)
>         000. .... .... = Reserved: Not set
>         ...0 .... .... = Nonce: Not set
>         .... 0... .... = Congestion Window Reduced (CWR): Not set
>         .... .0.. .... = ECN-Echo: Not set
>         .... ..0. .... = Urgent: Not set
>         .... ...1 .... = Acknowledgment: Set
>         .... .... 1... = Push: Set
>         .... .... .0.. = Reset: Not set
>         .... .... ..0. = Syn: Not set
>         .... .... ...0 = Fin: Not set
>     Window size value: 107
>     [Calculated window size: 13696]
>     [Window size scaling factor: 128]
>     Checksum: 0x6f34 [validation disabled]
>         [Good Checksum: False]
>         [Bad Checksum: False]
>     Urgent pointer: 0
>     Options: (12 bytes), No-Operation (NOP), No-Operation (NOP), Timestamps
>         No-Operation (NOP)
>             Type: 1
>                 0... .... = Copy on fragmentation: No
>                 .00. .... = Class: Control (0)
>                 ...0 0001 = Number: No-Operation (NOP) (1)
>         No-Operation (NOP)
>             Type: 1
>                 0... .... = Copy on fragmentation: No
>                 .00. .... = Class: Control (0)
>                 ...0 0001 = Number: No-Operation (NOP) (1)
>         Timestamps: TSval 2762073799, TSecr 2668832111
>             Kind: Time Stamp Option (8)
>             Length: 10
>             Timestamp value: 2762073799
>             Timestamp echo reply: 2668832111
>     [SEQ/ACK analysis]
>         [iRTT: 0.000199000 seconds]
>         [Bytes in flight: 399]
> Kafka
>     Length: 395
>     API Key: Produce (0)
>     API Version: 0
>     Correlation ID: 3484669
>     String Length: 13
>     Client ID: test-producer
>     Required Acks: 0
>     Timeout: 10000
>     Array Count: 1
>     Produce Request Topic
>         String Length: 55
>         Topic Name: mytopic
>         Array Count: 1
>         Produce Request Partition
>             Partition ID: 4
>             Message Set Size: 293
>             Message Set
>                 Offset: 0
>                 Message Size: 281
>                 Message
>                     CRC32: 0x80ce403c
>                     Magic Byte: 0
>                     .... ..00 = Compression Codec: None (0)
>                     Bytes Length: -1
>                     Key: <MISSING>
>                     Bytes Length: 267
>                     Value:
> 00b6c4d5be0a001061646e7363746c640008706172320006...
>
> Frame 8453: 354 bytes on wire (2832 bits), 354 bytes captured (2832 bits)
>     Encapsulation type: Ethernet (1)
>     Arrival Time: Aug 13, 2014 00:28:11.233223000 UTC
>     [Time shift for this packet: 0.000000000 seconds]
>     Epoch Time: 1407889691.233223000 seconds
>     [Time delta from previous captured frame: 0.000426000 seconds]
>     [Time delta from previous displayed frame: 0.000426000 seconds]
>     [Time since reference or first frame: 191.233127000 seconds]
>     Frame Number: 8453
>     Frame Length: 354 bytes (2832 bits)
>     Capture Length: 354 bytes (2832 bits)
>     [Frame is marked: False]
>     [Frame is ignored: False]
>     [Protocols in frame: eth:ethertype:ip:tcp:kafka]
>     [Number of per-protocol-data: 1]
>     [Kafka, key 0]
> Ethernet II, Src: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0), Dst:
> 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1)
>     Destination: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1)
>         Address: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1)
>         .... ..0. .... .... .... .... = LG bit: Globally unique address
> (factory default)
>         .... ...0 .... .... .... .... = IG bit: Individual address
> (unicast)
>     Source: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0)
>         Address: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0)
>         .... ..0. .... .... .... .... = LG bit: Globally unique address
> (factory default)
>         .... ...0 .... .... .... .... = IG bit: Individual address
> (unicast)
>     Type: IP (0x0800)
> Internet Protocol Version 4, Src: 10.163.193.125 (10.163.193.125), Dst:
> 10.163.193.121 (10.163.193.121)
>     Version: 4
>     Header Length: 20 bytes
>     Differentiated Services Field: 0x00 (DSCP 0x00: Default; ECN: 0x00:
> Not-ECT (Not ECN-Capable Transport))
>         0000 00.. = Differentiated Services Codepoint: Default (0x00)
>         .... ..00 = Explicit Congestion Notification: Not-ECT (Not
> ECN-Capable Transport) (0x00)
>     Total Length: 340
>     Identification: 0x2b7c (11132)
>     Flags: 0x02 (Don't Fragment)
>         0... .... = Reserved bit: Not set
>         .1.. .... = Don't fragment: Set
>         ..0. .... = More fragments: Not set
>     Fragment offset: 0
>     Time to live: 64
>     Protocol: TCP (6)
>     Header checksum: 0x75eb [validation disabled]
>         [Good: False]
>         [Bad: False]
>     Source: 10.163.193.125 (10.163.193.125)
>     Destination: 10.163.193.121 (10.163.193.121)
>     [Source GeoIP: Unknown]
>     [Destination GeoIP: Unknown]
> Transmission Control Protocol, Src Port: 33857 (33857), Dst Port: 9092
> (9092), Seq: 3092314, Ack: 1, Len: 288
>     Source Port: 33857 (33857)
>     Destination Port: 9092 (9092)
>     [Stream index: 14]
>     [TCP Segment Len: 288]
>     Sequence number: 3092314    (relative sequence number)
>     [Next sequence number: 3092602    (relative sequence number)]
>     Acknowledgment number: 1    (relative ack number)
>     Header Length: 32 bytes
>     .... 0000 0001 1000 = Flags: 0x018 (PSH, ACK)
>         000. .... .... = Reserved: Not set
>         ...0 .... .... = Nonce: Not set
>         .... 0... .... = Congestion Window Reduced (CWR): Not set
>         .... .0.. .... = ECN-Echo: Not set
>         .... ..0. .... = Urgent: Not set
>         .... ...1 .... = Acknowledgment: Set
>         .... .... 1... = Push: Set
>         .... .... .0.. = Reset: Not set
>         .... .... ..0. = Syn: Not set
>         .... .... ...0 = Fin: Not set
>     Window size value: 107
>     [Calculated window size: 13696]
>     [Window size scaling factor: 128]
>     Checksum: 0xea49 [validation disabled]
>         [Good Checksum: False]
>         [Bad Checksum: False]
>     Urgent pointer: 0
>     Options: (12 bytes), No-Operation (NOP), No-Operation (NOP), Timestamps
>         No-Operation (NOP)
>             Type: 1
>                 0... .... = Copy on fragmentation: No
>                 .00. .... = Class: Control (0)
>                 ...0 0001 = Number: No-Operation (NOP) (1)
>         No-Operation (NOP)
>             Type: 1
>                 0... .... = Copy on fragmentation: No
>                 .00. .... = Class: Control (0)
>                 ...0 0001 = Number: No-Operation (NOP) (1)
>         Timestamps: TSval 2762073800, TSecr 2668832112
>             Kind: Time Stamp Option (8)
>             Length: 10
>             Timestamp value: 2762073800
>             Timestamp echo reply: 2668832112
>     [SEQ/ACK analysis]
>         [iRTT: 0.000199000 seconds]
>         [Bytes in flight: 288]
> Kafka
>     Length: 284
>     API Key: Produce (0)
>     API Version: 0
>     Correlation ID: 3484671
>     String Length: 13
>     Client ID: test-producer
>     Required Acks: 0
>     Timeout: 10000
>     Array Count: 1
>     Produce Request Topic
>         String Length: 55
>         Topic Name: mytopic
>         Array Count: 1
>         Produce Request Partition
>             Partition ID: 4
>             Message Set Size: 182
>             Message Set
>                 Offset: 0
>                 Message Size: 170
>                 Message
>                     CRC32: 0x0ece510e
>                     Magic Byte: 0
>                     .... ..00 = Compression Codec: None (0)
>                     Bytes Length: -1
>                     Key: <MISSING>
>                     Bytes Length: 156
>                     Value:
> 00b6c4d5be0a0008726f6f740008686b6735000c246a726f...
>
>
> On Wed, Aug 13, 2014 at 08:15:21AM -0700, Jun Rao wrote:
> > Interesting, could you run DumpLogSegments with and w/o deep-iteration
> > and send the output around offset 1327?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Tue, Aug 12, 2014 at 5:42 PM, Steve Miller <st...@idrathernotsay.com>
> > wrote:
> >
> > > [ "Aha!", you say, "now I know why this guy's been doing so much tshark
> > > stuff!" (-: ]
> > >
> > >    Hi.  I'm running into a strange situation, in which more or less all of
> > > the topics on our Kafka server behave exactly as expected... but the data
> > > produced by one family of applications is producing fairly frequent topic
> > > corruption.
> > >
> > >    When this happens, on the client side, the results are all over the
> > > place: sometimes you get a ConsumerFetchSizeTooSmall exception, or an
> > > exception for an unknown error type, or an invalid-offset error; it's all
> > > over the map.
> > >
> > >    On the server side, I think something like this is the first sign of
> > > badness:
> > >
> > > [2014-08-11 21:03:28,121] ERROR [KafkaApi-1] Error processing
> > > ProducerRequest with correlation id 6750 from client test-producer on
> > > partition [mytopic,9] (kafka.server.KafkaApis)
> > > java.lang.ArrayIndexOutOfBoundsException
> > > [2014-08-11 21:03:28,121] INFO [KafkaApi-1] Send the close connection
> > > response due to error handling produce request [clientId = test-producer,
> > > correlationId = 6750, topicAndPartition = [mytopic,9]] with Ack=0
> > > (kafka.server.KafkaApis)
> > >
> > > Shortly thereafter, you begin to see oddness facing the clients:
> > >
> > > [2014-08-11 21:17:58,132] ERROR [KafkaApi-1] Error when processing fetch
> > > request for partition [mytopic,9] offset 1327 from consumer with
> > > request for partition [mytopic,9] offset 1327 from consumer with
> > > correlation id 87204 (kafka.server.KafkaApis)
> > > java.lang.IllegalStateException: Invalid message size: 0
> > >         at kafka.log.FileMessageSet.searchFor(FileMessageSet.scala:127)
> > >         at kafka.log.LogSegment.translateOffset(LogSegment.scala:100)
> > >         at kafka.log.LogSegment.read(LogSegment.scala:137)
> > >         at kafka.log.Log.read(Log.scala:386)
> > >         at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
> > >         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
> > >         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
> > >         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> > >         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> > >         at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> > >         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > >         at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> > >         at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> > >         at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
> > >         at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:437)
> > >         at kafka.server.KafkaApis.handle(KafkaApis.scala:186)
> > >         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> > >         at java.lang.Thread.run(Unknown Source)
> > >
> > > If I go run the DumpLogSegments tool on the particular topic and partition
> > > that's generating the errors, I can see there's corruption in the log:
> > >
> > > Non-secutive offsets in :/data/d3/kafka/log/mytopic-9/00000000000000000000.log
> > >   1327 is followed by 1327
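> > >
> > > (As far as I can tell, the check behind that message is roughly the
> > > following; this is a sketch, not the actual DumpLogSegments code:
> > >
> > >     import java.io.File
> > >     import kafka.log.FileMessageSet
> > >
> > >     // walk the segment's shallow offsets; each should be lastOffset + 1
> > >     val messageSet = new FileMessageSet(new File("00000000000000000000.log"))
> > >     var lastOffset = -1L
> > >     for (entry <- messageSet.iterator) {
> > >       if (lastOffset >= 0 && entry.offset != lastOffset + 1)
> > >         println("Non-secutive offsets: " + lastOffset + " is followed by " + entry.offset)
> > >       lastOffset = entry.offset
> > >     }
> > >
> > > so "1327 is followed by 1327" means two adjacent messages in the log
> > > file carry the same offset.)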
> > >
> > > The only thing producing data to corrupted topics was also the only thing
> > > where snappy compression was turned on in the Java API being used by the
> > > producer (it's a Storm topology; we've had the same issue with one in Scala
> > > and with one that produces very similar data, but that was written in
> > > Java).  We turned that off, published to a different topic name (so it was
> > > created fresh), and had a couple of happy days where all was well.  Then we
> > > decided that all was well so we tried to go back to the original topic --
> > > after we'd verified that all data had aged out of the logs for that topic.
> > > And we started seeing errors again.  So we switched to a different topic
> > > again, let it be created, and also started seeing errors on that topic.
> > >
> > > We have other producers, written in C and Java and python, which are
> > > working flawlessly, even though the size of the data they produce and the
> > > rate at which they produce it is much larger than what we're seeing with
> > > this one problematic producer.  We also have producers written in other
> > > languages that produce at very low rates, so it's (probably) not the sort
> > > of thing where the issue is masked by more frequent data production.
> > >
> > > But in any case it looks like there's something the client can send that
> > > will corrupt the topic, which seems like something that shouldn't be able
> > > to happen.  I know there's at least some error checking for bad protocol
> > > requests, as I hacked a python client to produce some corrupt messages and
> > > saw an error response from the server.
> > >
> > > I'm happy to supply more data but I'm not sure what would be useful.  I'm
> > > also fine with continuing to dig into this on my own but I'd reached a
> > > point where it'd be useful to know if anyone had seen something like this
> > > before.  I have a ton o' tcpdumps running and some tail -F greps running on
> > > the logs so that if we see that producer error again we can go find the
> > > corresponding tcpdump file and hopefully find the smoking gun.  (It turns
> > > out that the real-time tshark processing invocations I sent out earlier can
> > > get quite far behind; I had that running when the corruption occurred
> > > today, but the output processing was a full hour behind the current time;
> > > the packet-writing part of tshark was far ahead of the packet-analyzing
> > > part!)
> > >
> > > Are there any particular log4j options I should turn on?  Is there a way
> > > to just enable trace logging for a specific topic?  Does trace logging
> > > print the contents of the message somewhere, not as something all nice and
> > > interpreted but as, say, a bag of hex digits?  I might end up rebuilding
> > > Kafka and adding some very specialized logging just for this.
> > >
> > > Kafka 0.8.1.1, JRE 1.6.0-71, Storm 0.9.1, RHEL6, in likely order of
> > > importance. (-:  Also, here's the topic description:
> > >
> > > Topic:mytopic   PartitionCount:10       ReplicationFactor:1     Configs:
> > >         Topic: mytopic  Partition: 0    Leader: 0       Replicas: 0     Isr: 0
> > >         Topic: mytopic  Partition: 1    Leader: 1       Replicas: 1     Isr: 1
> > >         Topic: mytopic  Partition: 2    Leader: 0       Replicas: 0     Isr: 0
> > >         Topic: mytopic  Partition: 3    Leader: 1       Replicas: 1     Isr: 1
> > >         Topic: mytopic  Partition: 4    Leader: 0       Replicas: 0     Isr: 0
> > >         Topic: mytopic  Partition: 5    Leader: 1       Replicas: 1     Isr: 1
> > >         Topic: mytopic  Partition: 6    Leader: 0       Replicas: 0     Isr: 0
> > >         Topic: mytopic  Partition: 7    Leader: 1       Replicas: 1     Isr: 1
> > >         Topic: mytopic  Partition: 8    Leader: 0       Replicas: 0     Isr: 0
> > >         Topic: mytopic  Partition: 9    Leader: 1       Replicas: 1     Isr: 1
> > >
> > > (2 brokers, 1 ZK server, no obvious issues with delays or process restarts
> > > or disk errors or any of the other usual suspects.  One partition per
> > > filesystem.  But my gut says none of that's pertinent, it's a matter of
> > > which partition the producer happens to be publishing to when it sends
> > > garbage.)
> > >
> > >
> > >         -Steve
> > >
>
>
