[ https://issues.apache.org/jira/browse/KAFKA-18723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
José Armando García Sancio updated KAFKA-18723:
-----------------------------------------------
    Fix Version/s: 4.0.1

> KRaft must handle corrupted records in the fetch response
> ---------------------------------------------------------
>
>                 Key: KAFKA-18723
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18723
>             Project: Kafka
>          Issue Type: Bug
>          Components: kraft
>            Reporter: José Armando García Sancio
>            Assignee: José Armando García Sancio
>            Priority: Major
>             Fix For: 3.9.1, 3.8.2, 3.7.3, 4.0.1
>
>
> It is possible for a KRaft replica to send corrupted records to the fetching replicas in the FETCH response. This is because there is a race between when the FETCH response gets generated by the KRaft IO thread and when the network thread, or the Linux kernel, reads the byte position in the log segment.
> This race can generate corrupted records if the KRaft replica performed a truncation after the FETCH response was created but before the network thread read the bytes from the log segment.
> I have seen the following errors:
> {code:java}
> [ERROR] 2025-01-07 15:04:18,273 [kafka-0-raft-io-thread] org.apache.kafka.server.fault.ProcessTerminatingFaultHandler handleFault - Encountered fatal fault: Unexpected error in raft IO thread
> org.apache.kafka.common.KafkaException: Append failed unexpectedly
>     at kafka.raft.KafkaMetadataLog.handleAndConvertLogAppendInfo(KafkaMetadataLog.scala:117)
>     at kafka.raft.KafkaMetadataLog.appendAsFollower(KafkaMetadataLog.scala:110)
>     at org.apache.kafka.raft.KafkaRaftClient.appendAsFollower(KafkaRaftClient.java:1227)
>     at org.apache.kafka.raft.KafkaRaftClient.handleFetchResponse(KafkaRaftClient.java:1209)
>     at org.apache.kafka.raft.KafkaRaftClient.handleResponse(KafkaRaftClient.java:1644)
>     at org.apache.kafka.raft.KafkaRaftClient.handleInboundMessage(KafkaRaftClient.java:1770)
>     at org.apache.kafka.raft.KafkaRaftClient.poll(KafkaRaftClient.java:2355)
>     at kafka.raft.KafkaRaftManager$RaftIoThread.doWork(RaftManager.scala:71)
>     at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:138){code}
> and
> {code:java}
> [ERROR] 2025-01-07 18:06:20,121 [kafka-1-raft-io-thread] org.apache.kafka.server.fault.ProcessTerminatingFaultHandler handleFault - Encountered fatal fault: Unexpected error in raft IO thread
> org.apache.kafka.common.errors.CorruptRecordException: Record size 0 is less than the minimum record overhead (14){code}
> This race also exists with Kafka's ISR-based topic partitions. In that case the replica fetcher catches all CorruptRecordException and InvalidRecordException exceptions:
> {code:java}
> } catch {
>   case ime@(_: CorruptRecordException | _: InvalidRecordException) =>
>     // we log the error and continue. This ensures two things
>     // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread
>     //    down and cause other topic partitions to also lag
>     // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes
>     //    can cause this), we simply continue and should get fixed in the subsequent fetches
>     error(s"Found invalid messages during fetch for partition $topicPartition " +
>       s"offset ${currentFetchState.fetchOffset}", ime)
>     partitionsWithError += topicPartition
> {code}
> The KRaft implementation doesn't handle this case:
> {code:java}
> } else {
>     Records records = FetchResponse.recordsOrFail(partitionResponse);
>     if (records.sizeInBytes() > 0) {
>         appendAsFollower(records);
>     }
>     OptionalLong highWatermark = partitionResponse.highWatermark() < 0 ?
>         OptionalLong.empty() : OptionalLong.of(partitionResponse.highWatermark());
>     updateFollowerHighWatermark(state, highWatermark);
> }{code}


--
This message was sent by Atlassian Jira
(v8.20.10#820010)
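The fix the ticket implies is to apply the ISR fetcher's catch-and-continue pattern to the KRaft follower path. Below is a minimal, self-contained sketch of that pattern; the class and method names (`FetchResponseSketch`, `handleFetchedRecords`, the local `CorruptRecordException`) are hypothetical stand-ins rather than Kafka's actual API, and the 14-byte minimum mirrors the record overhead quoted in the error log above:

```java
import java.nio.ByteBuffer;

// Sketch: tolerate corrupt records in a follower's fetch path instead of
// letting the exception terminate the IO thread. All names here are
// hypothetical stand-ins for the Kafka classes quoted in the ticket.
public class FetchResponseSketch {
    // Minimum record overhead, taken from the CorruptRecordException message above.
    static final int MIN_RECORD_OVERHEAD = 14;

    // Local stand-in for org.apache.kafka.common.errors.CorruptRecordException.
    static class CorruptRecordException extends RuntimeException {
        CorruptRecordException(String msg) { super(msg); }
    }

    // Stand-in for the log append: rejects batches smaller than the minimum
    // record overhead, which is how the corruption surfaces in the ticket.
    static void appendAsFollower(ByteBuffer records) {
        int size = records.remaining();
        if (size < MIN_RECORD_OVERHEAD) {
            throw new CorruptRecordException("Record size " + size
                + " is less than the minimum record overhead (" + MIN_RECORD_OVERHEAD + ")");
        }
        // A real implementation would append the records to the local log here.
    }

    // Follower handling that mirrors the ISR fetcher's catch block: log the
    // corruption and report failure so the next fetch can retry, rather than
    // propagating a fatal fault. Returns true when the append succeeded.
    static boolean handleFetchedRecords(ByteBuffer records) {
        try {
            if (records.remaining() > 0) {
                appendAsFollower(records);
            }
            return true;
        } catch (CorruptRecordException e) {
            // Likely a transient read of a truncated segment; the subsequent
            // FETCH should return consistent bytes.
            System.err.println("Found invalid records during fetch: " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        // A 5-byte batch is below the minimum overhead and reproduces the
        // corruption path; the handler skips it instead of crashing.
        boolean appended = handleFetchedRecords(ByteBuffer.allocate(5));
        System.out.println(appended ? "appended" : "skipped corrupt records");
    }
}
```

As in the ISR fetcher, the key design choice is treating corruption as a retryable, per-partition condition: the follower logs it and lets the next fetch pick up consistent bytes after the leader's truncation settles.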