Mykhailo Baluta created KAFKA-10501:
---------------------------------------

             Summary: Log Cleaner never cleans up some __consumer_offsets partitions
                 Key: KAFKA-10501
                 URL: https://issues.apache.org/jira/browse/KAFKA-10501
             Project: Kafka
          Issue Type: Bug
          Components: log, log cleaner
    Affects Versions: 2.5.0
            Reporter: Mykhailo Baluta


Some __consumer_offsets partitions contain "broken" messages in the second log segment.

Example:

 
{code:java}
offset: 745253728 position: 49793647 CreateTime: 1594539245536 isvalid: true keysize: 99 valuesize: 28 magic: 2 compresscodec: NONE producerId: 37146 producerEpoch: 0 sequence: 0 isTransactional: true headerKeys: []
offset: 745253729 position: 49793844 CreateTime: 1594539245548 isvalid: true keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 37146 producerEpoch: 0 sequence: -1 isTransactional: true headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 59
offset: 745256523 position: 50070884 CreateTime: 1594540927673 isvalid: true keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 37146 producerEpoch: 1 sequence: -1 isTransactional: true headerKeys: [] endTxnMarker: ABORT coordinatorEpoch: 59
offset: 745256543 position: 50073185 CreateTime: 1594541667798 isvalid: true keysize: 99 valuesize: 28 magic: 2 compresscodec: NONE producerId: 37146 producerEpoch: 0 sequence: 0 isTransactional: true headerKeys: []
{code}
 

The last two records appear to be stored in the wrong order. As a result, the log ends with a transactional message that is never followed by an ABORT/COMMIT marker. This leaves the producer state with an ongoing transaction and firstUncleanableDirtyOffset = 745256543, so compaction is always skipped for such topic partitions.
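To make the failure mode concrete, here is a minimal illustrative sketch (hypothetical class and method names, not Kafka's actual LogCleanerManager/ProducerStateManager code) of why a transactional record with no closing marker pins the first uncleanable dirty offset:

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

// Illustrative model: a producer whose transaction never sees a COMMIT/ABORT
// marker keeps an entry in the ongoing-transaction map forever, so the
// cleaner can never advance past the first offset of that transaction.
public class FirstUncleanableOffset {
    // producerId -> offset of the first record of its open transaction
    private final Map<Long, Long> ongoingTxnFirstOffsets = new TreeMap<>();

    public void onTransactionalRecord(long producerId, long offset) {
        ongoingTxnFirstOffsets.putIfAbsent(producerId, offset);
    }

    public void onTxnMarker(long producerId) { // COMMIT or ABORT closes the txn
        ongoingTxnFirstOffsets.remove(producerId);
    }

    // Compaction may only proceed up to the earliest still-open transaction.
    public OptionalLong firstUncleanableOffset() {
        return ongoingTxnFirstOffsets.values().stream()
                .mapToLong(Long::longValue).min();
    }

    public static void main(String[] args) {
        FirstUncleanableOffset state = new FirstUncleanableOffset();
        state.onTransactionalRecord(37146L, 745253728L);
        state.onTxnMarker(37146L);                       // COMMIT at 745253729
        state.onTransactionalRecord(37146L, 745256543L); // out-of-order record;
                                                         // no marker ever follows
        System.out.println(state.firstUncleanableOffset().getAsLong());
        // -> 745256543, matching the stuck firstUncleanableDirtyOffset above
    }
}
```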
h3. Possible solution

Related logs look like:

 
{code:java}
WARN Producer's epoch at offset 1060744580 in __consumer_offsets-35 is 0, which is smaller than the last seen epoch 1 (kafka.log.ProducerAppendInfo){code}
 


Related code:

 
{code:java}
  private def checkProducerEpoch(producerEpoch: Short, offset: Long): Unit = {
    if (producerEpoch < updatedEntry.producerEpoch) {
      val message = s"Producer's epoch at offset $offset in $topicPartition is $producerEpoch, which is " +
        s"smaller than the last seen epoch ${updatedEntry.producerEpoch}"
      if (origin == AppendOrigin.Replication) {
        warn(message)
      } else {
        throw new ProducerFencedException(message)
      }
    }
  }
{code}
Perhaps an exception should also be thrown in the AppendOrigin.Replication case, to prevent commit messages with a stale producer epoch from being appended to __consumer_offsets topic partitions.
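A minimal sketch of the proposed behaviour (hypothetical names, not the real ProducerStateManager API; Kafka's actual code is the Scala snippet above): reject a stale-epoch append even when it arrives via replication, instead of only logging a warning.

```java
// Sketch of the proposal: under the suggested change the append origin no
// longer matters for the epoch check, so a record carrying an older producer
// epoch can never be appended behind a newer one.
public class EpochCheck {
    enum AppendOrigin { CLIENT, REPLICATION }

    private short lastSeenEpoch = -1; // -1 means no epoch seen yet

    public void checkProducerEpoch(short producerEpoch, long offset, AppendOrigin origin) {
        if (lastSeenEpoch >= 0 && producerEpoch < lastSeenEpoch) {
            // Proposed change: throw for REPLICATION as well, rather than
            // warn(message) only, so the out-of-order record in the report
            // above would be rejected instead of stored.
            throw new IllegalStateException(
                "Producer's epoch at offset " + offset + " is " + producerEpoch
                + ", which is smaller than the last seen epoch " + lastSeenEpoch);
        }
        lastSeenEpoch = (short) Math.max(lastSeenEpoch, producerEpoch);
    }
}
```

With the records from the dump above, the append of epoch 0 at offset 745256543 after the epoch-1 ABORT would throw instead of producing a permanently open transaction.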



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
