Mykhailo Baluta created KAFKA-10501:
---------------------------------------
             Summary: Log Cleaner never cleans up some __consumer_offsets partitions
                 Key: KAFKA-10501
                 URL: https://issues.apache.org/jira/browse/KAFKA-10501
             Project: Kafka
          Issue Type: Bug
          Components: log, log cleaner
    Affects Versions: 2.5.0
            Reporter: Mykhailo Baluta

Some __consumer_offsets partitions contain "broken" messages in the second log segment. Example:

{code:java}
offset: 745253728 position: 49793647 CreateTime: 1594539245536 isvalid: true keysize: 99 valuesize: 28 magic: 2 compresscodec: NONE producerId: 37146 producerEpoch: 0 sequence: 0 isTransactional: true headerKeys: []
offset: 745253729 position: 49793844 CreateTime: 1594539245548 isvalid: true keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 37146 producerEpoch: 0 sequence: -1 isTransactional: true headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 59
offset: 745256523 position: 50070884 CreateTime: 1594540927673 isvalid: true keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 37146 producerEpoch: 1 sequence: -1 isTransactional: true headerKeys: [] endTxnMarker: ABORT coordinatorEpoch: 59
offset: 745256543 position: 50073185 CreateTime: 1594541667798 isvalid: true keysize: 99 valuesize: 28 magic: 2 compresscodec: NONE producerId: 37146 producerEpoch: 0 sequence: 0 isTransactional: true headerKeys: []
{code}

The last two records appear to be stored in the wrong order: the segment ends with a transactional message that has no subsequent ABORT/COMMIT marker. This leaves the producer state with an ongoing transaction and firstUncleanableDirtyOffset = 745256543, so compaction is always skipped for such topic partitions.

h3. Possible solution

A related log line looks like:

{code:java}
WARN Producer's epoch at offset 1060744580 in __consumer_offsets-35 is 0, which is smaller than the last seen epoch 1 (kafka.log.ProducerAppendInfo)
{code}

Related code:

{code:java}
private def checkProducerEpoch(producerEpoch: Short, offset: Long): Unit = {
  if (producerEpoch < updatedEntry.producerEpoch) {
    val message = s"Producer's epoch at offset $offset in $topicPartition is $producerEpoch, which is " +
      s"smaller than the last seen epoch ${updatedEntry.producerEpoch}"
    if (origin == AppendOrigin.Replication) {
      warn(message)
    } else {
      throw new ProducerFencedException(message)
    }
  }
}
{code}

Perhaps an exception should also be thrown in the AppendOrigin.Replication case, so that records carrying a stale producer epoch are rejected from __consumer_offsets topic partitions instead of only producing a warning.
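To illustrate the proposed behavior, here is a minimal, self-contained Java sketch of an epoch check that throws for every append origin, including replication. The class, exception, and enum names are hypothetical stand-ins for illustration only; they are not the actual Kafka types, and the real fix would live in kafka.log.ProducerAppendInfo.

{code:java}
public class EpochCheckSketch {
    // Hypothetical stand-in for Kafka's AppendOrigin (names are illustrative).
    enum AppendOrigin { REPLICATION, CLIENT, COORDINATOR }

    // Hypothetical stand-in for ProducerFencedException.
    static class StaleEpochException extends RuntimeException {
        StaleEpochException(String msg) { super(msg); }
    }

    // Proposed change sketched: reject a record whose producer epoch is older
    // than the last seen epoch regardless of origin, rather than only warning
    // when origin == REPLICATION.
    static void checkProducerEpoch(short producerEpoch, short lastSeenEpoch, AppendOrigin origin) {
        if (producerEpoch < lastSeenEpoch) {
            throw new StaleEpochException("Producer's epoch " + producerEpoch
                    + " is smaller than the last seen epoch " + lastSeenEpoch);
        }
    }

    public static void main(String[] args) {
        boolean rejected = false;
        try {
            // Mirrors the reported case: epoch 0 arrives after epoch 1 was seen.
            checkProducerEpoch((short) 0, (short) 1, AppendOrigin.REPLICATION);
        } catch (StaleEpochException e) {
            rejected = true;
        }
        System.out.println(rejected ? "stale epoch rejected" : "stale epoch accepted");
    }
}
{code}

Under this sketch, the out-of-order record at offset 745256543 (epoch 0, after epoch 1 was already seen) would be rejected on the replica instead of being appended, so the partition would not get stuck with an apparently ongoing transaction.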