Hello Kafka Team, I am using a three-node Kafka broker cluster, but recently compression issues have been occurring on the broker side. Please provide guidelines. Broker properties: process.roles=broker,controller node.id=0 controller.quorum.voters=0@kafka-0-0.kafka.prod.svc.cluster.local:9093,1@kafka-1-0.kafka.prod.svc.cluster.local:9093,2@kafka-2-0.kafka.prod.svc.cluster.local:9093
listeners=PLAINTEXT://kafka-0-0.kafka.prod.svc.cluster.local:9092,CONTROLLER://kafka-0-0.kafka.prod.svc.cluster.local:9093 inter.broker.listener.name=PLAINTEXT advertised.listeners=PLAINTEXT://kafka-0-0.kafka.prod.svc.cluster.local:9092 controller.listener.names=CONTROLLER listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL log.dirs=/mnt/kafka/0 num.partitions=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 group.initial.rebalance.delay.ms=7000 enable.auto.extend=true default.replication.factor=3 compression.type=snappy request.timeout.ms=3000 auto.leader.rebalance.enable=true delete.topic.enable=true log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 log.retention.hours=168 log.cleaner.enable=true log.cleaner.delete.retention.ms=86400 log.cleaner.threads=1 log.cleaner.backoff.ms=30000 log.cleanup.policy=delete num.recovery.threads.per.data.dir=1 num.network.threads=9 message.max.bytes=40000000 num.io.threads=8 socket.send.buffer.bytes=1048576 socket.receive.buffer.bytes=1048576 socket.request.max.bytes=104857600 replica.fetch.max.bytes=104857600 max.in.flight.requests.per.connection=1 controller.quorum.election.timeout.ms=1000 controller.quorum.election.backoff.max.ms=1000 controller.quorum.fetch.timeout.ms=2000 leader.imbalance.check.interval.seconds=300 leader.imbalance.per.broker.percentage=10 queued.max.requests=500 num.replica.fetchers=3 reserved.broker.max.id=999999999 auto.create.topics.enable=true auto.leader.rebalance.enable=true initial.broker.registration.timeout.ms=120000 broker.rack=1 issue : org.apache.kafka.common.KafkaException: Failed to decompress record stream at org.apache.kafka.common.record.DefaultRecordBatch$StreamRecordIterator.readNext(DefaultRecordBatch.java:642) at org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next(DefaultRecordBatch.java:603) at 
org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next(DefaultRecordBatch.java:572) at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133) at kafka.log.LogValidator$.$anonfun$validateMessagesAndAssignOffsetsCompressed$1(LogValidator.scala:424) at java.base/java.lang.Iterable.forEach(Iterable.java:75) at kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(LogValidator.scala:409) at kafka.log.LogValidator$.validateMessagesAndAssignOffsets(LogValidator.scala:114) at kafka.log.UnifiedLog.append(UnifiedLog.scala:844) at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:760) at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1167) at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1155) at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:957) at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28) at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27) at scala.collection.mutable.HashMap.map(HashMap.scala:35) at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:945) at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:603) at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:674) at kafka.server.KafkaApis.handle(KafkaApis.scala:183) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5) at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:112) at org.xerial.snappy.SnappyNative.rawUncompress(Native Method) at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:478) at org.xerial.snappy.Snappy.uncompress(Snappy.java:517) at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:439) at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:466) at java.base/java.io.DataInputStream.readByte(DataInputStream.java:270) at 
org.apache.kafka.common.utils.ByteUtils.readUnsignedVarint(ByteUtils.java:170) at org.apache.kafka.common.utils.ByteUtils.readVarint(ByteUtils.java:205) at org.apache.kafka.common.record.DefaultRecord.readPartiallyFrom(DefaultRecord.java:371) at org.apache.kafka.common.record.DefaultRecordBatch$1.doReadRecord(DefaultRecordBatch.java:289) at org.apache.kafka.common.record.DefaultRecordBatch$StreamRecordIterator.readNext(DefaultRecordBatch.java:638) ... 21 more