[ https://issues.apache.org/jira/browse/KAFKA-3240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15773143#comment-15773143 ]
Joseph Francis edited comment on KAFKA-3240 at 12/23/16 3:50 PM:
-----------------------------------------------------------------

We had a similar issue, but it looks like it was triggered by the kafka-python library double-compressing the messages: https://github.com/dpkp/kafka-python/issues/718. The corrupt messages caused the replica threads to stop, eventually taking the brokers out of the cluster.

was (Author: josephfrancis):
We had a similar issue, but it looks like it was triggered by the kafka-python library double-compressing the messages. The corrupt messages caused the replica threads to stop, eventually taking the brokers out of the cluster.

> Replication issues
> ------------------
>
>                 Key: KAFKA-3240
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3240
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.9.0.0, 0.8.2.2, 0.9.0.1
>         Environment: FreeBSD 10.2-RELEASE-p9
>            Reporter: Jan Omar
>              Labels: reliability
>
> Hi,
> We are trying to replace our 3-broker cluster running on 0.6 with a new cluster on 0.9.0.1 (we tried 0.8.2.2 and 0.9.0.0 as well).
> - 3 Kafka nodes with one ZooKeeper instance on each machine
> - FreeBSD 10.2 p9
> - Nagle off (sysctl net.inet.tcp.delayed_ack=0)
> - all Kafka machines write a ZFS ZIL to a dedicated SSD
> - 3 producers on 3 machines, writing to 1 topic with 3 partitions, replication factor 3
> - acks = all
> - 10 Gigabit Ethernet, all machines on one switch, ping 0.05 ms worst case
> While using ProducerPerformance or rdkafka_performance we are seeing very strange replication errors. Any hint on what's going on would be highly appreciated. Any suggestion on how to debug this properly would help as well.
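For anyone hitting the same symptom, the corrupt entries can be located on disk before a replica fetcher trips over them. The following is a hypothetical diagnostic sketch, not part of Kafka: it assumes the pre-0.11 on-disk message-set layout used by 0.8.x/0.9.x (an 8-byte big-endian offset followed by a 4-byte message size per entry), which is where FileMessageSet.searchFor reads the size that comes back as 0:

```python
import struct

def scan_segment(path):
    """Scan a Kafka (message format v0/v1) log segment and report the
    first entry whose size field is zero or negative -- the condition
    behind "Invalid message size: 0" / "corrupt size (0)" errors."""
    bad = []
    with open(path, "rb") as f:
        pos = 0
        while True:
            header = f.read(12)  # 8-byte offset + 4-byte message size
            if len(header) < 12:
                break  # clean end of segment (or truncated tail)
            offset, size = struct.unpack(">qi", header)
            if size <= 0:
                # Record file position, logical offset, and bad size;
                # we cannot advance past a corrupt size field, so stop.
                bad.append((pos, offset, size))
                break
            f.seek(size, 1)  # skip over the message body
            pos += 12 + size
    return bad
```

Running it against the affected segment (e.g. a .log file under /var/db/kafka/test-0/) should print the file position of the first zero-size entry, i.e. the record that stops both the fetch path and the replica fetcher.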
> This is what our broker config looks like:
> {code}
> broker.id=5
> auto.create.topics.enable=false
> delete.topic.enable=true
> listeners=PLAINTEXT://:9092
> port=9092
> host.name=kafka-five.acc
> advertised.host.name=10.5.3.18
> zookeeper.connect=zookeeper-four.acc:2181,zookeeper-five.acc:2181,zookeeper-six.acc:2181
> zookeeper.connection.timeout.ms=6000
> num.replica.fetchers=1
> replica.fetch.max.bytes=100000000
> replica.fetch.wait.max.ms=500
> replica.high.watermark.checkpoint.interval.ms=5000
> replica.socket.timeout.ms=300000
> replica.socket.receive.buffer.bytes=65536
> replica.lag.time.max.ms=1000
> min.insync.replicas=2
> controller.socket.timeout.ms=30000
> controller.message.queue.size=100
> log.dirs=/var/db/kafka
> num.partitions=8
> message.max.bytes=100000000
> auto.create.topics.enable=false
> log.index.interval.bytes=4096
> log.index.size.max.bytes=10485760
> log.retention.hours=168
> log.flush.interval.ms=10000
> log.flush.interval.messages=20000
> log.flush.scheduler.interval.ms=2000
> log.roll.hours=168
> log.retention.check.interval.ms=300000
> log.segment.bytes=536870912
> zookeeper.connection.timeout.ms=1000000
> zookeeper.sync.time.ms=5000
> num.io.threads=8
> num.network.threads=4
> socket.request.max.bytes=104857600
> socket.receive.buffer.bytes=1048576
> socket.send.buffer.bytes=1048576
> queued.max.requests=100000
> fetch.purgatory.purge.interval.requests=100
> producer.purgatory.purge.interval.requests=100
> replica.lag.max.messages=10000000
> {code}
> These are the errors we're seeing:
> {code:borderStyle=solid}
> ERROR [Replica Manager on Broker 5]: Error processing fetch operation on partition [test,0] offset 50727 (kafka.server.ReplicaManager)
> java.lang.IllegalStateException: Invalid message size: 0
>     at kafka.log.FileMessageSet.searchFor(FileMessageSet.scala:141)
>     at kafka.log.LogSegment.translateOffset(LogSegment.scala:105)
>     at kafka.log.LogSegment.read(LogSegment.scala:126)
>     at kafka.log.Log.read(Log.scala:506)
>     at kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:536)
>     at kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:507)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>     at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
>     at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:507)
>     at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:462)
>     at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:431)
>     at kafka.server.KafkaApis.handle(KafkaApis.scala:69)
>     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
> and
> {code}
> ERROR Found invalid messages during fetch for partition [test,0] offset 2732 error Message found with corrupt size (0) in shallow iterator (kafka.server.ReplicaFetcherThread)
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)