Summary: KRaft replicas can delete segments not included in a snapshot
Key: KAFKA-14238
URL:
Project: Kafka
Issue Type: Bug
Components: core, kraft
Reporter: Jose Armando Garcia Sancio
Fix For: 3.3.0

We see this in the log:

{code:java}
Deleting segment LogSegment(baseOffset=243864, size=9269150, lastModifiedTime=1662486784182, largestRecordTimestamp=Some(1662486784160)) due to retention time 604800000ms breach based on the largest record timestamp in the segment
{code}

This then causes {{KafkaRaftClient}} to throw an exception when sending batches to the listener:

{code:java}
java.lang.IllegalStateException: Snapshot expected since next offset of org.apache.kafka.controller.QuorumController$QuorumMetaLogListener@195461949 is 0, log start offset is 369668 and high-watermark is 547379
    at org.apache.kafka.raft.KafkaRaftClient.lambda$updateListenersProgress$4(
    at java.base/java.util.Optional.orElseThrow(
    at org.apache.kafka.raft.KafkaRaftClient.lambda$updateListenersProgress$5(
    at java.base/java.util.OptionalLong.ifPresent(
    at org.apache.kafka.raft.KafkaRaftClient.updateListenersProgress(
{code}

The on-disk state for the cluster metadata partition confirms this:

{code:java}
ls __cluster_metadata-0/
00000000000000369668.index
00000000000000369668.log
00000000000000369668.timeindex
00000000000000503411.index
00000000000000503411.log
00000000000000503411.snapshot
00000000000000503411.timeindex
00000000000000548746.snapshot
leader-epoch-checkpoint
partition.metadata
quorum-state
{code}

Notice that there are no {{checkpoint}} files and that the log doesn't have a segment at base offset 0.

This is happening because the {{LogConfig}} used for KRaft sets the retention policy to {{delete}}, which causes the method {{deleteOldSegments}} to delete old segments even if there is no snapshot covering them. For KRaft, Kafka should only delete segments that breach the log start offset.

Log configuration for KRaft:

{code:java}
val props = new Properties()
props.put(LogConfig.MaxMessageBytesProp, config.maxBatchSizeInBytes.toString)
props.put(LogConfig.SegmentBytesProp,
props.put(LogConfig.SegmentMsProp,
props.put(LogConfig.FileDeleteDelayMsProp,
LogConfig.validateValues(props)
val defaultLogConfig = LogConfig(props)
{code}

Segment deletion code:

{code:java}
def deleteOldSegments(): Int = {
  if (config.delete) {
    deleteLogStartOffsetBreachedSegments() +
      deleteRetentionSizeBreachedSegments() +
      deleteRetentionMsBreachedSegments()
  } else {
    deleteLogStartOffsetBreachedSegments()
  }
}
{code}
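
To make the difference between the two branches of {{deleteOldSegments}} concrete, here is a minimal, simplified sketch in Java. It is not Kafka code: {{RetentionSketch}}, {{Segment}} and {{deletable}} are hypothetical names, size-based retention is left out, and the real implementation only deletes from the oldest end of the log and never removes the active segment. The sketch only models which segments each rule would select.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the two retention rules; not part of Kafka.
final class RetentionSketch {

    // Hypothetical stand-in for a log segment (not Kafka's LogSegment).
    static final class Segment {
        final long baseOffset;
        final long nextBaseOffset;      // base offset of the following segment
        final long largestTimestampMs;  // largest record timestamp in the segment

        Segment(long baseOffset, long nextBaseOffset, long largestTimestampMs) {
            this.baseOffset = baseOffset;
            this.nextBaseOffset = nextBaseOffset;
            this.largestTimestampMs = largestTimestampMs;
        }
    }

    // Returns the segments that would be selected for deletion.
    // deletePolicy = true models the "delete" retention policy (time-based rule included);
    // deletePolicy = false models deleting only segments below the log start offset.
    static List<Segment> deletable(List<Segment> segments, long logStartOffset,
                                   boolean deletePolicy, long retentionMs, long nowMs) {
        List<Segment> result = new ArrayList<>();
        for (Segment s : segments) {
            // The whole segment lies below the log start offset.
            boolean startOffsetBreached = s.nextBaseOffset <= logStartOffset;
            // The newest record in the segment is older than the retention time.
            boolean timeBreached = deletePolicy
                    && nowMs - s.largestTimestampMs > retentionMs;
            if (startOffsetBreached || timeBreached) {
                result.add(s);
            }
        }
        return result;
    }
}
```

With the log start offset still at 0 (no snapshot generated yet) and segments older than the retention time, calling {{deletable}} with {{deletePolicy = true}} selects those segments, which matches the failure described above. With {{deletePolicy = false}}, nothing is selected until the log start offset advances past a segment.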